You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2019/03/12 18:35:33 UTC
[lucene-solr] 03/05: SOLR-11127: Improved error handling, bug fixes, more tests.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-11127-2
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit fb37b7c730e01e2b3b246cb32f34c7001269196e
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue Mar 12 18:02:43 2019 +0100
SOLR-11127: Improved error handling, bug fixes, more tests.
---
.../OverseerCollectionMessageHandler.java | 2 +-
.../api/collections/ReindexCollectionCmd.java | 176 ++++++++++++++++-----
.../java/org/apache/solr/core/CoreContainer.java | 6 +
.../solr/handler/admin/CollectionsHandler.java | 7 +-
.../java/org/apache/solr/util/TestInjection.java | 35 ++++
.../apache/solr/cloud/ReindexCollectionTest.java | 150 ++++++++++++++++--
.../solrj/request/CollectionAdminRequest.java | 32 +++-
.../solr/common/params/CollectionParams.java | 3 +-
8 files changed, 349 insertions(+), 62 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 26818c4..86b8570 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -241,7 +241,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
.put(DELETEREPLICA, new DeleteReplicaCmd(this))
.put(ADDREPLICA, new AddReplicaCmd(this))
.put(MOVEREPLICA, new MoveReplicaCmd(this))
- .put(REINDEX_COLLECTION, new ReindexCollectionCmd(this))
+ .put(REINDEXCOLLECTION, new ReindexCollectionCmd(this))
.put(UTILIZENODE, new UtilizeNodeCmd(this))
.build()
;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
index d676791..452e394 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -38,6 +38,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
@@ -52,6 +53,7 @@ import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,12 +86,11 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String ABORT = "abort";
- public static final String KEEP_SOURCE = "keepSource";
+ public static final String REMOVE_SOURCE = "removeSource";
public static final String TARGET = "target";
- public static final String TARGET_COL_PREFIX = ".reindex_";
- public static final String CHK_COL_PREFIX = ".reindex_ck_";
- public static final String REINDEXING_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "reindexing";
- public static final String REINDEX_PHASE_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "reindex_phase";
+ public static final String TARGET_COL_PREFIX = ".rx_";
+ public static final String CHK_COL_PREFIX = ".rx_ck_";
+ public static final String REINDEXING_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "rx";
private static final List<String> COLLECTION_PARAMS = Arrays.asList(
ZkStateReader.CONFIGNAME_PROP,
@@ -141,21 +142,39 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
@Override
public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
- log.info("*** called: {}", message);
+ log.debug("*** called: {}", message);
String collection = message.getStr(CommonParams.NAME);
+ // before resolving aliases
+ String originalCollection = collection;
+ Aliases aliases = ocmh.zkStateReader.getAliases();
+ if (collection != null) {
+ // resolve aliases - the source may be an alias
+ List<String> aliasList = aliases.resolveAliases(collection);
+ if (aliasList != null && !aliasList.isEmpty()) {
+ collection = aliasList.get(0);
+ }
+ }
+
if (collection == null || clusterState.getCollectionOrNull(collection) == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection name must be specified and must exist");
}
String target = message.getStr(TARGET);
if (target == null) {
target = collection;
+ } else {
+ // resolve aliases
+ List<String> aliasList = aliases.resolveAliases(target);
+ if (aliasList != null && !aliasList.isEmpty()) {
+ target = aliasList.get(0);
+ }
}
- boolean sameTarget = target.equals(collection);
- boolean keepSource = message.getBool(KEEP_SOURCE, true);
+ boolean sameTarget = target.equals(collection) || target.equals(originalCollection);
+ boolean removeSource = message.getBool(REMOVE_SOURCE, false);
boolean abort = message.getBool(ABORT, false);
DocCollection coll = clusterState.getCollection(collection);
if (abort) {
+ log.info("Abort requested for collection " + collection + ", setting the state to ABORTED.");
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
@@ -181,6 +200,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
boolean aborted = false;
int batchSize = message.getInt(CommonParams.ROWS, 100);
String query = message.getStr(CommonParams.Q, "*:*");
+ String fl = message.getStr(CommonParams.FL, "*");
Integer rf = message.getInt(ZkStateReader.REPLICATION_FACTOR, coll.getReplicationFactor());
Integer numNrt = message.getInt(ZkStateReader.NRT_REPLICAS, coll.getNumNrtReplicas());
Integer numTlog = message.getInt(ZkStateReader.TLOG_REPLICAS, coll.getNumTlogReplicas());
@@ -193,12 +213,23 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
}
String configName = message.getStr(ZkStateReader.CONFIGNAME_PROP, ocmh.zkStateReader.readConfigName(collection));
+ String targetCollection;
int seq = tmpCollectionSeq.getAndIncrement();
- String targetCollection = sameTarget ?
- TARGET_COL_PREFIX + collection + "_" + seq : target;
- String chkCollection = CHK_COL_PREFIX + collection + "_" + seq;
+ if (sameTarget) {
+ do {
+ targetCollection = TARGET_COL_PREFIX + originalCollection + "_" + seq;
+ if (!clusterState.hasCollection(targetCollection)) {
+ break;
+ }
+ seq = tmpCollectionSeq.getAndIncrement();
+ } while (clusterState.hasCollection(targetCollection));
+ } else {
+ targetCollection = target;
+ }
+ String chkCollection = CHK_COL_PREFIX + originalCollection + "_" + seq;
String daemonUrl = null;
Exception exc = null;
+ boolean createdTarget = false;
try {
// 0. set up target and checkpoint collections
NamedList<Object> cmdResults = new NamedList<>();
@@ -214,7 +245,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
CoreAdminParams.DELETE_METRICS_HISTORY, "true"
);
ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
- // nocommit error checking
+ checkResults("deleting old checkpoint collection " + chkCollection, cmdResults, true);
}
if (maybeAbort(collection)) {
@@ -259,8 +290,10 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
}
// create the target collection
cmd = new ZkNodeProps(propMap);
+ cmdResults = new NamedList<>();
ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
- // nocommit error checking
+ createdTarget = true;
+ checkResults("creating target collection " + targetCollection, cmdResults, true);
// create the checkpoint collection - use RF=1 and 1 shard
cmd = new ZkNodeProps(
@@ -271,8 +304,9 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
CollectionAdminParams.COLL_CONF, "_default",
CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
);
+ cmdResults = new NamedList<>();
ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
- // nocommit error checking
+ checkResults("creating checkpoint collection " + chkCollection, cmdResults, true);
// wait for a while until we see both collections
TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, ocmh.timeSource);
boolean created = false;
@@ -296,7 +330,14 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.READ_ONLY, "true");
ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+
+ TestInjection.injectReindexLatch();
+
+ if (maybeAbort(collection)) {
+ aborted = true;
+ return;
+ }
// 2. copy the documents to target
// Recipe taken from: http://joelsolr.blogspot.com/2016/10/solr-63-batch-jobs-parallel-etl-and.html
@@ -312,12 +353,12 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
"topic(" + chkCollection + "," +
collection + "," +
"q=\"" + query + "\"," +
- "fl=\"*\"," +
+ "fl=\"" + fl + "\"," +
"id=\"topic_" + targetCollection + "\"," +
// some of the documents eg. in .system contain large blobs
"rows=\"" + batchSize + "\"," +
"initialCheckpoint=\"0\"))))");
- log.info("- starting copying documents from " + collection + " to " + targetCollection);
+ log.debug("- starting copying documents from " + collection + " to " + targetCollection);
SolrResponse rsp = null;
try {
rsp = ocmh.cloudManager.request(new QueryRequest(q));
@@ -337,14 +378,18 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
aborted = true;
return;
}
- log.info("- finished copying from " + collection + " to " + targetCollection);
+ TestInjection.injectReindexFailure();
+ log.debug("- finished copying from " + collection + " to " + targetCollection);
// 5. if (sameTarget) set up an alias to use targetCollection as the source name
if (sameTarget) {
+ log.debug("- setting up alias from " + originalCollection + " to " + targetCollection);
cmd = new ZkNodeProps(
- CommonParams.NAME, collection,
+ CommonParams.NAME, originalCollection,
"collections", targetCollection);
+ cmdResults = new NamedList<>();
- ocmh.commandMap.get(CollectionParams.CollectionAction.CREATEALIAS).call(clusterState, cmd, results);
+ ocmh.commandMap.get(CollectionParams.CollectionAction.CREATEALIAS).call(clusterState, cmd, cmdResults);
+ checkResults("setting up alias " + originalCollection + " -> " + targetCollection, cmdResults, true);
}
if (maybeAbort(collection)) {
@@ -352,42 +397,51 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
return;
}
// 6. delete the checkpoint collection
+ log.debug("- deleting " + chkCollection);
cmd = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
CommonParams.NAME, chkCollection,
CoreAdminParams.DELETE_METRICS_HISTORY, "true"
);
+ cmdResults = new NamedList<>();
ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
-
- // nocommit error checking
+ checkResults("deleting checkpoint collection " + chkCollection, cmdResults, true);
// 7. optionally delete the source collection
- if (keepSource) {
- // 8. set the FINISHED state and clear readOnly
- props = new ZkNodeProps(
- Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
- ZkStateReader.COLLECTION_PROP, collection,
- REINDEXING_PROP, State.FINISHED.toLower(),
- ZkStateReader.READ_ONLY, "");
- ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
- } else {
+ if (removeSource) {
+ log.debug("- deleting source collection");
cmd = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
CommonParams.NAME, collection,
CoreAdminParams.DELETE_METRICS_HISTORY, "true"
);
+ cmdResults = new NamedList<>();
ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+ checkResults("deleting source collection " + collection, cmdResults, true);
+ } else {
+ // 8. clear readOnly on source
+ props = new ZkNodeProps(
+ Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+ ZkStateReader.COLLECTION_PROP, collection,
+ ZkStateReader.READ_ONLY, null);
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
}
+ // 9. set FINISHED state on the target
+ props = new ZkNodeProps(
+ Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+ ZkStateReader.COLLECTION_PROP, targetCollection,
+ REINDEXING_PROP, State.FINISHED.toLower());
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
- results.add(State.FINISHED.toLower(), collection);
+ results.add(State.FINISHED.toLower(), originalCollection);
} catch (Exception e) {
- log.warn("Error during reindexing of " + collection, e);
+ log.warn("Error during reindexing of " + originalCollection, e);
exc = e;
aborted = true;
throw e;
} finally {
if (aborted) {
- cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection);
+ cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection, createdTarget);
results.add(State.ABORTED.toLower(), collection);
if (exc != null) {
results.add("error", exc.toString());
@@ -396,16 +450,33 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
}
}
+ private void checkResults(String label, NamedList<Object> results, boolean failureIsFatal) throws Exception {
+ Object failure = results.get("failure");
+ if (failure == null) {
+ failure = results.get("error");
+ }
+ if (failure != null) {
+ String msg = "Error: " + label + ": " + Utils.toJSONString(results);
+ if (failureIsFatal) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg);
+ } else {
+ log.error(msg);
+ }
+ }
+ }
+
private boolean maybeAbort(String collection) throws Exception {
DocCollection coll = ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollectionOrNull(collection);
if (coll == null) {
// collection no longer present - abort
+ log.info("## Aborting - collection {} no longer present.", collection);
return true;
}
State state = State.get(coll.getStr(REINDEXING_PROP, State.RUNNING.toLower()));
if (state != State.ABORTED) {
return false;
}
+ log.info("## Aborting - collection {} state is {}", collection, state);
return true;
}
@@ -413,11 +484,11 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
private String getDaemonUrl(SolrResponse rsp, DocCollection coll) {
Map<String, Object> rs = (Map<String, Object>)rsp.getResponse().get("result-set");
if (rs == null || rs.isEmpty()) {
- log.debug("Missing daemon information in response: " + Utils.toJSONString(rsp));
+ log.debug(" -- Missing daemon information in response: " + Utils.toJSONString(rsp));
}
List<Object> list = (List<Object>)rs.get("docs");
if (list == null) {
- log.debug("Missing daemon information in response: " + Utils.toJSONString(rsp));
+ log.debug(" -- Missing daemon information in response: " + Utils.toJSONString(rsp));
return null;
}
String replicaName = null;
@@ -429,7 +500,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
}
String[] parts = op.split("\\s+");
if (parts.length != 4) {
- log.debug("Invalid daemon location info, expected 4 tokens: " + op);
+ log.debug(" -- Invalid daemon location info, expected 4 tokens: " + op);
return null;
}
// check if it's plausible
@@ -437,7 +508,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
replicaName = parts[3];
break;
} else {
- log.debug("daemon location info likely invalid: " + op);
+ log.debug(" -- daemon location info likely invalid: " + op);
return null;
}
}
@@ -493,12 +564,13 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception copying the documents", e);
}
- ocmh.cloudManager.getTimeSource().sleep(5000);
+ ocmh.cloudManager.getTimeSource().sleep(2000);
} while (isRunning && !maybeAbort(collection));
}
}
private void killDaemon(String daemonName, String daemonUrl) throws Exception {
+ log.debug("-- killing daemon " + daemonName + " at " + daemonUrl);
HttpClient client = ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
try (HttpSolrClient solrClient = new HttpSolrClient.Builder()
.withHttpClient(client)
@@ -516,7 +588,9 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
}
}
- private void cleanup(String collection, String targetCollection, String chkCollection, String daemonUrl, String daemonName) throws Exception {
+ private void cleanup(String collection, String targetCollection, String chkCollection,
+ String daemonUrl, String daemonName, boolean createdTarget) throws Exception {
+ log.info("## Cleaning up after abort or error");
// 1. kill the daemon
// 2. cleanup target / chk collections IFF the source collection still exists and is not empty
// 3. cleanup collection state
@@ -526,20 +600,36 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
}
ClusterState clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
NamedList<Object> cmdResults = new NamedList<>();
- if (!collection.equals(targetCollection) && clusterState.hasCollection(targetCollection)) {
+ if (createdTarget && !collection.equals(targetCollection) && clusterState.hasCollection(targetCollection)) {
+ log.debug(" -- removing " + targetCollection);
ZkNodeProps cmd = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
CommonParams.NAME, targetCollection,
CoreAdminParams.DELETE_METRICS_HISTORY, "true"
);
ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
- // nocommit error checking
+ checkResults("CLEANUP: deleting target collection " + targetCollection, cmdResults, false);
+
+ }
+ // remove chk collection
+ if (clusterState.hasCollection(chkCollection)) {
+ log.debug(" -- removing " + chkCollection);
+ ZkNodeProps cmd = new ZkNodeProps(
+ Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+ CommonParams.NAME, chkCollection,
+ CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+ );
+ cmdResults = new NamedList<>();
+ ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+ checkResults("CLEANUP: deleting checkpoint collection " + chkCollection, cmdResults, false);
}
+ log.debug(" -- turning readOnly mode off for " + collection);
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
- REINDEXING_PROP, State.ABORTED.toLower(),
- ZkStateReader.READ_ONLY, "");
+ // remove the rx flag, we already aborted
+ REINDEXING_PROP, null,
+ ZkStateReader.READ_ONLY, null);
ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
}
}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 8c5e227..58c9f69 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -747,10 +747,16 @@ public class CoreContainer {
containerHandlers.put(AutoScalingHandler.HANDLER_PATH, autoScalingHandler);
autoScalingHandler.initializeMetrics(metricManager, SolrInfoBean.Group.node.toString(), metricTag, AutoScalingHandler.HANDLER_PATH);
}
+ // verify .system compatibility
+ systemCollCompatCheck();
// This is a bit redundant but these are two distinct concepts for all they're accomplished at the same time.
status |= LOAD_COMPLETE | INITIAL_CORE_LOAD_COMPLETE;
}
+ private void systemCollCompatCheck() {
+
+ }
+
// MetricsHistoryHandler supports both cloud and standalone configs
private void createMetricsHistoryHandler() {
PluginInfo plugin = cfg.getMetricsConfig().getHistoryHandler();
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 6b53c00..106eece 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -542,11 +542,11 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
RELOAD_OP(RELOAD, (req, rsp, h) -> copy(req.getParams().required(), null, NAME)),
- REINDEX_COLLECTION_OP(REINDEX_COLLECTION, (req, rsp, h) -> {
+ REINDEXCOLLECTION_OP(REINDEXCOLLECTION, (req, rsp, h) -> {
Map<String, Object> m = copy(req.getParams().required(), null, NAME);
copy(req.getParams(), m,
ReindexCollectionCmd.ABORT,
- ReindexCollectionCmd.KEEP_SOURCE,
+ ReindexCollectionCmd.REMOVE_SOURCE,
ReindexCollectionCmd.TARGET,
ZkStateReader.CONFIGNAME_PROP,
NUM_SLICES,
@@ -561,7 +561,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
AUTO_ADD_REPLICAS,
"shards",
CommonParams.ROWS,
- CommonParams.Q);
+ CommonParams.Q,
+ CommonParams.FL);
copyPropertiesWithPrefix(req.getParams(), m, "router.");
return m;
}),
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index cf7681e..7a49ba4 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -126,6 +126,10 @@ public class TestInjection {
public volatile static CountDownLatch splitLatch = null;
+ public volatile static CountDownLatch reindexLatch = null;
+
+ public volatile static String reindexFailure = null;
+
public volatile static String failIndexFingerprintRequests = null;
public volatile static String wrongIndexFingerprint = null;
@@ -156,6 +160,8 @@ public class TestInjection {
splitFailureBeforeReplicaCreation = null;
splitFailureAfterReplicaCreation = null;
splitLatch = null;
+ reindexLatch = null;
+ reindexFailure = null;
prepRecoveryOpPauseForever = null;
countPrepRecoveryOpPauseForever = new AtomicInteger(0);
failIndexFingerprintRequests = null;
@@ -423,6 +429,35 @@ public class TestInjection {
return true;
}
+ public static boolean injectReindexFailure() {
+ if (reindexFailure != null) {
+ Random rand = random();
+ if (null == rand) return true;
+
+ Pair<Boolean,Integer> pair = parseValue(reindexFailure);
+ boolean enabled = pair.first();
+ int chanceIn100 = pair.second();
+ if (enabled && rand.nextInt(100) >= (100 - chanceIn100)) {
+ log.info("Test injection failure");
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Test injection failure");
+ }
+ }
+ return true;
+ }
+
+
+ public static boolean injectReindexLatch() {
+ if (reindexLatch != null) {
+ try {
+ log.info("Waiting in ReindexCollectionCmd for up to 60s");
+ return reindexLatch.await(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return true;
+ }
+
private static Pair<Boolean,Integer> parseValue(final String raw) {
if (raw == null) return new Pair<>(false, 0);
Matcher m = ENABLED_PERCENT.matcher(raw);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
index 2a9cfd4..218476c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
@@ -21,21 +21,27 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -44,6 +50,7 @@ import org.junit.Test;
/**
*
*/
+@LogLevel("org.apache.solr.cloud.api.collections.ReindexCollectionCmd=DEBUG")
public class ReindexCollectionTest extends SolrCloudTestCase {
@BeforeClass
@@ -74,6 +81,8 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
cluster.deleteAllCollections(); // deletes aliases too
solrClient.close();
+
+ TestInjection.reset();
}
private static final int NUM_DOCS = 200; // at least two batches, default batchSize=100
@@ -93,7 +102,7 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
.setTarget(targetCollection);
req.process(solrClient);
- CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", sourceCollection, (liveNodes, coll) -> {
+ CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
return ReindexCollectionCmd.State.FINISHED == state;
});
@@ -114,7 +123,24 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
.setTarget(targetCollection);
req.process(solrClient);
- CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", sourceCollection, (liveNodes, coll) -> {
+ String realTargetCollection = null;
+ TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
+ String prefix = ReindexCollectionCmd.TARGET_COL_PREFIX + targetCollection;
+ while (!timeOut.hasTimedOut()) {
+ timeOut.sleep(500);
+ for (String name : cloudManager.getClusterStateProvider().getClusterState().getCollectionsMap().keySet()) {
+ if (name.startsWith(prefix)) {
+ realTargetCollection = name;
+ break;
+ }
+ }
+ if (realTargetCollection != null) {
+ break;
+ }
+ }
+ assertNotNull("target collection not present after 30s", realTargetCollection);
+
+ CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", realTargetCollection, (liveNodes, coll) -> {
ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
return ReindexCollectionCmd.State.FINISHED == state;
});
@@ -142,7 +168,7 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
.setConfigName("conf3");
req.process(solrClient);
- CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", sourceCollection, (liveNodes, coll) -> {
+ CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
return ReindexCollectionCmd.State.FINISHED == state;
});
@@ -159,26 +185,38 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
@Test
public void testReshapeReindexing() throws Exception {
final String sourceCollection = "reshapeReindexing";
- final String targetCollection = sourceCollection;
+ final String targetCollection = "reshapeReindexingTarget";
createCollection(sourceCollection, "conf1", 2, 2);
indexDocs(sourceCollection, NUM_DOCS,
- i -> new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i)));
+ i -> new SolrInputDocument(
+ "id", String.valueOf(i),
+ "string_s", String.valueOf(i),
+ "remove_s", String.valueOf(i)));
CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
.setTarget(targetCollection)
.setCollectionParam(ZkStateReader.NUM_SHARDS_PROP, 3)
.setCollectionParam(ZkStateReader.REPLICATION_FACTOR, 1)
.setCollectionParam("router.name", ImplicitDocRouter.NAME)
- .setCollectionParam("shards", "foo,bar,baz");
+ .setCollectionParam("shards", "foo,bar,baz")
+ .setCollectionParam("fl", "id,string_s")
+ .setCollectionParam("q", "id:10*");
req.process(solrClient);
- CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", sourceCollection, (liveNodes, coll) -> {
+ CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
return ReindexCollectionCmd.State.FINISHED == state;
});
// verify the target docs exist
QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
- assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound());
+ // 10 and 100-109
+ assertEquals("copied num docs", 11, rsp.getResults().getNumFound());
+ // verify the correct fields exist
+ for (SolrDocument doc : rsp.getResults()) {
+ assertNotNull(doc.getFieldValue("id"));
+ assertNotNull(doc.getFieldValue("string_s"));
+ assertNull(doc.getFieldValue("remove_s"));
+ }
// check the shape of the new collection
ClusterState clusterState = solrClient.getClusterStateProvider().getClusterState();
@@ -195,6 +233,100 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
assertEquals(ImplicitDocRouter.NAME, coll.getRouter().getName());
}
+ @Test
+ public void testFailure() throws Exception { // reindexing must be rejected for existing targets and must roll back on failure
+ final String sourceCollection = "failReindexing";
+ final String targetCollection = "failReindexingTarget";
+ final String aliasTarget = "failAlias";
+ createCollection(sourceCollection, "conf1", 2, 2);
+ createCollection(targetCollection, "conf1", 1, 1);
+ CollectionAdminRequest.createAlias(aliasTarget, targetCollection).process(solrClient);
+ indexDocs(sourceCollection, NUM_DOCS,
+ i -> new SolrInputDocument(
+ "id", String.valueOf(i),
+ "string_s", String.valueOf(i)));
+ // 1) an existing collection as the target must be rejected
+ CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+ .setTarget(targetCollection);
+ try {
+ req.process(solrClient);
+ fail("succeeded but expected reindexing to fail due to the target collection already present");
+ } catch (Exception e) {
+ assertTrue("unexpected exception: " + e, e instanceof BaseHttpSolrClient.RemoteSolrException);
+ BaseHttpSolrClient.RemoteSolrException rse = (BaseHttpSolrClient.RemoteSolrException)e;
+ assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, rse.code());
+ }
+ req = CollectionAdminRequest.reindexCollection(sourceCollection)
+ .setTarget(aliasTarget); // 2) an existing alias as the target must be rejected as well
+ try {
+ req.process(solrClient);
+ fail("succeeded but expected reindexing to fail due to the target alias already present");
+ } catch (Exception e) {
+ assertTrue("unexpected exception: " + e, e instanceof BaseHttpSolrClient.RemoteSolrException);
+ BaseHttpSolrClient.RemoteSolrException rse = (BaseHttpSolrClient.RemoteSolrException)e;
+ assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, rse.code());
+ }
+ // 3) with the conflicting alias and collection removed, an injected failure must roll back cleanly
+ CollectionAdminRequest.deleteAlias(aliasTarget).process(solrClient);
+ CollectionAdminRequest.deleteCollection(targetCollection).process(solrClient);
+
+ req = CollectionAdminRequest.reindexCollection(sourceCollection)
+ .setTarget(targetCollection);
+ // the injection is cleared in the finally clause so it cannot leak into later tests
+ TestInjection.reindexFailure = "true:100";
+ try {
+ req.process(solrClient);
+ fail("succeeded but expected reindexing to fail due to a test-injected failure");
+ } catch (Exception e) {
+ assertTrue("unexpected exception: " + e, e instanceof BaseHttpSolrClient.RemoteSolrException);
+ BaseHttpSolrClient.RemoteSolrException rse = (BaseHttpSolrClient.RemoteSolrException)e;
+ assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, rse.code());
+ } finally { TestInjection.reindexFailure = null; }
+ // verify that the target and checkpoint collections don't exist
+ cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> {
+ assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.TARGET_COL_PREFIX));
+ assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.CHK_COL_PREFIX));
+ });
+ // verify that the source collection is read-write and has no reindexing flags
+ CloudTestUtils.waitForState(cloudManager, "collection state is incorrect", sourceCollection,
+ ((liveNodes, collectionState) ->
+ !collectionState.isReadOnly() &&
+ collectionState.getStr(ReindexCollectionCmd.REINDEXING_PROP) == null));
+ }
+
+ @Test
+ public void testAbort() throws Exception { // an abort must stop a running reindexing and restore the source collection
+ final String sourceCollection = "abortReindexing";
+ final String targetCollection = "abortReindexingTarget";
+ createCollection(sourceCollection, "conf1", 2, 1);
+ // the latch holds the reindexing command until counted down below (NOTE(review): not released if a wait above it fails)
+ TestInjection.reindexLatch = new CountDownLatch(1);
+ CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+ .setTarget(targetCollection);
+ String asyncId = req.processAsync(solrClient);
+ // wait for the source collection to be put in readOnly mode
+ CloudTestUtils.waitForState(cloudManager, "source collection didn't become readOnly",
+ sourceCollection, (liveNodes, coll) -> coll.isReadOnly());
+
+ req = CollectionAdminRequest.reindexCollection(sourceCollection);
+ req.setAbort(true);
+ req.process(solrClient);
+ CloudTestUtils.waitForState(cloudManager, "incorrect collection state", sourceCollection,
+ ((liveNodes, collectionState) ->
+ collectionState.isReadOnly() &&
+ ReindexCollectionCmd.State.ABORTED.toLower().equals(collectionState.getStr(ReindexCollectionCmd.REINDEXING_PROP))));
+ // let the process continue
+ TestInjection.reindexLatch.countDown();
+ CloudTestUtils.waitForState(cloudManager, "source collection is in wrong state",
+ sourceCollection, (liveNodes, docCollection) -> {
+ // after the abort the source must be writable again, with no reindexing marker left
+ return !docCollection.isReadOnly() && docCollection.getStr(ReindexCollectionCmd.REINDEXING_PROP) == null;
+ });
+ // verify the response
+ CollectionAdminRequest.RequestStatusResponse rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient);
+ rsp.getRequestStatus(); // TODO(review): value is discarded; assert the expected final state of the aborted request
+ }
+
private void createCollection(String name, String config, int numShards, int numReplicas) throws Exception {
CollectionAdminRequest.createCollection(name, config, numShards, numReplicas)
.setMaxShardsPerNode(-1)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index ddfe90e..d0ae477 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -793,40 +793,60 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
public static class ReindexCollection extends AsyncCollectionSpecificAdminRequest {
String target;
String query;
+ String fields;
String configName;
- Boolean keepSource;
+ Boolean removeSource;
+ Boolean abort;
Integer batchSize;
Map<String, Object> collectionParams = new HashMap<>();
private ReindexCollection(String collection) {
- super(CollectionAction.REINDEX_COLLECTION, collection);
+ super(CollectionAction.REINDEXCOLLECTION, collection); // registered with LockLevel.NONE: the command implements its own locking
}
+ /** Target collection name, sent as the {@code target} parameter; leave unset (null) if it is the same as the source. */
public ReindexCollection setTarget(String target) {
this.target = target;
return this;
}
+ /** If true, request that an already running reindexing of this collection be aborted (sent as the {@code abort} parameter). */
+ public ReindexCollection setAbort(boolean abort) {
+ this.abort = abort;
+ return this;
+ }
+
+ /** Query matching the documents to reindex, sent as {@link CommonParams#Q} (default is '*:*'). */
public ReindexCollection setQuery(String query) {
this.query = query;
return this;
}
- public ReindexCollection setKeepSource(boolean keepSource) {
- this.keepSource = keepSource;
+ /** Fields to copy into the target, sent as {@link CommonParams#FL} (same syntax); default is '*'. */
+ public ReindexCollection setFields(String fields) {
+ this.fields = fields;
+ return this;
+ }
+
+ /** If true, remove the source collection after a successful reindexing (sent as {@code removeSource}). Default is false. */
+ public ReindexCollection setRemoveSource(boolean removeSource) {
+ this.removeSource = removeSource;
return this;
}
+ /** Copy documents in batches of this size, sent as {@link CommonParams#ROWS}. Default is 100. */
public ReindexCollection setBatchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}
+ /** Config set name for the target collection, sent as {@link ZkStateReader#CONFIGNAME_PROP}. Default is the same as the source. */
public ReindexCollection setConfigName(String configName) {
this.configName = configName;
return this;
}
+ /** Set other supported collection CREATE parameters. */
public ReindexCollection setCollectionParam(String key, Object value) {
this.collectionParams.put(key, value);
return this;
@@ -836,9 +856,11 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
public SolrParams getParams() {
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
params.setNonNull("target", target);
+ params.setNonNull("abort", abort);
params.setNonNull(ZkStateReader.CONFIGNAME_PROP, configName);
params.setNonNull(CommonParams.Q, query);
- params.setNonNull("keepSource", keepSource);
+ params.setNonNull(CommonParams.FL, fields);
+ params.setNonNull("removeSource", removeSource);
params.setNonNull(CommonParams.ROWS, batchSize);
collectionParams.forEach((k, v) -> params.setNonNull(k, v));
return params;
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index df97c35..1485e34 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -123,7 +123,8 @@ public interface CollectionParams {
// TODO: not implemented yet
MERGESHARDS(true, LockLevel.SHARD),
COLSTATUS(true, LockLevel.NONE),
- REINDEX_COLLECTION(true, LockLevel.COLLECTION)
+ // this command implements its own locking
+ REINDEXCOLLECTION(true, LockLevel.NONE)
;
public final boolean isWrite;