You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2019/03/13 21:25:38 UTC
[lucene-solr] branch jira/solr-11127-2 updated: SOLR-11127:
Improved status reporting and error handling. Documentation.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-11127-2
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr-11127-2 by this push:
new 5ad99eb SOLR-11127: Improved status reporting and error handling. Documentation.
5ad99eb is described below
commit 5ad99eb131df3bf7f272b3bda355d0a349950f81
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Mar 13 22:25:10 2019 +0100
SOLR-11127: Improved status reporting and error handling. Documentation.
---
.../api/collections/ReindexCollectionCmd.java | 226 ++++++++++++++++-----
.../solr/handler/admin/CollectionsHandler.java | 6 +-
.../apache/solr/cloud/ReindexCollectionTest.java | 115 +++++++----
solr/solr-ref-guide/src/collections-api.adoc | 120 +++++++++++
.../solr/client/solrj/io/stream/DaemonStream.java | 13 +-
.../solrj/request/CollectionAdminRequest.java | 10 +-
.../java/org/apache/solr/common/util/Utils.java | 3 +-
7 files changed, 381 insertions(+), 112 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
index f1b31d7..553c4bf 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -24,18 +24,24 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Aliases;
@@ -55,42 +61,46 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Reindex a collection, usually in order to change the index schema.
- * <p>WARNING: Reindexing is a potentially lossy operation - some indexed data that is not available as
+ * <p>WARNING: Reindexing is potentially a lossy operation - some indexed data that is not available as
* stored fields may be irretrievably lost, so users should use this command with caution, evaluating
* the potential impact by using different source and target collection names first, and preserving
* the source collection until the evaluation is complete.</p>
* <p>Reindexing follows these steps:</p>
* <ol>
- * <li>create a temporary collection using the most recent schema of the source collection
- * (or the one specified in the parameters, which must already exist).</li>
- * <li>copy the source documents to the temporary collection, reconstructing them from their stored
- * fields and reindexing them using the specified schema. NOTE: some data
+ * <li>create a temporary collection using the most recent schema of the source collection
+ * (or the one specified in the parameters, which must already exist), and the shape of the original
+ * collection, unless overridden by parameters.</li>
+ * <li>copy the source documents to the temporary collection, using their stored fields and
+ * reindexing them using the specified schema. NOTE: some data
* loss may occur if the original stored field data is not available!</li>
- * <li>if the target collection name is not specified
- * then the same name as the source is assumed and at this step the source collection is permanently removed.</li>
* <li>create the target collection from scratch with the specified name (or the same as source if not
- * specified), but using the new specified schema. NOTE: if the target name was not specified or is the same
- * as the source collection then the original collection has been deleted in the previous step and it's
- * not possible to roll-back the changes if the process is interrupted. The (possibly incomplete) data
- * is still available in the temporary collection.</li>
- * <li>copy the documents from the temporary collection to the target collection, using the specified schema.</li>
- * <li>delete temporary collection(s) and optionally delete the source collection if it still exists.</li>
+ * specified) and the specified parameters. NOTE: if the target name was not specified or is the same
+ * as the source collection then a unique sequential collection name will be used.</li>
+ * <li>copy the documents from the source collection to the target collection.</li>
+ * <li>if the source and target collection names were the same then set up an alias pointing from the source collection name to the actual
+ * (sequentially named) target collection</li>
+ * <li>optionally delete the source collection.</li>
* </ol>
*/
public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final String ABORT = "abort";
+ public static final String COMMAND = "cmd";
+ public static final String REINDEX_STATUS = "reindexStatus";
public static final String REMOVE_SOURCE = "removeSource";
public static final String TARGET = "target";
public static final String TARGET_COL_PREFIX = ".rx_";
public static final String CHK_COL_PREFIX = ".rx_ck_";
- public static final String REINDEXING_PROP = CollectionAdminRequest.PROPERTY_PREFIX + "rx";
+ public static final String REINDEXING_STATE = CollectionAdminRequest.PROPERTY_PREFIX + "rx";
+
+ public static final String STATE = "state";
+ public static final String PHASE = "phase";
private static final List<String> COLLECTION_PARAMS = Arrays.asList(
ZkStateReader.CONFIGNAME_PROP,
@@ -121,20 +131,40 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
return toString().toLowerCase(Locale.ROOT);
}
- public static State get(String p) {
+ public static State get(Object p) {
if (p == null) {
return null;
}
- p = p.toLowerCase(Locale.ROOT);
- if (p.startsWith(CollectionAdminRequest.PROPERTY_PREFIX)) {
- p = p.substring(CollectionAdminRequest.PROPERTY_PREFIX.length());
- }
+ p = String.valueOf(p).toLowerCase(Locale.ROOT);
return states.get(p);
}
static Map<String, State> states = Collections.unmodifiableMap(
Stream.of(State.values()).collect(Collectors.toMap(State::toLower, Function.identity())));
}
+ public enum Cmd {
+ START,
+ ABORT,
+ STATUS;
+
+ public String toLower() {
+ return toString().toLowerCase(Locale.ROOT);
+ }
+
+ public static Cmd get(String p) {
+ if (p == null) {
+ return null;
+ }
+ p = p.toLowerCase(Locale.ROOT);
+ return cmds.get(p);
+ }
+ static Map<String, Cmd> cmds = Collections.unmodifiableMap(
+ Stream.of(Cmd.values()).collect(Collectors.toMap(Cmd::toLower, Function.identity())));
+ }
+
+ private SolrClientCache solrClientCache;
+ private String zkHost;
+
public ReindexCollectionCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
}
@@ -156,8 +186,8 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
}
}
- if (collection == null || clusterState.getCollectionOrNull(collection) == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection name must be specified and must exist");
+ if (collection == null || !clusterState.hasCollection(collection)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source collection name must be specified and must exist");
}
String target = message.getStr(TARGET);
if (target == null) {
@@ -171,32 +201,40 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
}
boolean sameTarget = target.equals(collection) || target.equals(originalCollection);
boolean removeSource = message.getBool(REMOVE_SOURCE, false);
- boolean abort = message.getBool(ABORT, false);
- DocCollection coll = clusterState.getCollection(collection);
- if (abort) {
- log.info("Abort requested for collection " + collection + ", setting the state to ABORTED.");
- ZkNodeProps props = new ZkNodeProps(
- Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
- ZkStateReader.COLLECTION_PROP, collection,
- REINDEXING_PROP, State.ABORTED.toLower());
- ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
- results.add(State.ABORTED.toLower(), collection);
+ Cmd command = Cmd.get(message.getStr(COMMAND, Cmd.START.toLower()));
+ if (command == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown command: " + message.getStr(COMMAND));
+ }
+ Map<String, Object> reindexingState = getReindexingState(ocmh.cloudManager.getDistribStateManager(), collection);
+ if (!reindexingState.containsKey(STATE)) {
+ reindexingState.put(STATE, State.IDLE.toLower());
+ }
+ State state = State.get(reindexingState.get(STATE));
+ if (command == Cmd.ABORT) {
+ log.info("Abort requested for collection {}, setting the state to ABORTED.", collection);
+ // check that it's running
+ if (state != State.RUNNING) {
+ log.debug("Abort requested for collection {} but command is not running: {}", collection, state);
+ return;
+ }
+ setReindexingState(collection, State.ABORTED, null);
+ reindexingState.put(STATE, "aborting");
+ results.add(REINDEX_STATUS, reindexingState);
// if needed the cleanup will be performed by the running instance of the command
return;
+ } else if (command == Cmd.STATUS) {
+ results.add(REINDEX_STATUS, reindexingState);
+ return;
}
+ // command == Cmd.START
+
// check it's not already running
- State state = State.get(coll.getStr(REINDEXING_PROP, State.IDLE.toLower()));
if (state == State.RUNNING) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Reindex is already running for collection " + collection +
- ". If you are sure this is not the case you can issue &abort=true to clean up this state.");
+ ". If you are sure this is not the case you can issue &cmd=abort to clean up this state.");
}
- // set the running flag
- ZkNodeProps props = new ZkNodeProps(
- Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
- ZkStateReader.COLLECTION_PROP, collection,
- REINDEXING_PROP, State.RUNNING.toLower());
- ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ DocCollection coll = clusterState.getCollection(collection);
boolean aborted = false;
int batchSize = message.getInt(CommonParams.ROWS, 100);
String query = message.getStr(CommonParams.Q, "*:*");
@@ -226,11 +264,22 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
} else {
targetCollection = target;
}
- String chkCollection = CHK_COL_PREFIX + originalCollection + "_" + seq;
+ String chkCollection = CHK_COL_PREFIX + originalCollection;
String daemonUrl = null;
Exception exc = null;
boolean createdTarget = false;
try {
+ solrClientCache = new SolrClientCache(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
+ zkHost = ocmh.zkStateReader.getZkClient().getZkServerAddress();
+ // set the running flag
+ reindexingState.clear();
+ reindexingState.put("actualSourceCollection", collection);
+ reindexingState.put("actualTargetCollection", targetCollection);
+ reindexingState.put("checkpointCollection", chkCollection);
+ reindexingState.put("inputDocs", getNumberOfDocs(collection));
+ reindexingState.put(PHASE, "creating target and checkpoint collections");
+ setReindexingState(collection, State.RUNNING, reindexingState);
+
// 0. set up target and checkpoint collections
NamedList<Object> cmdResults = new NamedList<>();
ZkNodeProps cmd;
@@ -276,6 +325,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
propMap.put(ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode);
propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
+ propMap.put(DocCollection.STATE_FORMAT, message.getInt(DocCollection.STATE_FORMAT, coll.getStateFormat()));
if (rf != null) {
propMap.put(ZkStateReader.REPLICATION_FACTOR, rf);
}
@@ -301,6 +351,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
CommonParams.NAME, chkCollection,
ZkStateReader.NUM_SHARDS_PROP, "1",
ZkStateReader.REPLICATION_FACTOR, "1",
+ DocCollection.STATE_FORMAT, "2",
CollectionAdminParams.COLL_CONF, "_default",
CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
);
@@ -371,9 +422,13 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy documents from " +
collection + " to " + targetCollection + ": " + Utils.toJSONString(rsp));
}
+ reindexingState.put("daemonUrl", daemonUrl);
+ reindexingState.put("daemonName", targetCollection);
+ reindexingState.put(PHASE, "copying documents");
+ setReindexingState(collection, State.RUNNING, reindexingState);
// wait for the daemon to finish
- waitForDaemon(targetCollection, daemonUrl, collection);
+ waitForDaemon(targetCollection, daemonUrl, collection, targetCollection, reindexingState);
if (maybeAbort(collection)) {
aborted = true;
return;
@@ -391,8 +446,15 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
cmdResults = new NamedList<>();
ocmh.commandMap.get(CollectionParams.CollectionAction.CREATEALIAS).call(clusterState, cmd, results);
checkResults("setting up alias " + originalCollection + " -> " + targetCollection, cmdResults, true);
+ reindexingState.put("alias", originalCollection + " -> " + targetCollection);
}
+ reindexingState.remove("daemonUrl");
+ reindexingState.remove("daemonName");
+ reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
+ reindexingState.put(PHASE, "copying done, finalizing");
+ setReindexingState(collection, State.RUNNING, reindexingState);
+
if (maybeAbort(collection)) {
aborted = true;
return;
@@ -421,33 +483,83 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
checkResults("deleting source collection " + collection, cmdResults, true);
} else {
// 8. clear readOnly on source
- props = new ZkNodeProps(
+ ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.READ_ONLY, null);
ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
}
- // 9. set FINISHED state on the target
- props = new ZkNodeProps(
+ // 9. set FINISHED state on the target and clear the state on the source
+ ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, targetCollection,
- REINDEXING_PROP, State.FINISHED.toLower());
+ REINDEXING_STATE, State.FINISHED.toLower());
ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
- results.add(State.FINISHED.toLower(), originalCollection);
+ reindexingState.put(STATE, State.FINISHED.toLower());
+ reindexingState.put(PHASE, "done");
+ removeReindexingState(collection);
} catch (Exception e) {
log.warn("Error during reindexing of " + originalCollection, e);
exc = e;
aborted = true;
- throw e;
} finally {
+ solrClientCache.close();
if (aborted) {
cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection, createdTarget);
- results.add(State.ABORTED.toLower(), collection);
if (exc != null) {
results.add("error", exc.toString());
}
+ reindexingState.put(STATE, State.ABORTED.toLower());
}
+ results.add(REINDEX_STATUS, reindexingState);
+ }
+ }
+
+ private static final String REINDEXING_STATE_PATH = "/.reindexing";
+
+ private Map<String, Object> setReindexingState(String collection, State state, Map<String, Object> props) throws Exception {
+ String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
+ DistribStateManager stateManager = ocmh.cloudManager.getDistribStateManager();
+ Map<String, Object> copyProps = new HashMap<>();
+ if (props == null) { // retrieve existing props, if any
+ props = Utils.getJson(stateManager, path);
+ }
+ copyProps.putAll(props);
+ copyProps.put("state", state.toLower());
+ if (stateManager.hasData(path)) {
+ stateManager.setData(path, Utils.toJSON(copyProps), -1);
+ } else {
+ stateManager.makePath(path, Utils.toJSON(copyProps), CreateMode.PERSISTENT, false);
+ }
+ return copyProps;
+ }
+
+ private void removeReindexingState(String collection) throws Exception {
+ String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
+ DistribStateManager stateManager = ocmh.cloudManager.getDistribStateManager();
+ if (stateManager.hasData(path)) {
+ stateManager.removeData(path, -1);
+ }
+ }
+
+ @VisibleForTesting
+ public static Map<String, Object> getReindexingState(DistribStateManager stateManager, String collection) throws Exception {
+ String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
+ // make it modifiable
+ return new TreeMap<>(Utils.getJson(stateManager, path));
+ }
+
+ private long getNumberOfDocs(String collection) {
+ CloudSolrClient solrClient = solrClientCache.getCloudSolrClient(zkHost);
+ try {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.add(CommonParams.Q, "*:*");
+ params.add(CommonParams.ROWS, "0");
+ QueryResponse rsp = solrClient.query(collection, params);
+ return rsp.getResults().getNumFound();
+ } catch (Exception e) {
+ return 0L;
}
}
@@ -473,7 +585,8 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
log.info("## Aborting - collection {} no longer present.", collection);
return true;
}
- State state = State.get(coll.getStr(REINDEXING_PROP, State.RUNNING.toLower()));
+ Map<String, Object> reindexingState = getReindexingState(ocmh.cloudManager.getDistribStateManager(), collection);
+ State state = State.get(reindexingState.getOrDefault(STATE, State.RUNNING.toLower()));
if (state != State.ABORTED) {
return false;
}
@@ -528,7 +641,7 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
// XXX currently this is complicated due to a bug in the way the daemon 'list'
// XXX operation is implemented - see SOLR-13245. We need to query the actual
// XXX SolrCore where the daemon is running
- private void waitForDaemon(String daemonName, String daemonUrl, String collection) throws Exception {
+ private void waitForDaemon(String daemonName, String daemonUrl, String sourceCollection, String targetCollection, Map<String, Object> reindexingState) throws Exception {
HttpClient client = ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
try (HttpSolrClient solrClient = new HttpSolrClient.Builder()
.withHttpClient(client)
@@ -539,8 +652,10 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
q.set(CommonParams.DISTRIB, false);
QueryRequest req = new QueryRequest(q);
boolean isRunning;
+ int statusCheck = 0;
do {
isRunning = false;
+ statusCheck++;
try {
NamedList<Object> rsp = solrClient.request(req);
Map<String, Object> rs = (Map<String, Object>)rsp.get("result-set");
@@ -568,8 +683,12 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception waiting for daemon " +
daemonName + " at " + daemonUrl, e);
}
+ if (statusCheck % 5 == 0) {
+ reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
+ setReindexingState(sourceCollection, State.RUNNING, reindexingState);
+ }
ocmh.cloudManager.getTimeSource().sleep(2000);
- } while (isRunning && !maybeAbort(collection));
+ } while (isRunning && !maybeAbort(sourceCollection));
}
}
@@ -698,9 +817,8 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
- // remove the rx flag, we already aborted
- REINDEXING_PROP, null,
ZkStateReader.READ_ONLY, null);
ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ removeReindexingState(collection);
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 106eece..0772def 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -545,7 +545,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
REINDEXCOLLECTION_OP(REINDEXCOLLECTION, (req, rsp, h) -> {
Map<String, Object> m = copy(req.getParams().required(), null, NAME);
copy(req.getParams(), m,
- ReindexCollectionCmd.ABORT,
+ ReindexCollectionCmd.COMMAND,
ReindexCollectionCmd.REMOVE_SOURCE,
ReindexCollectionCmd.TARGET,
ZkStateReader.CONFIGNAME_PROP,
@@ -560,9 +560,13 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
CREATE_NODE_SET_SHUFFLE,
AUTO_ADD_REPLICAS,
"shards",
+ STATE_FORMAT,
CommonParams.ROWS,
CommonParams.Q,
CommonParams.FL);
+ if (req.getParams().get("collection." + ZkStateReader.CONFIGNAME_PROP) != null) {
+ m.put(ZkStateReader.CONFIGNAME_PROP, req.getParams().get("collection." + ZkStateReader.CONFIGNAME_PROP));
+ }
copyPropertiesWithPrefix(req.getParams(), m, "router.");
return m;
}),
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
index 218476c..a9f9a22 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
@@ -20,19 +20,20 @@ package org.apache.solr.cloud;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -67,15 +68,41 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
private CloudSolrClient solrClient;
private SolrCloudManager cloudManager;
+ private DistribStateManager stateManager;
@Before
public void doBefore() throws Exception {
ZkController zkController = cluster.getJettySolrRunner(0).getCoreContainer().getZkController();
cloudManager = zkController.getSolrCloudManager();
+ stateManager = cloudManager.getDistribStateManager();
solrClient = new CloudSolrClientBuilder(Collections.singletonList(zkController.getZkServerAddress()),
Optional.empty()).build();
}
+ private ReindexCollectionCmd.State getState(String collection) {
+ try {
+ return ReindexCollectionCmd.State.get(ReindexCollectionCmd
+ .getReindexingState(stateManager, collection)
+ .get(ReindexCollectionCmd.STATE));
+ } catch (Exception e) {
+ fail("Unexpected exception checking state of " + collection + ": " + e);
+ return null;
+ }
+ }
+
+ private void waitForState(String collection, ReindexCollectionCmd.State expected) throws Exception {
+ TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
+ ReindexCollectionCmd.State current = null;
+ while (!timeOut.hasTimedOut()) {
+ current = getState(collection);
+ if (expected == current) {
+ return;
+ }
+ timeOut.sleep(500);
+ }
+ throw new Exception("timeout waiting for state, current=" + current + ", expected=" + expected);
+ }
+
@After
public void doAfter() throws Exception {
cluster.deleteAllCollections(); // deletes aliases too
@@ -100,15 +127,19 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
.setTarget(targetCollection);
- req.process(solrClient);
+ CollectionAdminResponse rsp = req.process(solrClient);
+ assertNotNull(rsp.toString(), rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS));
+ Map<String, Object> status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+ assertEquals(status.toString(), (long)NUM_DOCS, ((Number)status.get("inputDocs")).longValue());
+ assertEquals(status.toString(), (long)NUM_DOCS, ((Number)status.get("processedDocs")).longValue());
CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
- ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
+ ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
return ReindexCollectionCmd.State.FINISHED == state;
});
// verify the target docs exist
- QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
- assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound());
+ QueryResponse queryResponse = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
+ assertEquals("copied num docs", NUM_DOCS, queryResponse.getResults().getNumFound());
}
public void testSameTargetReindexing() throws Exception {
@@ -141,7 +172,7 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
assertNotNull("target collection not present after 30s", realTargetCollection);
CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", realTargetCollection, (liveNodes, coll) -> {
- ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
+ ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
return ReindexCollectionCmd.State.FINISHED == state;
});
// verify the target docs exist
@@ -169,7 +200,7 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
req.process(solrClient);
CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
- ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
+ ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
return ReindexCollectionCmd.State.FINISHED == state;
});
// verify the target docs exist
@@ -204,7 +235,7 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
req.process(solrClient);
CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
- ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_PROP));
+ ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
return ReindexCollectionCmd.State.FINISHED == state;
});
// verify the target docs exist
@@ -248,24 +279,15 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
.setTarget(targetCollection);
- try {
- req.process(solrClient);
- fail("succeeded but expected reindexing to fail due to the target collection already present");
- } catch (Exception e) {
- assertTrue(e instanceof BaseHttpSolrClient.RemoteSolrException);
- BaseHttpSolrClient.RemoteSolrException rse = (BaseHttpSolrClient.RemoteSolrException)e;
- assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, rse.code());
- }
+ CollectionAdminResponse rsp = req.process(solrClient);
+ assertNotNull(rsp.getResponse().get("error"));
+ assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("already exists"));
+
req = CollectionAdminRequest.reindexCollection(sourceCollection)
.setTarget(aliasTarget);
- try {
- req.process(solrClient);
- fail("succeeded but expected reindexing to fail due to the target collection already present");
- } catch (Exception e) {
- assertTrue(e instanceof BaseHttpSolrClient.RemoteSolrException);
- BaseHttpSolrClient.RemoteSolrException rse = (BaseHttpSolrClient.RemoteSolrException)e;
- assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, rse.code());
- }
+ rsp = req.process(solrClient);
+ assertNotNull(rsp.getResponse().get("error"));
+ assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("already exists"));
CollectionAdminRequest.deleteAlias(aliasTarget).process(solrClient);
CollectionAdminRequest.deleteCollection(targetCollection).process(solrClient);
@@ -274,14 +296,10 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
.setTarget(targetCollection);
TestInjection.reindexFailure = "true:100";
- try {
- req.process(solrClient);
- fail("succeeded but expected reindexing to fail due to a test-injected failure");
- } catch (Exception e) {
- assertTrue(e instanceof BaseHttpSolrClient.RemoteSolrException);
- BaseHttpSolrClient.RemoteSolrException rse = (BaseHttpSolrClient.RemoteSolrException)e;
- assertEquals(SolrException.ErrorCode.SERVER_ERROR.code, rse.code());
- }
+ rsp = req.process(solrClient);
+ assertNotNull(rsp.getResponse().get("error"));
+ assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("waiting for daemon"));
+
// verify that the target and checkpoint collections don't exist
cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> {
assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.TARGET_COL_PREFIX));
@@ -291,7 +309,8 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
CloudTestUtils.waitForState(cloudManager, "collection state is incorrect", sourceCollection,
((liveNodes, collectionState) ->
!collectionState.isReadOnly() &&
- collectionState.getStr(ReindexCollectionCmd.REINDEXING_PROP) == null));
+ collectionState.getStr(ReindexCollectionCmd.REINDEXING_STATE) == null &&
+ getState(sourceCollection) == null));
}
@Test
@@ -309,22 +328,32 @@ public class ReindexCollectionTest extends SolrCloudTestCase {
sourceCollection, (liveNodes, coll) -> coll.isReadOnly());
req = CollectionAdminRequest.reindexCollection(sourceCollection);
- req.setAbort(true);
- req.process(solrClient);
+ req.setCommand("abort");
+ CollectionAdminResponse rsp = req.process(solrClient);
+ Map<String, Object> status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+ assertNotNull(rsp.toString(), status);
+ assertEquals(status.toString(), "aborting", status.get("state"));
+
CloudTestUtils.waitForState(cloudManager, "incorrect collection state", sourceCollection,
((liveNodes, collectionState) ->
collectionState.isReadOnly() &&
- ReindexCollectionCmd.State.ABORTED.toLower().equals(collectionState.getStr(ReindexCollectionCmd.REINDEXING_PROP))));
+ getState(sourceCollection) == ReindexCollectionCmd.State.ABORTED));
+
+ // verify status
+ req.setCommand("status");
+ rsp = req.process(solrClient);
+ status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+ assertNotNull(rsp.toString(), status);
+ assertEquals(status.toString(), "aborted", status.get("state"));
// let the process continue
TestInjection.reindexLatch.countDown();
CloudTestUtils.waitForState(cloudManager, "source collection is in wrong state",
- sourceCollection, (liveNodes, docCollection) -> {
- System.err.println("-- coll " + docCollection);
- return !docCollection.isReadOnly() && docCollection.getStr(ReindexCollectionCmd.REINDEXING_PROP) == null;
- });
+ sourceCollection, (liveNodes, docCollection) -> !docCollection.isReadOnly() && getState(sourceCollection) == null);
// verify the response
- CollectionAdminRequest.RequestStatusResponse rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient);
- rsp.getRequestStatus();
+ rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient);
+ status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+ assertNotNull(rsp.toString(), status);
+ assertEquals(status.toString(), "aborted", status.get("state"));
}
private void createCollection(String name, String config, int numShards, int numReplicas) throws Exception {
diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc
index a66ddfa..48bef7e 100644
--- a/solr/solr-ref-guide/src/collections-api.adoc
+++ b/solr/solr-ref-guide/src/collections-api.adoc
@@ -198,6 +198,7 @@ The attributes that can be modified are:
See the <<create,CREATE action>> section above for details on these attributes.
+[[readonlymode]]
==== Read-only mode
Setting the `readOnly` attribute to `true` puts the collection in read-only mode,
in which any index update requests are rejected. Other collection-level actions (eg. adding /
@@ -218,6 +219,125 @@ NOTE: This may potentially take a long time if there are still major segment mer
Removing the `readOnly` property or setting it to false enables the
processing of updates and reloads the collection.
+[[reindexcollection]]
+== REINDEXCOLLECTION: Re-index a Collection
+
+`/admin/collections?action=REINDEXCOLLECTION&name=_name_`
+
+The REINDEXCOLLECTION command re-indexes a collection using existing data from the
+source collection.
+
+NOTE: Re-indexing is potentially a lossy operation - some of the existing indexed data that is not
+available as stored fields may be lost, so users should use this command
+with caution, evaluating the potential impact by using different source and target
+collection names first, and preserving the source collection until the evaluation is
+complete.
+
+The target collection must not exist (and may not be an alias). If the target
+collection name is the same as the source collection then first a unique sequential name
+will be generated for the target collection, and then after re-indexing is done an alias
+will be created that points from the source name to the actual sequentially-named target collection.
+
+When re-indexing is started the source collection is put in <<readonlymode,read-only mode>> to ensure that
+all source documents are properly processed.
+
+Using optional parameters a different index schema, collection shape (number of shards and replicas)
+or routing parameters can be requested for the target collection.
+
+Re-indexing is executed as a streaming expression daemon, which runs on one of the
+source collection's replicas. It is usually a time-consuming operation so it's recommended to execute
+it as an asynchronous request in order to avoid request timeouts. Only one re-indexing operation may
+execute at a time for a given source collection. Long-running, erroneous or crashed re-indexing
+operations may be terminated by using the `abort` command, which also removes partial results.
+
+=== REINDEXCOLLECTION Parameters
+
+`name`::
+Source collection name, may be an alias. This parameter is required.
+
+`cmd`::
+Optional command. Default command is `start`. Currently supported commands are:
+* `start` - default, starts processing if not already running,
+* `abort` - aborts an already running re-indexing (or clears a left-over status after a crash),
+and deletes partial results,
+* `status` - returns detailed status of a running re-indexing command.
+
+`target`::
+Target collection name, optional. If not specified a unique name will be generated and
+after all documents have been copied an alias will be created that points from the source
+collection name to the unique sequentially-named collection, effectively "hiding"
+the original source collection from regular update and search operations.
+
+`q`::
+Optional query to select documents for re-indexing. Default value is `\*:*`.
+
+`fl`::
+Optional list of fields to re-index. Default value is `*`.
+
+`rows`::
+Documents are transferred in batches. Depending on the average size of the document large
+batch sizes may cause memory issues. Default value is 100.
+
+`configName`::
+`collection.configName`::
+Optional name of the configset for the target collection. Default is the same as the
+source collection.
+
+There are a number of optional parameters that determine the target collection layout. If they
+are not specified in the request then their values are copied from the source collection.
+The following parameters are currently supported (described in detail in the <<create,CREATE collection>> section):
+`numShards`, `replicationFactor`, `nrtReplicas`, `tlogReplicas`, `pullReplicas`, `maxShardsPerNode`,
+`autoAddReplicas`, `shards`, `policy`, `createNodeSet`, `createNodeSet.shuffle`, `router.*`.
+
+`removeSource`::
+Optional boolean. If true then after the processing is successfully finished the source collection will
+be deleted.
+
+`async`::
+Optional request ID to track this action which will be <<Asynchronous Calls,processed asynchronously>>.
+
+When the re-indexing process has completed the target collection is marked using
+`property.rx: "finished"`, and the source collection state is updated to become read-write.
+On any errors the command will delete any temporary and target collections and also reset the
+state of the source collection's read-only flag.
+
+=== Examples using REINDEXCOLLECTION
+
+*Input*
+
+[source,text]
+----
+http://localhost:8983/solr/admin/collections?action=REINDEXCOLLECTION&name=newCollection&numShards=3&configName=conf2&q=id:aa*&fl=id,string_s
+----
+This request specifies a different schema for the target collection, copies only some of the fields, selects only the documents
+matching a query, and also potentially re-shapes the collection by explicitly specifying 3 shards. Since the target collection
+hasn't been specified in the parameters, a collection with a unique name, e.g. `.rx_newCollection_2`, will be created and on success
+an alias pointing from `newCollection -> .rx_newCollection_2` will be created, effectively replacing the source collection
+for the purpose of indexing and searching. The source collection is assumed to be small so a synchronous request was made.
+
+*Output*
+
+[source,json]
+----
+{
+ "responseHeader":{
+ "status":0,
+ "QTime":10757},
+ "reindexStatus":{
+ "phase":"done",
+ "inputDocs":13416,
+ "processedDocs":376,
+ "actualSourceCollection":".rx_newCollection_1",
+ "state":"finished",
+ "actualTargetCollection":".rx_newCollection_2",
+ "checkpointCollection":".rx_ck_newCollection"
+ }
+}
+----
+As a result a new collection `.rx_newCollection_2` has been created, with selected documents re-indexed to 3 shards, and
+with an alias pointing from `newCollection` to this one. The status also shows that the source collection
+was already an alias to `.rx_newCollection_1`, which was likely a result of a previous re-indexing.
+
[[reload]]
== RELOAD: Reload a Collection
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 9d02ec2..ec56bfe 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -51,7 +52,7 @@ public class DaemonStream extends TupleStream implements Expressible {
private ArrayBlockingQueue<Tuple> queue;
private int queueSize;
private boolean eatTuples;
- private long iterations;
+ private AtomicLong iterations = new AtomicLong();
private long startTime;
private long stopTime;
private Exception exception;
@@ -240,7 +241,7 @@ public class DaemonStream extends TupleStream implements Expressible {
tuple.put(ID, id);
tuple.put("startTime", startTime);
tuple.put("stopTime", stopTime);
- tuple.put("iterations", iterations);
+ tuple.put("iterations", iterations.get());
tuple.put("state", streamRunner.getState().toString());
if(exception != null) {
tuple.put("exception", exception.getMessage());
@@ -253,10 +254,6 @@ public class DaemonStream extends TupleStream implements Expressible {
this.daemons = daemons;
}
- private synchronized void incrementIterations() {
- ++iterations;
- }
-
private synchronized void setStartTime(long startTime) {
this.startTime = startTime;
}
@@ -332,7 +329,7 @@ public class DaemonStream extends TupleStream implements Expressible {
log.error("Error in DaemonStream:" + id, e);
++errors;
if (errors > 100) {
- log.error("Too many consectutive errors. Stopping DaemonStream:" + id);
+ log.error("Too many consecutive errors. Stopping DaemonStream:" + id);
break OUTER;
}
} catch (Throwable t) {
@@ -351,7 +348,7 @@ public class DaemonStream extends TupleStream implements Expressible {
}
}
}
- incrementIterations();
+ iterations.incrementAndGet();
if (sleepMillis > 0) {
try {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index d0ae477..799f7cf 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -796,7 +796,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
String fields;
String configName;
Boolean removeSource;
- Boolean abort;
+ String cmd;
Integer batchSize;
Map<String, Object> collectionParams = new HashMap<>();
@@ -810,9 +810,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
return this;
}
- /** Set to true to abort already running requests. */
- public ReindexCollection setAbort(boolean abort) {
- this.abort = abort;
+ /** Set optional command (eg. abort, status). */
+ public ReindexCollection setCommand(String command) {
+ this.cmd = command;
return this;
}
@@ -856,7 +856,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
public SolrParams getParams() {
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
params.setNonNull("target", target);
- params.setNonNull("abort", abort);
+ params.setNonNull("cmd", cmd);
params.setNonNull(ZkStateReader.CONFIGNAME_PROP, configName);
params.setNonNull(CommonParams.Q, query);
params.setNonNull(CommonParams.FL, fields);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
index d079052..50e7c0c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
@@ -40,6 +40,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -571,7 +572,7 @@ public class Utils {
VersionedData data = null;
try {
data = distribStateManager.getData(path);
- } catch (KeeperException.NoNodeException e) {
+ } catch (KeeperException.NoNodeException | NoSuchElementException e) {
return Collections.emptyMap();
}
if (data == null || data.getData() == null || data.getData().length == 0) return Collections.emptyMap();