You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2019/03/19 12:47:18 UTC
[lucene-solr] branch master updated: SOLR-11127: REINDEXCOLLECTION
command for re-indexing of existing collections.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new 6f2b7bf SOLR-11127: REINDEXCOLLECTION command for re-indexing of existing collections.
6f2b7bf is described below
commit 6f2b7bf5c0144f19572b54eed4fc340c13cf8c2a
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue Mar 19 13:41:11 2019 +0100
SOLR-11127: REINDEXCOLLECTION command for re-indexing of existing collections.
---
solr/CHANGES.txt | 4 +
.../src/java/org/apache/solr/cloud/Overseer.java | 136 +++-
.../cloud/api/collections/DeleteCollectionCmd.java | 2 +-
.../OverseerCollectionMessageHandler.java | 1 +
.../api/collections/ReindexCollectionCmd.java | 824 +++++++++++++++++++++
.../org/apache/solr/handler/StreamHandler.java | 4 +-
.../org/apache/solr/handler/admin/ColStatus.java | 3 +
.../solr/handler/admin/CollectionsHandler.java | 31 +
.../solr/schema/ManagedIndexSchemaFactory.java | 8 +-
.../java/org/apache/solr/util/TestInjection.java | 35 +
.../apache/solr/cloud/ReindexCollectionTest.java | 379 ++++++++++
.../solr/cloud/SystemCollectionCompatTest.java | 208 ++++++
solr/solr-ref-guide/src/collections-api.adoc | 120 +++
.../solr/client/solrj/io/stream/DaemonStream.java | 13 +-
.../solrj/request/CollectionAdminRequest.java | 92 ++-
.../solr/common/cloud/CompositeIdRouter.java | 5 +
.../org/apache/solr/common/cloud/DocRouter.java | 1 +
.../solr/common/cloud/ImplicitDocRouter.java | 5 +
.../apache/solr/common/cloud/PlainIdRouter.java | 5 +
.../solr/common/params/CollectionParams.java | 4 +-
.../java/org/apache/solr/common/util/Utils.java | 3 +-
21 files changed, 1865 insertions(+), 18 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 1fca619..f90bc3b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -86,6 +86,10 @@ New Features
* SOLR-13292: Provide extended per-segment status of a collection. (ab)
+* SOLR-11127: REINDEXCOLLECTION command for re-indexing of existing collections. This issue also adds
+ a back-compat check of the .system collection to notify users of potential compatibility issues after
+ upgrades or schema changes. (ab)
+
Bug Fixes
----------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 91b7e74..a89926f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -21,15 +21,23 @@ import static org.apache.solr.common.params.CommonParams.ID;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.function.BiConsumer;
+import org.apache.lucene.util.Version;
+import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
@@ -45,11 +53,15 @@ import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ConnectionManager;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
@@ -524,6 +536,8 @@ public class Overseer implements SolrCloseable {
private Stats stats;
private String id;
private volatile boolean closed;
+ private volatile boolean systemCollCompatCheck = true;
+
private CloudConfig config;
// overseer not responsible for closing reader
@@ -570,10 +584,130 @@ public class Overseer implements SolrCloseable {
updaterThread.start();
ccThread.start();
triggerThread.start();
-
+
+ systemCollectionCompatCheck(new BiConsumer<String, Object>() {
+ boolean firstPair = true;
+ @Override
+ public void accept(String s, Object o) {
+ if (firstPair) {
+ log.warn("WARNING: Collection '.system' may need re-indexing due to compatibility issues listed below. See REINDEXCOLLECTION documentation for more details.");
+ firstPair = false;
+ }
+ log.warn("WARNING: *\t{}:\t{}", s, o);
+ }
+ });
+
assert ObjectReleaseTracker.track(this);
}
+  /**
+   * Triggers the back-compat check of the {@code .system} collection, reporting findings to the consumer.
+   * <p>Nothing happens when the cluster state is unavailable (a warning is logged) or when the collection
+   * does not exist. The check itself is only run once every active shard has a leader that is active on
+   * a live node: if that is already the case it runs right away, otherwise a collection state watcher is
+   * registered and the check is run from the watcher once the condition is met.</p>
+   *
+   * @param consumer receives (name, value) pairs describing each detected compatibility issue
+   */
+  public void systemCollectionCompatCheck(final BiConsumer<String, Object> consumer) {
+    final ClusterState currentState = zkController.getClusterState();
+    if (currentState == null) {
+      log.warn("Unable to check back-compat of .system collection - can't obtain ClusterState.");
+      return;
+    }
+    final DocCollection systemColl = currentState.getCollectionOrNull(CollectionAdminParams.SYSTEM_COLL);
+    if (systemColl == null) {
+      return;
+    }
+    // check that all shard leaders are active
+    final boolean leadersReady = systemColl.getActiveSlices().stream()
+        .allMatch(slice -> slice.getLeader() != null && slice.getLeader().isActive(currentState.getLiveNodes()));
+    if (leadersReady) {
+      doCompatCheck(consumer);
+      return;
+    }
+    // wait for all leaders to become active and then check
+    zkController.zkStateReader.registerCollectionStateWatcher(CollectionAdminParams.SYSTEM_COLL, (liveNodes, state) -> {
+      // NOTE(review): returning true presumably unregisters the watcher - confirm against ZkStateReader
+      if (state == null || liveNodes.isEmpty()) {
+        return true;
+      }
+      final boolean ready = state.getActiveSlices().stream()
+          .allMatch(slice -> slice.getLeader() != null && slice.getLeader().isActive(liveNodes));
+      if (ready) {
+        doCompatCheck(consumer);
+      }
+      return ready;
+    });
+  }
+
+  /**
+   * Requests the extended status (segments and field info) of the {@code .system} collection and reports
+   * potential compatibility issues to the consumer. Guarded by the {@code systemCollCompatCheck} flag, so
+   * only the first invocation does any work.
+   * <p>The following pairs may be reported:</p>
+   * <ul>
+   *   <li>{@code indexFieldsNotMatchingSchema} - the "schemaNonCompliant" list from the status response,
+   *   unless it contains "(NONE)"</li>
+   *   <li>{@code differentSegmentVersions} / {@code currentLuceneVersion} - when any version found in the
+   *   shard leaders' segment infos differs from {@code Version.LATEST}</li>
+   *   <li>{@code differentMajorSegmentVersions} / {@code currentLuceneMajorVersion} - when any segment was
+   *   created by a different major version</li>
+   * </ul>
+   * <p>Sections missing from the response are skipped instead of failing with a NullPointerException.
+   * {@link SolrServerException} and {@link IOException} are logged and swallowed.</p>
+   *
+   * @param consumer receives (name, value) pairs describing each detected issue
+   */
+  @SuppressWarnings("unchecked")
+  private void doCompatCheck(BiConsumer<String, Object> consumer) {
+    if (!systemCollCompatCheck) {
+      return;
+    }
+    systemCollCompatCheck = false;
+    try (CloudSolrClient client = new CloudSolrClient.Builder(Collections.singletonList(getZkController().getZkServerAddress()), Optional.empty())
+        .withSocketTimeout(30000).withConnectionTimeout(15000)
+        .withHttpClient(updateShardHandler.getDefaultHttpClient()).build()) {
+      CollectionAdminRequest.ColStatus req = CollectionAdminRequest.collectionStatus(CollectionAdminParams.SYSTEM_COLL)
+          .setWithSegments(true)
+          .setWithFieldInfo(true);
+      CollectionAdminResponse rsp = req.process(client);
+      NamedList<Object> status = (NamedList<Object>)rsp.getResponse().get(CollectionAdminParams.SYSTEM_COLL);
+      if (status == null) {
+        log.warn("Unable to perform back-compat check of .system collection - no status in the response.");
+        return;
+      }
+      Collection<String> nonCompliant = (Collection<String>)status.get("schemaNonCompliant");
+      if (nonCompliant != null && !nonCompliant.contains("(NONE)")) {
+        consumer.accept("indexFieldsNotMatchingSchema", nonCompliant);
+      }
+      // both sets are seeded with the current version, so size() > 1 means "something differs"
+      Set<Integer> segmentCreatedMajorVersions = new HashSet<>();
+      Set<String> segmentVersions = new HashSet<>();
+      int currentMajorVersion = Version.LATEST.major;
+      String currentVersion = Version.LATEST.toString();
+      segmentVersions.add(currentVersion);
+      segmentCreatedMajorVersions.add(currentMajorVersion);
+      NamedList<Object> shards = (NamedList<Object>)status.get("shards");
+      if (shards != null) {
+        for (Map.Entry<String, Object> entry : shards) {
+          NamedList<Object> shard = (NamedList<Object>)entry.getValue();
+          NamedList<Object> leader = shard == null ? null : (NamedList<Object>)shard.get("leader");
+          if (leader == null) {
+            continue;
+          }
+          NamedList<Object> segInfos = (NamedList<Object>)leader.get("segInfos");
+          if (segInfos == null) {
+            continue;
+          }
+          NamedList<Object> infos = (NamedList<Object>)segInfos.get("info");
+          if (infos != null) {
+            Object numSegments = infos.get("numSegments");
+            Object minSegmentVersion = infos.get("minSegmentLuceneVersion");
+            if (numSegments instanceof Number && ((Number)numSegments).intValue() > 0 && minSegmentVersion != null) {
+              segmentVersions.add(minSegmentVersion.toString());
+            }
+            Object commitVersion = infos.get("commitLuceneVersion");
+            if (commitVersion != null) {
+              segmentVersions.add(commitVersion.toString());
+            }
+          }
+          NamedList<Object> segmentInfos = (NamedList<Object>)segInfos.get("segments");
+          if (segmentInfos == null) {
+            continue;
+          }
+          segmentInfos.forEach((k, v) -> {
+            NamedList<Object> segment = (NamedList<Object>)v;
+            if (segment == null) {
+              return;
+            }
+            Object version = segment.get("version");
+            if (version != null) {
+              segmentVersions.add(version.toString());
+            }
+            // record the segment's minimum version itself (previously "version" was added a second time)
+            Object minVersion = segment.get("minVersion");
+            if (minVersion != null) {
+              segmentVersions.add(minVersion.toString());
+            }
+            Object createdMajor = segment.get("createdVersionMajor");
+            if (createdMajor != null) {
+              segmentCreatedMajorVersions.add(((Number)createdMajor).intValue());
+            }
+          });
+        }
+      }
+      if (segmentVersions.size() > 1) {
+        consumer.accept("differentSegmentVersions", segmentVersions);
+        consumer.accept("currentLuceneVersion", currentVersion);
+      }
+      if (segmentCreatedMajorVersions.size() > 1) {
+        consumer.accept("differentMajorSegmentVersions", segmentCreatedMajorVersions);
+        consumer.accept("currentLuceneMajorVersion", currentMajorVersion);
+      }
+
+    } catch (SolrServerException | IOException e) {
+      log.warn("Unable to perform back-compat check of .system collection", e);
+    }
+  }
+
public Stats getStats() {
return stats;
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index e5f6f2d..7177f03 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -181,7 +181,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
}
- private String referencedByAlias(String collection, Aliases aliases) {
+ public static String referencedByAlias(String collection, Aliases aliases) {
Objects.requireNonNull(aliases);
return aliases.getCollectionAliasListMap().entrySet().stream()
.filter(e -> e.getValue().contains(collection))
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index de7b3eb..a1bd826 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -241,6 +241,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
.put(DELETEREPLICA, new DeleteReplicaCmd(this))
.put(ADDREPLICA, new AddReplicaCmd(this))
.put(MOVEREPLICA, new MoveReplicaCmd(this))
+ .put(REINDEXCOLLECTION, new ReindexCollectionCmd(this))
.put(UTILIZENODE, new UtilizeNodeCmd(this))
.build()
;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
new file mode 100644
index 0000000..553c4bf
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -0,0 +1,824 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reindex a collection, usually in order to change the index schema.
+ * <p>WARNING: Reindexing is potentially a lossy operation - some indexed data that is not available as
+ * stored fields may be irretrievably lost, so users should use this command with caution, evaluating
+ * the potential impact by using different source and target collection names first, and preserving
+ * the source collection until the evaluation is complete.</p>
+ * <p>Reindexing follows these steps:</p>
+ * <ol>
+ * <li>creates a temporary collection using the most recent schema of the source collection
+ * (or the one specified in the parameters, which must already exist), and the shape of the original
+ * collection, unless overridden by parameters.</li>
+ * <li>copy the source documents to the temporary collection, using their stored fields and
+ * reindexing them using the specified schema. NOTE: some data
+ * loss may occur if the original stored field data is not available!</li>
+ * <li>create the target collection from scratch with the specified name (or the same as source if not
+ * specified) and the specified parameters. NOTE: if the target name was not specified or is the same
+ * as the source collection then a unique sequential collection name will be used.</li>
+ * <li>copy the documents from the source collection to the target collection.</li>
+ * <li>if the source and target collection names were the same, set up an alias pointing from the source collection name to the actual
+ * (sequentially named) target collection.</li>
+ * <li>optionally delete the source collection.</li>
+ * </ol>
+ */
+public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final String COMMAND = "cmd";
+ public static final String REINDEX_STATUS = "reindexStatus";
+ public static final String REMOVE_SOURCE = "removeSource";
+ public static final String TARGET = "target";
+ public static final String TARGET_COL_PREFIX = ".rx_";
+ public static final String CHK_COL_PREFIX = ".rx_ck_";
+ public static final String REINDEXING_STATE = CollectionAdminRequest.PROPERTY_PREFIX + "rx";
+
+ public static final String STATE = "state";
+ public static final String PHASE = "phase";
+
+ private static final List<String> COLLECTION_PARAMS = Arrays.asList(
+ ZkStateReader.CONFIGNAME_PROP,
+ ZkStateReader.NUM_SHARDS_PROP,
+ ZkStateReader.NRT_REPLICAS,
+ ZkStateReader.PULL_REPLICAS,
+ ZkStateReader.TLOG_REPLICAS,
+ ZkStateReader.REPLICATION_FACTOR,
+ ZkStateReader.MAX_SHARDS_PER_NODE,
+ "shards",
+ Policy.POLICY,
+ CollectionAdminParams.CREATE_NODE_SET_PARAM,
+ CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM,
+ ZkStateReader.AUTO_ADD_REPLICAS
+ );
+
+ private final OverseerCollectionMessageHandler ocmh;
+
+ private static AtomicInteger tmpCollectionSeq = new AtomicInteger();
+
+ public enum State {
+ IDLE,
+ RUNNING,
+ ABORTED,
+ FINISHED;
+
+ public String toLower() {
+ return toString().toLowerCase(Locale.ROOT);
+ }
+
+ public static State get(Object p) {
+ if (p == null) {
+ return null;
+ }
+ p = String.valueOf(p).toLowerCase(Locale.ROOT);
+ return states.get(p);
+ }
+ static Map<String, State> states = Collections.unmodifiableMap(
+ Stream.of(State.values()).collect(Collectors.toMap(State::toLower, Function.identity())));
+ }
+
+ public enum Cmd {
+ START,
+ ABORT,
+ STATUS;
+
+ public String toLower() {
+ return toString().toLowerCase(Locale.ROOT);
+ }
+
+ public static Cmd get(String p) {
+ if (p == null) {
+ return null;
+ }
+ p = p.toLowerCase(Locale.ROOT);
+ return cmds.get(p);
+ }
+ static Map<String, Cmd> cmds = Collections.unmodifiableMap(
+ Stream.of(Cmd.values()).collect(Collectors.toMap(Cmd::toLower, Function.identity())));
+ }
+
+ private SolrClientCache solrClientCache;
+ private String zkHost;
+
+ public ReindexCollectionCmd(OverseerCollectionMessageHandler ocmh) { // binds this command to the collection message handler it runs under
+ this.ocmh = ocmh; // call() reaches the ZK state reader, cloud manager, overseer and the other collection commands through this handler
+ }
+
+  /**
+   * Executes a reindexing request for the collection named by {@code name} in the message.
+   * <p>The {@code cmd} parameter selects the action (default: start):</p>
+   * <ul>
+   *   <li>{@code status} - adds the stored reindexing state to the results and returns</li>
+   *   <li>{@code abort} - if a reindexing is running, marks it as ABORTED and returns; the running
+   *   instance of the command is expected to notice this and clean up</li>
+   *   <li>{@code start} - creates the target and checkpoint collections, puts the source in read-only
+   *   mode, copies the documents using a streaming daemon, optionally sets up an alias and deletes the
+   *   source, and records the progress in the reindexing state</li>
+   * </ul>
+   * <p>Errors during a started reindexing are not re-thrown: they are logged, cleanup is performed and the
+   * error is reported under the "error" key of the results.</p>
+   *
+   * @param clusterState current cluster state; refreshed locally once the new collections are created
+   * @param message request parameters
+   * @param results receives the "reindexStatus" entry and, on failure, the "error" entry
+   * @throws SolrException with BAD_REQUEST when the source collection is missing, the command is unknown
+   *         or a reindexing of the collection is already running
+   */
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+
+    log.debug("*** called: {}", message);
+
+    String collection = message.getStr(CommonParams.NAME);
+    // before resolving aliases
+    String originalCollection = collection;
+    Aliases aliases = ocmh.zkStateReader.getAliases();
+    if (collection != null) {
+      // resolve aliases - the source may be an alias
+      List<String> aliasList = aliases.resolveAliases(collection);
+      if (aliasList != null && !aliasList.isEmpty()) {
+        collection = aliasList.get(0);
+      }
+    }
+
+    if (collection == null || !clusterState.hasCollection(collection)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source collection name must be specified and must exist");
+    }
+    String target = message.getStr(TARGET);
+    if (target == null) {
+      target = collection;
+    } else {
+      // resolve aliases
+      List<String> aliasList = aliases.resolveAliases(target);
+      if (aliasList != null && !aliasList.isEmpty()) {
+        target = aliasList.get(0);
+      }
+    }
+    boolean sameTarget = target.equals(collection) || target.equals(originalCollection);
+    boolean removeSource = message.getBool(REMOVE_SOURCE, false);
+    Cmd command = Cmd.get(message.getStr(COMMAND, Cmd.START.toLower()));
+    if (command == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown command: " + message.getStr(COMMAND));
+    }
+    Map<String, Object> reindexingState = getReindexingState(ocmh.cloudManager.getDistribStateManager(), collection);
+    if (!reindexingState.containsKey(STATE)) {
+      reindexingState.put(STATE, State.IDLE.toLower());
+    }
+    State state = State.get(reindexingState.get(STATE));
+    if (command == Cmd.ABORT) {
+      log.info("Abort requested for collection {}, setting the state to ABORTED.", collection);
+      // check that it's running
+      if (state != State.RUNNING) {
+        log.debug("Abort requested for collection {} but command is not running: {}", collection, state);
+        return;
+      }
+      setReindexingState(collection, State.ABORTED, null);
+      reindexingState.put(STATE, "aborting");
+      results.add(REINDEX_STATUS, reindexingState);
+      // if needed the cleanup will be performed by the running instance of the command
+      return;
+    } else if (command == Cmd.STATUS) {
+      results.add(REINDEX_STATUS, reindexingState);
+      return;
+    }
+    // command == Cmd.START
+
+    // check it's not already running
+    if (state == State.RUNNING) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Reindex is already running for collection " + collection +
+          ". If you are sure this is not the case you can issue &cmd=abort to clean up this state.");
+    }
+
+    DocCollection coll = clusterState.getCollection(collection);
+    boolean aborted = false;
+    int batchSize = message.getInt(CommonParams.ROWS, 100);
+    String query = message.getStr(CommonParams.Q, "*:*");
+    String fl = message.getStr(CommonParams.FL, "*");
+    Integer rf = message.getInt(ZkStateReader.REPLICATION_FACTOR, coll.getReplicationFactor());
+    Integer numNrt = message.getInt(ZkStateReader.NRT_REPLICAS, coll.getNumNrtReplicas());
+    Integer numTlog = message.getInt(ZkStateReader.TLOG_REPLICAS, coll.getNumTlogReplicas());
+    Integer numPull = message.getInt(ZkStateReader.PULL_REPLICAS, coll.getNumPullReplicas());
+    int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, coll.getActiveSlices().size());
+    int maxShardsPerNode = message.getInt(ZkStateReader.MAX_SHARDS_PER_NODE, coll.getMaxShardsPerNode());
+    DocRouter router = coll.getRouter();
+    if (router == null) {
+      router = DocRouter.DEFAULT;
+    }
+
+    String configName = message.getStr(ZkStateReader.CONFIGNAME_PROP, ocmh.zkStateReader.readConfigName(collection));
+    String targetCollection;
+    int seq = tmpCollectionSeq.getAndIncrement();
+    if (sameTarget) {
+      // pick the first sequential name that is not taken by an existing collection
+      targetCollection = TARGET_COL_PREFIX + originalCollection + "_" + seq;
+      while (clusterState.hasCollection(targetCollection)) {
+        seq = tmpCollectionSeq.getAndIncrement();
+        targetCollection = TARGET_COL_PREFIX + originalCollection + "_" + seq;
+      }
+    } else {
+      targetCollection = target;
+    }
+    String chkCollection = CHK_COL_PREFIX + originalCollection;
+    String daemonUrl = null;
+    Exception exc = null;
+    boolean createdTarget = false;
+    try {
+      solrClientCache = new SolrClientCache(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
+      zkHost = ocmh.zkStateReader.getZkClient().getZkServerAddress();
+      // set the running flag
+      reindexingState.clear();
+      reindexingState.put("actualSourceCollection", collection);
+      reindexingState.put("actualTargetCollection", targetCollection);
+      reindexingState.put("checkpointCollection", chkCollection);
+      reindexingState.put("inputDocs", getNumberOfDocs(collection));
+      reindexingState.put(PHASE, "creating target and checkpoint collections");
+      setReindexingState(collection, State.RUNNING, reindexingState);
+
+      // 0. set up target and checkpoint collections
+      NamedList<Object> cmdResults = new NamedList<>();
+      ZkNodeProps cmd;
+      if (clusterState.hasCollection(targetCollection)) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Target collection " + targetCollection + " already exists! Delete it first.");
+      }
+      if (clusterState.hasCollection(chkCollection)) {
+        // delete the checkpoint collection
+        cmd = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+            CommonParams.NAME, chkCollection,
+            CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+        );
+        ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+        checkResults("deleting old checkpoint collection " + chkCollection, cmdResults, true);
+      }
+
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+
+      Map<String, Object> propMap = new HashMap<>();
+      propMap.put(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower());
+      propMap.put(CommonParams.NAME, targetCollection);
+      propMap.put(ZkStateReader.NUM_SHARDS_PROP, numShards);
+      propMap.put(CollectionAdminParams.COLL_CONF, configName);
+      // init first from the same router
+      propMap.put("router.name", router.getName());
+      for (String key : coll.keySet()) {
+        if (key.startsWith("router.")) {
+          propMap.put(key, coll.get(key));
+        }
+      }
+      // then apply overrides if present
+      for (String key : message.keySet()) {
+        if (key.startsWith("router.")) {
+          propMap.put(key, message.getStr(key));
+        } else if (COLLECTION_PARAMS.contains(key)) {
+          propMap.put(key, message.get(key));
+        }
+      }
+
+      propMap.put(ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode);
+      propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
+      propMap.put(DocCollection.STATE_FORMAT, message.getInt(DocCollection.STATE_FORMAT, coll.getStateFormat()));
+      if (rf != null) {
+        propMap.put(ZkStateReader.REPLICATION_FACTOR, rf);
+      }
+      if (numNrt != null) {
+        propMap.put(ZkStateReader.NRT_REPLICAS, numNrt);
+      }
+      if (numTlog != null) {
+        propMap.put(ZkStateReader.TLOG_REPLICAS, numTlog);
+      }
+      if (numPull != null) {
+        propMap.put(ZkStateReader.PULL_REPLICAS, numPull);
+      }
+      // create the target collection
+      cmd = new ZkNodeProps(propMap);
+      cmdResults = new NamedList<>();
+      ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
+      createdTarget = true;
+      checkResults("creating target collection " + targetCollection, cmdResults, true);
+
+      // create the checkpoint collection - use RF=1 and 1 shard
+      cmd = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
+          CommonParams.NAME, chkCollection,
+          ZkStateReader.NUM_SHARDS_PROP, "1",
+          ZkStateReader.REPLICATION_FACTOR, "1",
+          DocCollection.STATE_FORMAT, "2",
+          CollectionAdminParams.COLL_CONF, "_default",
+          CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
+      );
+      cmdResults = new NamedList<>();
+      ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, cmd, cmdResults);
+      checkResults("creating checkpoint collection " + chkCollection, cmdResults, true);
+      // wait for a while until we see both collections
+      TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, ocmh.timeSource);
+      boolean created = false;
+      while (!waitUntil.hasTimedOut()) {
+        waitUntil.sleep(100);
+        // this also refreshes our local var clusterState
+        clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
+        created = clusterState.hasCollection(targetCollection) && clusterState.hasCollection(chkCollection);
+        if (created) break;
+      }
+      if (!created) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create temporary collection(s)");
+      }
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+
+      // 1. put the source collection in read-only mode
+      cmd = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+          ZkStateReader.COLLECTION_PROP, collection,
+          ZkStateReader.READ_ONLY, "true");
+      ocmh.overseer.offerStateUpdate(Utils.toJSON(cmd));
+
+      TestInjection.injectReindexLatch();
+
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+
+      // 2. copy the documents to target
+      // Recipe taken from: http://joelsolr.blogspot.com/2016/10/solr-63-batch-jobs-parallel-etl-and.html
+      ModifiableSolrParams q = new ModifiableSolrParams();
+      q.set(CommonParams.QT, "/stream");
+      q.set("collection", collection);
+      q.set("expr",
+          "daemon(id=\"" + targetCollection + "\"," +
+              "terminate=\"true\"," +
+              "commit(" + targetCollection + "," +
+              "update(" + targetCollection + "," +
+              "batchSize=" + batchSize + "," +
+              "topic(" + chkCollection + "," +
+              collection + "," +
+              "q=\"" + query + "\"," +
+              "fl=\"" + fl + "\"," +
+              "id=\"topic_" + targetCollection + "\"," +
+              // some of the documents eg. in .system contain large blobs
+              "rows=\"" + batchSize + "\"," +
+              "initialCheckpoint=\"0\"))))");
+      log.debug("- starting copying documents from {} to {}", collection, targetCollection);
+      SolrResponse rsp = null;
+      try {
+        rsp = ocmh.cloudManager.request(new QueryRequest(q));
+      } catch (Exception e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy documents from " +
+            collection + " to " + targetCollection, e);
+      }
+      daemonUrl = getDaemonUrl(rsp, coll);
+      if (daemonUrl == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to copy documents from " +
+            collection + " to " + targetCollection + ": " + Utils.toJSONString(rsp));
+      }
+      reindexingState.put("daemonUrl", daemonUrl);
+      reindexingState.put("daemonName", targetCollection);
+      reindexingState.put(PHASE, "copying documents");
+      setReindexingState(collection, State.RUNNING, reindexingState);
+
+      // wait for the daemon to finish
+      waitForDaemon(targetCollection, daemonUrl, collection, targetCollection, reindexingState);
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+      log.debug("- finished copying from {} to {}", collection, targetCollection);
+      // fail here or earlier during daemon run
+      TestInjection.injectReindexFailure();
+
+      // 3. if (sameTarget) set up an alias to use targetCollection as the source name
+      if (sameTarget) {
+        log.debug("- setting up alias from {} to {}", originalCollection, targetCollection);
+        cmd = new ZkNodeProps(
+            CommonParams.NAME, originalCollection,
+            "collections", targetCollection);
+        cmdResults = new NamedList<>();
+        // collect the outcome in cmdResults so that checkResults below actually inspects it
+        ocmh.commandMap.get(CollectionParams.CollectionAction.CREATEALIAS).call(clusterState, cmd, cmdResults);
+        checkResults("setting up alias " + originalCollection + " -> " + targetCollection, cmdResults, true);
+        reindexingState.put("alias", originalCollection + " -> " + targetCollection);
+      }
+
+      reindexingState.remove("daemonUrl");
+      reindexingState.remove("daemonName");
+      reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
+      reindexingState.put(PHASE, "copying done, finalizing");
+      setReindexingState(collection, State.RUNNING, reindexingState);
+
+      if (maybeAbort(collection)) {
+        aborted = true;
+        return;
+      }
+      // 4. delete the checkpoint collection
+      log.debug("- deleting {}", chkCollection);
+      cmd = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+          CommonParams.NAME, chkCollection,
+          CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+      );
+      cmdResults = new NamedList<>();
+      ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+      checkResults("deleting checkpoint collection " + chkCollection, cmdResults, true);
+
+      // 5. optionally delete the source collection
+      if (removeSource) {
+        log.debug("- deleting source collection");
+        cmd = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+            CommonParams.NAME, collection,
+            CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+        );
+        cmdResults = new NamedList<>();
+        ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+        checkResults("deleting source collection " + collection, cmdResults, true);
+      } else {
+        // 6. clear readOnly on source
+        ZkNodeProps props = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+            ZkStateReader.COLLECTION_PROP, collection,
+            ZkStateReader.READ_ONLY, null);
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+      }
+      // 7. set FINISHED state on the target and clear the state on the source
+      ZkNodeProps props = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+          ZkStateReader.COLLECTION_PROP, targetCollection,
+          REINDEXING_STATE, State.FINISHED.toLower());
+      ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+
+      reindexingState.put(STATE, State.FINISHED.toLower());
+      reindexingState.put(PHASE, "done");
+      removeReindexingState(collection);
+    } catch (Exception e) {
+      log.warn("Error during reindexing of {}", originalCollection, e);
+      exc = e;
+      aborted = true;
+    } finally {
+      // the cache is null if its construction failed on the first use of this command instance
+      if (solrClientCache != null) {
+        solrClientCache.close();
+      }
+      if (aborted) {
+        cleanup(collection, targetCollection, chkCollection, daemonUrl, targetCollection, createdTarget);
+        if (exc != null) {
+          results.add("error", exc.toString());
+        }
+        reindexingState.put(STATE, State.ABORTED.toLower());
+      }
+      results.add(REINDEX_STATUS, reindexingState);
+    }
+  }
+
+ private static final String REINDEXING_STATE_PATH = "/.reindexing";
+
+ private Map<String, Object> setReindexingState(String collection, State state, Map<String, Object> props) throws Exception {
+ String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
+ DistribStateManager stateManager = ocmh.cloudManager.getDistribStateManager();
+ Map<String, Object> copyProps = new HashMap<>();
+ if (props == null) { // retrieve existing props, if any
+ props = Utils.getJson(stateManager, path);
+ }
+ copyProps.putAll(props);
+ copyProps.put("state", state.toLower());
+ if (stateManager.hasData(path)) {
+ stateManager.setData(path, Utils.toJSON(copyProps), -1);
+ } else {
+ stateManager.makePath(path, Utils.toJSON(copyProps), CreateMode.PERSISTENT, false);
+ }
+ return copyProps;
+ }
+
+ private void removeReindexingState(String collection) throws Exception {
+ String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
+ DistribStateManager stateManager = ocmh.cloudManager.getDistribStateManager();
+ if (stateManager.hasData(path)) {
+ stateManager.removeData(path, -1);
+ }
+ }
+
+ /**
+  * Reads the stored reindexing state of a collection.
+  *
+  * @param stateManager state manager used to read the {@code /.reindexing} node
+  * @param collection collection name
+  * @return a new, modifiable sorted map holding the stored properties
+  */
+ @VisibleForTesting
+ public static Map<String, Object> getReindexingState(DistribStateManager stateManager, String collection) throws Exception {
+   final String statePath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
+   final Map<String, Object> stored = Utils.getJson(stateManager, statePath);
+   // copy so that the caller receives a map it can modify
+   return new TreeMap<>(stored);
+ }
+
+ /**
+  * Returns the number of documents matching {@code *:*} in a collection.
+  * This is a best-effort count used for progress reporting: any failure is
+  * logged and reported as 0 instead of being propagated.
+  *
+  * @param collection collection to query
+  * @return number of documents found, or 0 when the query failed
+  */
+ private long getNumberOfDocs(String collection) {
+   CloudSolrClient solrClient = solrClientCache.getCloudSolrClient(zkHost);
+   try {
+     ModifiableSolrParams params = new ModifiableSolrParams();
+     params.add(CommonParams.Q, "*:*");
+     params.add(CommonParams.ROWS, "0");
+     QueryResponse rsp = solrClient.query(collection, params);
+     return rsp.getResults().getNumFound();
+   } catch (Exception e) {
+     // deliberately not fatal, but leave a trace so that a reported 0 can be
+     // told apart from a genuinely empty collection
+     log.warn("Could not determine the number of documents in {}, assuming 0", collection, e);
+     return 0L;
+   }
+ }
+
+ private void checkResults(String label, NamedList<Object> results, boolean failureIsFatal) throws Exception {
+ Object failure = results.get("failure");
+ if (failure == null) {
+ failure = results.get("error");
+ }
+ if (failure != null) {
+ String msg = "Error: " + label + ": " + Utils.toJSONString(results);
+ if (failureIsFatal) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg);
+ } else {
+ log.error(msg);
+ }
+ }
+ }
+
+ private boolean maybeAbort(String collection) throws Exception {
+ DocCollection coll = ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollectionOrNull(collection);
+ if (coll == null) {
+ // collection no longer present - abort
+ log.info("## Aborting - collection {} no longer present.", collection);
+ return true;
+ }
+ Map<String, Object> reindexingState = getReindexingState(ocmh.cloudManager.getDistribStateManager(), collection);
+ State state = State.get(reindexingState.getOrDefault(STATE, State.RUNNING.toLower()));
+ if (state != State.ABORTED) {
+ return false;
+ }
+ log.info("## Aborting - collection {} state is {}", collection, state);
+ return true;
+ }
+
+ // XXX see #waitForDaemon() for why we need this
+ /**
+  * Works out the URL of the core on which a daemon runs, based on the
+  * response to the request that started it.
+  * <p>The first response document carrying a "DaemonOp" value is expected to
+  * consist of four whitespace-separated tokens, the last one being a core
+  * name containing both "shard" and "replica".</p>
+  *
+  * @param rsp response to inspect
+  * @param coll collection whose replicas are searched for the reported core name
+  * @return base URL plus core name of the matching replica, or null when the
+  *         response is missing, malformed or names an unknown core
+  */
+ private String getDaemonUrl(SolrResponse rsp, DocCollection coll) {
+   Map<String, Object> rs = (Map<String, Object>)rsp.getResponse().get("result-set");
+   if (rs == null || rs.isEmpty()) {
+     log.debug(" -- Missing daemon information in response: " + Utils.toJSONString(rsp));
+     // bail out here: a null result-set would otherwise cause an NPE below
+     return null;
+   }
+   List<Object> list = (List<Object>)rs.get("docs");
+   if (list == null) {
+     log.debug(" -- Missing daemon information in response: " + Utils.toJSONString(rsp));
+     return null;
+   }
+   String replicaName = null;
+   for (Object o : list) {
+     Map<String, Object> map = (Map<String, Object>)o;
+     String op = (String)map.get("DaemonOp");
+     if (op == null) {
+       continue;
+     }
+     String[] parts = op.split("\\s+");
+     if (parts.length != 4) {
+       log.debug(" -- Invalid daemon location info, expected 4 tokens: " + op);
+       return null;
+     }
+     // check if it's plausible
+     if (parts[3].contains("shard") && parts[3].contains("replica")) {
+       replicaName = parts[3];
+       break;
+     } else {
+       log.debug(" -- daemon location info likely invalid: " + op);
+       return null;
+     }
+   }
+   if (replicaName == null) {
+     return null;
+   }
+   // build a baseUrl of the replica
+   for (Replica r : coll.getReplicas()) {
+     if (replicaName.equals(r.getCoreName())) {
+       return r.getBaseUrl() + "/" + r.getCoreName();
+     }
+   }
+   return null;
+ }
+
+ // XXX currently this is complicated due to a bug in the way the daemon 'list'
+ // XXX operation is implemented - see SOLR-13245. We need to query the actual
+ // XXX SolrCore where the daemon is running
+ /**
+  * Blocks until the named daemon is no longer listed by the core at
+  * {@code daemonUrl}, or until {@link #maybeAbort(String)} reports an abort
+  * of the source collection.
+  * <p>The daemon list is polled with a 2000 ms sleep between checks; on every
+  * fifth check the "processedDocs" entry of the reindexing state is refreshed
+  * and the state is stored as RUNNING.</p>
+  *
+  * @param daemonName id of the daemon to wait for
+  * @param daemonUrl URL of the core that runs the daemon
+  * @param sourceCollection collection whose reindexing state is updated and checked for abort
+  * @param targetCollection collection whose document count is reported as progress
+  * @param reindexingState state map updated in place with the progress
+  * @throws SolrException when polling fails or the response is malformed
+  */
+ private void waitForDaemon(String daemonName, String daemonUrl, String sourceCollection, String targetCollection, Map<String, Object> reindexingState) throws Exception {
+   HttpClient client = ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
+   try (HttpSolrClient solrClient = new HttpSolrClient.Builder()
+       .withHttpClient(client)
+       .withBaseSolrUrl(daemonUrl).build()) {
+     ModifiableSolrParams q = new ModifiableSolrParams();
+     q.set(CommonParams.QT, "/stream");
+     q.set("action", "list");
+     q.set(CommonParams.DISTRIB, false);
+     QueryRequest req = new QueryRequest(q);
+     boolean isRunning;
+     int statusCheck = 0;
+     do {
+       isRunning = false;
+       statusCheck++;
+       try {
+         NamedList<Object> rsp = solrClient.request(req);
+         Map<String, Object> rs = (Map<String, Object>)rsp.get("result-set");
+         if (rs == null || rs.isEmpty()) {
+           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't find daemon list: missing result-set: " + Utils.toJSONString(rsp));
+         }
+         List<Object> list = (List<Object>)rs.get("docs");
+         if (list == null) {
+           // the result-set is present here, it is the list of docs that is missing
+           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't find daemon list: missing docs: " + Utils.toJSONString(rsp));
+         }
+         if (list.isEmpty()) { // finished?
+           break;
+         }
+         for (Object o : list) {
+           Map<String, Object> map = (Map<String, Object>)o;
+           String id = (String)map.get("id");
+           if (daemonName.equals(id)) {
+             isRunning = true;
+             // fail here
+             TestInjection.injectReindexFailure();
+             break;
+           }
+         }
+       } catch (Exception e) {
+         // everything, including the exceptions thrown above, is reported
+         // uniformly as a failure of waiting for this daemon
+         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception waiting for daemon " +
+             daemonName + " at " + daemonUrl, e);
+       }
+       if (statusCheck % 5 == 0) {
+         reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
+         setReindexingState(sourceCollection, State.RUNNING, reindexingState);
+       }
+       ocmh.cloudManager.getTimeSource().sleep(2000);
+     } while (isRunning && !maybeAbort(sourceCollection));
+   }
+ }
+
+ /**
+  * Stops the named daemon on the core at {@code daemonUrl}, waits up to 60
+  * seconds for it to actually stop and finally issues a 'kill' to remove its
+  * status.
+  * <p>Problems with the shape of the responses are logged as warnings and
+  * end the operation early; they are not thrown.</p>
+  *
+  * @param daemonName id of the daemon
+  * @param daemonUrl URL of the core that runs the daemon
+  */
+ private void killDaemon(String daemonName, String daemonUrl) throws Exception {
+   log.debug("-- killing daemon " + daemonName + " at " + daemonUrl);
+   HttpClient client = ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
+   try (HttpSolrClient solrClient = new HttpSolrClient.Builder()
+       .withHttpClient(client)
+       .withBaseSolrUrl(daemonUrl).build()) {
+     ModifiableSolrParams q = new ModifiableSolrParams();
+     q.set(CommonParams.QT, "/stream");
+     // we should really use 'kill' here, but then we will never
+     // know when the daemon actually finishes running - 'kill' only
+     // sets a flag that may be noticed much later
+     q.set("action", "stop");
+     q.set(CommonParams.ID, daemonName);
+     q.set(CommonParams.DISTRIB, false);
+     QueryRequest req = new QueryRequest(q);
+     NamedList<Object> rsp = solrClient.request(req);
+     // /result-set/docs/[0]/DaemonOp : Deamon:id killed on coreName
+     log.debug(" -- stop daemon response: " + Utils.toJSONString(rsp));
+     Map<String, Object> rs = (Map<String, Object>) rsp.get("result-set");
+     if (rs == null || rs.isEmpty()) {
+       log.warn("Problem killing daemon " + daemonName + ": missing result-set: " + Utils.toJSONString(rsp));
+       return;
+     }
+     List<Object> list = (List<Object>) rs.get("docs");
+     if (list == null) {
+       log.warn("Problem killing daemon " + daemonName + ": missing docs: " + Utils.toJSONString(rsp));
+       return;
+     }
+     if (list.isEmpty()) { // already finished?
+       return;
+     }
+     for (Object o : list) {
+       Map<String, Object> map = (Map<String, Object>) o;
+       String op = (String) map.get("DaemonOp");
+       if (op == null) {
+         continue;
+       }
+       if (op.contains(daemonName) && op.contains("stopped")) {
+         // now wait for the daemon to really stop
+         q.set("action", "list");
+         req = new QueryRequest(q);
+         TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, ocmh.timeSource);
+         while (!timeOut.hasTimedOut()) {
+           rsp = solrClient.request(req);
+           rs = (Map<String, Object>) rsp.get("result-set");
+           if (rs == null || rs.isEmpty()) {
+             log.warn("Problem killing daemon " + daemonName + ": missing result-set: " + Utils.toJSONString(rsp));
+             break;
+           }
+           List<Object> list2 = (List<Object>) rs.get("docs");
+           if (list2 == null) {
+             log.warn("Problem killing daemon " + daemonName + ": missing docs: " + Utils.toJSONString(rsp));
+             break;
+           }
+           if (list2.isEmpty()) { // already finished?
+             break;
+           }
+           Map<String, Object> status2 = null;
+           for (Object o2 : list2) {
+             Map<String, Object> map2 = (Map<String, Object>)o2;
+             if (daemonName.equals(map2.get("id"))) {
+               status2 = map2;
+               break;
+             }
+           }
+           if (status2 == null) { // finished?
+             break;
+           }
+           // a status without stopTime is treated as "not stopped yet" rather than failing with an NPE
+           Number stopTime = (Number)status2.get("stopTime");
+           if (stopTime != null && stopTime.longValue() > 0) {
+             break;
+           }
+           // pause between polls instead of hammering the core in a tight loop
+           timeOut.sleep(500);
+         }
+         if (timeOut.hasTimedOut()) {
+           log.warn("Problem killing daemon " + daemonName + ": timed out waiting for daemon to stop.");
+           // proceed anyway
+         }
+       }
+     }
+     // now kill it - it's already stopped, this simply removes its status
+     q.set("action", "kill");
+     req = new QueryRequest(q);
+     solrClient.request(req);
+   }
+ }
+
+ /**
+  * Cleans up after an aborted or failed reindexing run.
+  * <p>Steps: stop the daemon (when its URL is known), delete the target
+  * collection if this run created it and it differs from the source, delete
+  * the checkpoint collection, clear the readOnly flag on the source and
+  * remove the stored reindexing state.</p>
+  * <p>A failure to stop the daemon is logged and does not prevent the
+  * remaining steps; failures reported by the delete commands are only logged.</p>
+  *
+  * @param collection source collection
+  * @param targetCollection target collection of the run
+  * @param chkCollection checkpoint collection of the run
+  * @param daemonUrl URL of the core running the daemon, or null when no daemon was started
+  * @param daemonName id of the daemon
+  * @param createdTarget true when the target collection was created by this run
+  */
+ private void cleanup(String collection, String targetCollection, String chkCollection,
+     String daemonUrl, String daemonName, boolean createdTarget) throws Exception {
+   log.info("## Cleaning up after abort or error");
+   // 1. kill the daemon
+   // 2. cleanup target / chk collections IFF the source collection still exists and is not empty
+   // 3. cleanup collection state
+
+   if (daemonUrl != null) {
+     try {
+       killDaemon(daemonName, daemonUrl);
+     } catch (Exception e) {
+       if (e instanceof InterruptedException) {
+         Thread.currentThread().interrupt();
+       }
+       // keep going: otherwise the source collection would stay read-only
+       // and the leftover collections and state would never be removed
+       log.warn("Problem killing daemon " + daemonName + " at " + daemonUrl + ", continuing cleanup", e);
+     }
+   }
+   ClusterState clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
+   NamedList<Object> cmdResults = new NamedList<>();
+   if (createdTarget && !collection.equals(targetCollection) && clusterState.hasCollection(targetCollection)) {
+     log.debug(" -- removing " + targetCollection);
+     ZkNodeProps cmd = new ZkNodeProps(
+         Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+         CommonParams.NAME, targetCollection,
+         CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+     );
+     ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+     checkResults("CLEANUP: deleting target collection " + targetCollection, cmdResults, false);
+
+   }
+   // remove chk collection
+   if (clusterState.hasCollection(chkCollection)) {
+     log.debug(" -- removing " + chkCollection);
+     ZkNodeProps cmd = new ZkNodeProps(
+         Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.DELETE.toLower(),
+         CommonParams.NAME, chkCollection,
+         CoreAdminParams.DELETE_METRICS_HISTORY, "true"
+     );
+     cmdResults = new NamedList<>();
+     ocmh.commandMap.get(CollectionParams.CollectionAction.DELETE).call(clusterState, cmd, cmdResults);
+     checkResults("CLEANUP: deleting checkpoint collection " + chkCollection, cmdResults, false);
+   }
+   log.debug(" -- turning readOnly mode off for " + collection);
+   ZkNodeProps props = new ZkNodeProps(
+       Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
+       ZkStateReader.COLLECTION_PROP, collection,
+       ZkStateReader.READ_ONLY, null);
+   ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+   removeReindexingState(collection);
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index a447093..545e2b3 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -216,8 +216,10 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
DaemonStream d = daemons.remove(id);
if (d != null) {
d.close();
+ rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " killed on " + coreName));
+ } else {
+ rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " not found on " + coreName));
}
- rsp.add("result-set", new DaemonResponseStream("Deamon:" + id + " killed on " + coreName));
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
index 7a4e090..b8e56a9 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
@@ -149,6 +149,9 @@ public class ColStatus {
sliceMap.add("leader", leaderMap);
leaderMap.add("coreNode", leader.getName());
leaderMap.addAll(leader.getProperties());
+ if (!leader.isActive(clusterState.getLiveNodes())) {
+ continue;
+ }
String url = ZkCoreNodeProps.getCoreUrl(leader);
try (SolrClient client = solrClientCache.getHttpSolrClient(url)) {
ModifiableSolrParams params = new ModifiableSolrParams();
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 70bcd1a..e933ac3 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -53,6 +53,7 @@ import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.ZkController.NotInClusterStateException;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkShardTerms;
+import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
import org.apache.solr.cloud.api.collections.RoutedAlias;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.rule.ReplicaAssigner;
@@ -77,6 +78,7 @@ import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -540,6 +542,35 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
RELOAD_OP(RELOAD, (req, rsp, h) -> copy(req.getParams().required(), null, NAME)),
+ // Builds the message for the REINDEXCOLLECTION action from the request parameters.
+ REINDEXCOLLECTION_OP(REINDEXCOLLECTION, (req, rsp, h) -> {
+ // the collection name is the only required parameter
+ Map<String, Object> m = copy(req.getParams().required(), null, NAME);
+ // the remaining parameters are optional and copied from the request as-is
+ copy(req.getParams(), m,
+ ReindexCollectionCmd.COMMAND,
+ ReindexCollectionCmd.REMOVE_SOURCE,
+ ReindexCollectionCmd.TARGET,
+ ZkStateReader.CONFIGNAME_PROP,
+ NUM_SLICES,
+ NRT_REPLICAS,
+ PULL_REPLICAS,
+ TLOG_REPLICAS,
+ REPLICATION_FACTOR,
+ MAX_SHARDS_PER_NODE,
+ POLICY,
+ CREATE_NODE_SET,
+ CREATE_NODE_SET_SHUFFLE,
+ AUTO_ADD_REPLICAS,
+ "shards",
+ STATE_FORMAT,
+ CommonParams.ROWS,
+ CommonParams.Q,
+ CommonParams.FL);
+ // a config name passed with the "collection." prefix overrides the un-prefixed one
+ if (req.getParams().get("collection." + ZkStateReader.CONFIGNAME_PROP) != null) {
+ m.put(ZkStateReader.CONFIGNAME_PROP, req.getParams().get("collection." + ZkStateReader.CONFIGNAME_PROP));
+ }
+ // pass along all "router."-prefixed parameters
+ copyPropertiesWithPrefix(req.getParams(), m, "router.");
+ return m;
+ }),
+
SYNCSHARD_OP(SYNCSHARD, (req, rsp, h) -> {
String collection = req.getParams().required().get("collection");
String shard = req.getParams().required().get("shard");
diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
index b9f9645..e433dc4 100644
--- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
+++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
@@ -346,7 +346,13 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
zkCmdExecutor.ensureExists(upgradedSchemaPath, zkController.getZkClient());
zkController.getZkClient().setData(upgradedSchemaPath, bytes, true);
// Then delete the non-managed schema znode
- zkController.getZkClient().delete(nonManagedSchemaPath, -1, true);
+ if (zkController.getZkClient().exists(nonManagedSchemaPath, true)) {
+ try {
+ zkController.getZkClient().delete(nonManagedSchemaPath, -1, true);
+ } catch (KeeperException.NoNodeException ex) {
+ // ignore - someone beat us to it
+ }
+ }
// Set the resource name to the managed schema so that the CoreAdminHandler returns a findable filename
schema.setResourceName(managedSchemaResourceName);
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index cf7681e..7a49ba4 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -126,6 +126,10 @@ public class TestInjection {
public volatile static CountDownLatch splitLatch = null;
+ public volatile static CountDownLatch reindexLatch = null;
+
+ public volatile static String reindexFailure = null;
+
public volatile static String failIndexFingerprintRequests = null;
public volatile static String wrongIndexFingerprint = null;
@@ -156,6 +160,8 @@ public class TestInjection {
splitFailureBeforeReplicaCreation = null;
splitFailureAfterReplicaCreation = null;
splitLatch = null;
+ reindexLatch = null;
+ reindexFailure = null;
prepRecoveryOpPauseForever = null;
countPrepRecoveryOpPauseForever = new AtomicInteger(0);
failIndexFingerprintRequests = null;
@@ -423,6 +429,35 @@ public class TestInjection {
return true;
}
+ /**
+  * Test hook: throws a {@link SolrException} with the probability configured
+  * in {@link #reindexFailure}.
+  *
+  * @return always true when no failure is injected
+  */
+ public static boolean injectReindexFailure() {
+   if (reindexFailure == null) {
+     return true;
+   }
+   final Random rand = random();
+   if (rand == null) {
+     return true;
+   }
+   final Pair<Boolean,Integer> setting = parseValue(reindexFailure);
+   final boolean enabled = setting.first();
+   final int chanceIn100 = setting.second();
+   if (!enabled) {
+     return true;
+   }
+   if (rand.nextInt(100) < (100 - chanceIn100)) {
+     return true;
+   }
+   log.info("Test injection failure");
+   throw new SolrException(ErrorCode.SERVER_ERROR, "Test injection failure");
+ }
+
+
+ /**
+  * Test hook: when {@link #reindexLatch} is set, waits on it for up to 60 seconds.
+  *
+  * @return the result of the wait when a latch is set and the wait was not
+  *         interrupted, true otherwise
+  */
+ public static boolean injectReindexLatch() {
+   // read the volatile field once: reset() may set it to null between the
+   // null check and the await call
+   final CountDownLatch latch = reindexLatch;
+   if (latch != null) {
+     try {
+       log.info("Waiting in ReindexCollectionCmd for up to 60s");
+       return latch.await(60, TimeUnit.SECONDS);
+     } catch (InterruptedException e) {
+       Thread.currentThread().interrupt();
+     }
+   }
+   return true;
+ }
+
private static Pair<Boolean,Integer> parseValue(final String raw) {
if (raw == null) return new Pair<>(false, 0);
Matcher m = ENABLED_PERCENT.matcher(raw);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
new file mode 100644
index 0000000..8413cf2
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ReindexCollectionTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.api.collections.ReindexCollectionCmd;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ */
+@LogLevel("org.apache.solr.cloud.api.collections.ReindexCollectionCmd=DEBUG")
+public class ReindexCollectionTest extends SolrCloudTestCase {
+
+ /** Starts a 2-node cluster with the three config sets used by the tests. */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(2)
+ // only *_s
+ .addConfig("conf1", configset("cloud-minimal"))
+ // every combination of field flags
+ .addConfig("conf2", configset("cloud-dynamic"))
+ // catch-all * field, indexed+stored
+ .addConfig("conf3", configset("cloud-minimal-inplace-updates"))
+ .configure();
+ }
+
+ private CloudSolrClient solrClient;
+ private SolrCloudManager cloudManager;
+ private DistribStateManager stateManager;
+
+ /** Sets up the cloud manager, the state manager and a fresh client for each test. */
+ @Before
+ public void doBefore() throws Exception {
+   final ZkController controller = cluster.getJettySolrRunner(0).getCoreContainer().getZkController();
+   cloudManager = controller.getSolrCloudManager();
+   stateManager = cloudManager.getDistribStateManager();
+   final List<String> zkHosts = Collections.singletonList(controller.getZkServerAddress());
+   solrClient = new CloudSolrClientBuilder(zkHosts, Optional.empty()).build();
+ }
+
+ private ReindexCollectionCmd.State getState(String collection) {
+ try {
+ return ReindexCollectionCmd.State.get(ReindexCollectionCmd
+ .getReindexingState(stateManager, collection)
+ .get(ReindexCollectionCmd.STATE));
+ } catch (Exception e) {
+ fail("Unexpected exception checking state of " + collection + ": " + e);
+ return null;
+ }
+ }
+
+ private void waitForState(String collection, ReindexCollectionCmd.State expected) throws Exception {
+ TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
+ ReindexCollectionCmd.State current = null;
+ while (!timeOut.hasTimedOut()) {
+ current = getState(collection);
+ if (expected == current) {
+ return;
+ }
+ timeOut.sleep(500);
+ }
+ throw new Exception("timeout waiting for state, current=" + current + ", expected=" + expected);
+ }
+
+ /**
+  * Removes all collections, closes the per-test client and resets test
+  * injection. The client is closed and the injection is reset even when
+  * an earlier step fails, so that a failure cannot leak into later tests.
+  */
+ @After
+ public void doAfter() throws Exception {
+   try {
+     cluster.deleteAllCollections(); // deletes aliases too
+   } finally {
+     try {
+       // null when doBefore() failed before creating the client
+       if (solrClient != null) {
+         solrClient.close();
+       }
+     } finally {
+       TestInjection.reset();
+     }
+   }
+ }
+
+ private static final int NUM_DOCS = 200; // at least two batches, default batchSize=100
+
+ /**
+  * Reindexes a collection into a differently named target and verifies the
+  * reported status, the FINISHED state of the target and the copied documents.
+  */
+ @Test
+ public void testBasicReindexing() throws Exception {
+ final String sourceCollection = "basicReindexing";
+
+ createCollection(sourceCollection, "conf1", 2, 2);
+
+ indexDocs(sourceCollection, NUM_DOCS,
+ i -> new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i)));
+
+ final String targetCollection = "basicReindexingTarget";
+
+ CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+ .setTarget(targetCollection);
+ CollectionAdminResponse rsp = req.process(solrClient);
+ // the response carries the final reindexing status, including document counts
+ assertNotNull(rsp.toString(), rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS));
+ Map<String, Object> status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+ assertEquals(status.toString(), (long)NUM_DOCS, ((Number)status.get("inputDocs")).longValue());
+ assertEquals(status.toString(), (long)NUM_DOCS, ((Number)status.get("processedDocs")).longValue());
+
+ // wait until the target collection is marked as FINISHED
+ CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
+ ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
+ return ReindexCollectionCmd.State.FINISHED == state;
+ });
+ // verify the target docs exist
+ QueryResponse queryResponse = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
+ assertEquals("copied num docs", NUM_DOCS, queryResponse.getResults().getNumFound());
+ }
+
+ /**
+  * Reindexes a collection onto its own name. The documents are expected to
+  * end up in a collection whose name starts with
+  * {@link ReindexCollectionCmd#TARGET_COL_PREFIX} followed by the original
+  * name, and to be reachable under the original name afterwards.
+  */
+ @Test
+ public void testSameTargetReindexing() throws Exception {
+   final String sourceCollection = "sameTargetReindexing";
+   final String targetCollection = sourceCollection;
+
+   createCollection(sourceCollection, "conf1", 2, 2);
+   indexDocs(sourceCollection, NUM_DOCS,
+       i -> new SolrInputDocument("id", String.valueOf(i), "string_s", String.valueOf(i)));
+
+   CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+       .setTarget(targetCollection);
+   req.process(solrClient);
+
+   // find the collection that was actually created to hold the copied documents
+   String realTargetCollection = null;
+   TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
+   String prefix = ReindexCollectionCmd.TARGET_COL_PREFIX + targetCollection;
+   while (!timeOut.hasTimedOut()) {
+     timeOut.sleep(500);
+     for (String name : cloudManager.getClusterStateProvider().getClusterState().getCollectionsMap().keySet()) {
+       if (name.startsWith(prefix)) {
+         realTargetCollection = name;
+         break;
+       }
+     }
+     if (realTargetCollection != null) {
+       break;
+     }
+   }
+   assertNotNull("target collection not present after 30s", realTargetCollection);
+
+   CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", realTargetCollection, (liveNodes, coll) -> {
+     ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
+     return ReindexCollectionCmd.State.FINISHED == state;
+   });
+   // verify the target docs exist
+   QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
+   assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound());
+ }
+
+ /**
+  * Reindexes into a target that uses a different config set and verifies
+  * that a field which is indexed but not stored in the source is absent from
+  * the copied documents.
+  */
+ @Test
+ public void testLossySchema() throws Exception {
+ final String sourceCollection = "sourceLossyReindexing";
+ final String targetCollection = "targetLossyReindexing";
+
+
+ createCollection(sourceCollection, "conf2", 2, 2);
+
+ indexDocs(sourceCollection, NUM_DOCS, i ->
+ new SolrInputDocument(
+ "id", String.valueOf(i),
+ "string_s", String.valueOf(i),
+ "sind", "this is a test " + i)); // "sind": indexed=true, stored=false, will be lost...
+
+ // the target collection is created with a different config set
+ CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+ .setTarget(targetCollection)
+ .setConfigName("conf3");
+ req.process(solrClient);
+
+ CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
+ ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
+ return ReindexCollectionCmd.State.FINISHED == state;
+ });
+ // verify the target docs exist
+ QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
+ assertEquals("copied num docs", NUM_DOCS, rsp.getResults().getNumFound());
+ for (SolrDocument doc : rsp.getResults()) {
+ String id = (String)doc.getFieldValue("id");
+ assertEquals(id, doc.getFieldValue("string_s"));
+ assertFalse(doc.containsKey("sind")); // lost in translation ...
+ }
+ }
+
+ /**
+  * Reindexes with a different collection layout (3 named shards, replication
+  * factor 1, implicit router), a restricted field list and a filtering query,
+  * then verifies both the copied documents and the shape of the new collection.
+  */
+ @Test
+ public void testReshapeReindexing() throws Exception {
+ final String sourceCollection = "reshapeReindexing";
+ final String targetCollection = "reshapeReindexingTarget";
+ createCollection(sourceCollection, "conf1", 2, 2);
+ indexDocs(sourceCollection, NUM_DOCS,
+ i -> new SolrInputDocument(
+ "id", String.valueOf(i),
+ "string_s", String.valueOf(i),
+ "remove_s", String.valueOf(i)));
+
+ // "fl" leaves out remove_s and "q" selects only a subset of the documents
+ CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+ .setTarget(targetCollection)
+ .setCollectionParam(ZkStateReader.NUM_SHARDS_PROP, 3)
+ .setCollectionParam(ZkStateReader.REPLICATION_FACTOR, 1)
+ .setCollectionParam("router.name", ImplicitDocRouter.NAME)
+ .setCollectionParam("shards", "foo,bar,baz")
+ .setCollectionParam("fl", "id,string_s")
+ .setCollectionParam("q", "id:10*");
+ req.process(solrClient);
+
+ CloudTestUtils.waitForState(cloudManager, "did not finish copying in time", targetCollection, (liveNodes, coll) -> {
+ ReindexCollectionCmd.State state = ReindexCollectionCmd.State.get(coll.getStr(ReindexCollectionCmd.REINDEXING_STATE));
+ return ReindexCollectionCmd.State.FINISHED == state;
+ });
+ // verify the target docs exist
+ QueryResponse rsp = solrClient.query(targetCollection, params(CommonParams.Q, "*:*"));
+ // 10 and 100-109
+ assertEquals("copied num docs", 11, rsp.getResults().getNumFound());
+ // verify the correct fields exist
+ for (SolrDocument doc : rsp.getResults()) {
+ assertNotNull(doc.getFieldValue("id"));
+ assertNotNull(doc.getFieldValue("string_s"));
+ assertNull(doc.getFieldValue("remove_s"));
+ }
+
+ // check the shape of the new collection
+ ClusterState clusterState = solrClient.getClusterStateProvider().getClusterState();
+ // the target name is resolved through the aliases to get the collection to inspect
+ List<String> aliases = solrClient.getZkStateReader().getAliases().resolveAliases(targetCollection);
+ assertFalse(aliases.isEmpty());
+ String realTargetCollection = aliases.get(0);
+ DocCollection coll = clusterState.getCollection(realTargetCollection);
+ assertNotNull(coll);
+ assertEquals(3, coll.getSlices().size());
+ assertNotNull("foo", coll.getSlice("foo"));
+ assertNotNull("bar", coll.getSlice("bar"));
+ assertNotNull("baz", coll.getSlice("baz"));
+ assertEquals(Integer.valueOf(1), coll.getReplicationFactor());
+ assertEquals(ImplicitDocRouter.NAME, coll.getRouter().getName());
+ }
+
+ /**
+  * Verifies the failure paths: reindexing into an existing collection or
+  * alias is rejected, and an injected failure while waiting for the daemon
+  * leaves no target or checkpoint collections behind and returns the source
+  * collection to a writable state without reindexing flags.
+  */
+ @Test
+ public void testFailure() throws Exception {
+ final String sourceCollection = "failReindexing";
+ final String targetCollection = "failReindexingTarget";
+ final String aliasTarget = "failAlias";
+ createCollection(sourceCollection, "conf1", 2, 2);
+ createCollection(targetCollection, "conf1", 1, 1);
+ CollectionAdminRequest.createAlias(aliasTarget, targetCollection).process(solrClient);
+ indexDocs(sourceCollection, NUM_DOCS,
+ i -> new SolrInputDocument(
+ "id", String.valueOf(i),
+ "string_s", String.valueOf(i)));
+
+ // case 1: the target is an existing collection
+ CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+ .setTarget(targetCollection);
+ CollectionAdminResponse rsp = req.process(solrClient);
+ assertNotNull(rsp.getResponse().get("error"));
+ assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("already exists"));
+
+ // case 2: the target is an existing alias
+ req = CollectionAdminRequest.reindexCollection(sourceCollection)
+ .setTarget(aliasTarget);
+ rsp = req.process(solrClient);
+ assertNotNull(rsp.getResponse().get("error"));
+ assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("already exists"));
+
+ CollectionAdminRequest.deleteAlias(aliasTarget).process(solrClient);
+ CollectionAdminRequest.deleteCollection(targetCollection).process(solrClient);
+
+ // case 3: the target is free now, but a failure is injected
+ req = CollectionAdminRequest.reindexCollection(sourceCollection)
+ .setTarget(targetCollection);
+
+ TestInjection.reindexFailure = "true:100";
+ rsp = req.process(solrClient);
+ assertNotNull(rsp.getResponse().get("error"));
+ assertTrue(rsp.toString(), rsp.getResponse().get("error").toString().contains("waiting for daemon"));
+
+ // verify that the target and checkpoint collections don't exist
+ cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> {
+ assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.TARGET_COL_PREFIX));
+ assertFalse(coll.getName() + " still exists", coll.getName().startsWith(ReindexCollectionCmd.CHK_COL_PREFIX));
+ });
+ // verify that the source collection is read-write and has no reindexing flags
+ CloudTestUtils.waitForState(cloudManager, "collection state is incorrect", sourceCollection,
+ ((liveNodes, collectionState) ->
+ !collectionState.isReadOnly() &&
+ collectionState.getStr(ReindexCollectionCmd.REINDEXING_STATE) == null &&
+ getState(sourceCollection) == null));
+ }
+
+ /**
+ * Starts an asynchronous reindex request, issues the {@code abort} command while the source
+ * collection is read-only, and checks the status reported by the abort response, by the
+ * {@code status} command and finally by the async request status.
+ */
+ @Test
+ public void testAbort() throws Exception {
+ final String sourceCollection = "abortReindexing";
+ final String targetCollection = "abortReindexingTarget";
+ createCollection(sourceCollection, "conf1", 2, 1);
+
+ // the latch is released further below via countDown()
+ // NOTE(review): presumably the reindex command waits on this latch - confirm in TestInjection
+ TestInjection.reindexLatch = new CountDownLatch(1);
+ CollectionAdminRequest.ReindexCollection req = CollectionAdminRequest.reindexCollection(sourceCollection)
+ .setTarget(targetCollection);
+ String asyncId = req.processAsync(solrClient);
+ // wait for the source collection to be put in readOnly mode
+ CloudTestUtils.waitForState(cloudManager, "source collection didn't become readOnly",
+ sourceCollection, (liveNodes, coll) -> coll.isReadOnly());
+
+ // send a second, synchronous request carrying the abort command
+ req = CollectionAdminRequest.reindexCollection(sourceCollection);
+ req.setCommand("abort");
+ CollectionAdminResponse rsp = req.process(solrClient);
+ // unchecked cast: the status section is expected to be a map
+ Map<String, Object> status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+ assertNotNull(rsp.toString(), status);
+ assertEquals(status.toString(), "aborting", status.get("state"));
+
+ // while the latch is still held the collection must stay read-only and reach the ABORTED state
+ CloudTestUtils.waitForState(cloudManager, "incorrect collection state", sourceCollection,
+ ((liveNodes, collectionState) ->
+ collectionState.isReadOnly() &&
+ getState(sourceCollection) == ReindexCollectionCmd.State.ABORTED));
+
+ // verify status
+ req.setCommand("status");
+ rsp = req.process(solrClient);
+ status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+ assertNotNull(rsp.toString(), status);
+ assertEquals(status.toString(), "aborted", status.get("state"));
+ // let the process continue
+ TestInjection.reindexLatch.countDown();
+ // afterwards the source must be read-write again with no reindexing state left
+ CloudTestUtils.waitForState(cloudManager, "source collection is in wrong state",
+ sourceCollection, (liveNodes, docCollection) -> !docCollection.isReadOnly() && getState(sourceCollection) == null);
+ // verify the response
+ rsp = CollectionAdminRequest.requestStatus(asyncId).process(solrClient);
+ status = (Map<String, Object>)rsp.getResponse().get(ReindexCollectionCmd.REINDEX_STATUS);
+ assertNotNull(rsp.toString(), status);
+ assertEquals(status.toString(), "aborted", status.get("state"));
+ }
+
+ /**
+ * Creates a collection with {@code maxShardsPerNode} set to -1 and blocks until
+ * {@code numShards} shards with {@code numShards * numReplicas} replicas in total are active.
+ */
+ private void createCollection(String name, String config, int numShards, int numReplicas) throws Exception {
+ final int expectedReplicas = numShards * numReplicas;
+ CollectionAdminRequest.Create createRequest =
+ CollectionAdminRequest.createCollection(name, config, numShards, numReplicas);
+ createRequest.setMaxShardsPerNode(-1);
+ createRequest.process(solrClient);
+
+ cluster.waitForActiveCollection(name, numShards, expectedReplicas);
+ }
+
+ /**
+ * Generates {@code numDocs} documents, adds them to the collection, commits, and verifies
+ * that a match-all query finds exactly {@code numDocs} documents.
+ *
+ * @param collection collection to index into; the count check assumes it holds no other documents
+ * @param numDocs number of documents to generate
+ * @param generator maps the index {@code 0..numDocs-1} to the document to add
+ */
+ private void indexDocs(String collection, int numDocs, Function<Integer, SolrInputDocument> generator) throws Exception {
+ List<SolrInputDocument> docs = new ArrayList<>();
+ for (int i = 0; i < numDocs; i++) {
+ docs.add(generator.apply(i));
+ }
+ solrClient.add(collection, docs);
+ solrClient.commit(collection);
+ // verify the docs exist; compare with the requested count rather than the NUM_DOCS constant,
+ // otherwise the helper is only correct when callers happen to pass NUM_DOCS
+ QueryResponse rsp = solrClient.query(collection, params(CommonParams.Q, "*:*"));
+ assertEquals("num docs", numDocs, rsp.getResults().getNumFound());
+ }
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/SystemCollectionCompatTest.java b/solr/core/src/test/org/apache/solr/cloud/SystemCollectionCompatTest.java
new file mode 100644
index 0000000..d2e98cb
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/SystemCollectionCompatTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.schema.SchemaResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.RetryUtil;
+import org.apache.solr.logging.LogWatcher;
+import org.apache.solr.logging.LogWatcherConfig;
+import org.apache.solr.util.IdUtils;
+import org.apache.solr.util.TimeOut;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that after the schema of the {@code .system} collection has been changed in an
+ * incompatible way, a newly elected Overseer logs warnings that mention re-indexing and the
+ * modified {@code timestamp} field.
+ */
+public class SystemCollectionCompatTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /** Starts a two-node cluster with a mutable managed schema; the test needs WARN logging enabled. */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ System.setProperty("managed.schema.mutable", "true");
+ configureCluster(2)
+ .addConfig("conf1", configset("cloud-managed"))
+ .configure();
+ if (! log.isWarnEnabled()) {
+ fail("Test requires that log-level is at-least WARN, but WARN is disabled");
+ }
+ }
+
+ private SolrCloudManager cloudManager;
+ private CloudSolrClient solrClient;
+
+ /**
+ * Creates the system collection, indexes one document, replaces the {@code timestamp} field
+ * with an incompatible definition, reloads the collection and waits for the reload to finish.
+ */
+ @Before
+ public void setupSystemCollection() throws Exception {
+ CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL, null, 1, 2)
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(CollectionAdminParams.SYSTEM_COLL, 1, 2);
+ ZkController zkController = cluster.getJettySolrRunner(0).getCoreContainer().getZkController();
+ cloudManager = zkController.getSolrCloudManager();
+ solrClient = new CloudSolrClientBuilder(Collections.singletonList(zkController.getZkServerAddress()),
+ Optional.empty()).build();
+ // send a dummy doc to the .system collection
+ SolrInputDocument doc = new SolrInputDocument(
+ "id", IdUtils.timeRandomId(),
+ CommonParams.TYPE, "dummy");
+ doc.addField("time_l", cloudManager.getTimeSource().getEpochTimeNs());
+ doc.addField("timestamp", new Date());
+ solrClient.add(CollectionAdminParams.SYSTEM_COLL, doc);
+ solrClient.commit(CollectionAdminParams.SYSTEM_COLL);
+
+ Replica leader
+ = solrClient.getZkStateReader().getLeaderRetry(CollectionAdminParams.SYSTEM_COLL, "shard1", DEFAULT_TIMEOUT);
+ // remember the start time so that the completed reload can be detected below
+ final AtomicReference<Long> coreStartTime = new AtomicReference<>(getCoreStatus(leader).getCoreStartTime().getTime());
+ // trigger compat report by changing the schema
+ SchemaRequest req = new SchemaRequest();
+ SchemaResponse rsp = req.process(solrClient, CollectionAdminParams.SYSTEM_COLL);
+ Map<String, Object> field = getSchemaField("timestamp", rsp);
+ // fail with a clear message instead of a NullPointerException when the field is missing
+ assertNotNull("timestamp field not found in the schema", field);
+ // make some obviously incompatible changes
+ field.put("type", "string");
+ field.put("docValues", false);
+ SchemaRequest.ReplaceField replaceFieldRequest = new SchemaRequest.ReplaceField(field);
+ SchemaResponse.UpdateResponse replaceFieldResponse = replaceFieldRequest.process(solrClient, CollectionAdminParams.SYSTEM_COLL);
+ assertEquals(replaceFieldResponse.toString(), 0, replaceFieldResponse.getStatus());
+ CollectionAdminRequest.Reload reloadRequest = CollectionAdminRequest.reloadCollection(CollectionAdminParams.SYSTEM_COLL);
+ CollectionAdminResponse response = reloadRequest.process(solrClient);
+ assertEquals(0, response.getStatus());
+ assertTrue(response.isSuccess());
+ // wait for the reload to complete
+ RetryUtil.retryUntil("Timed out waiting for core to reload", 30, 1000, TimeUnit.MILLISECONDS, () -> {
+ long restartTime = 0;
+ try {
+ restartTime = getCoreStatus(leader).getCoreStartTime().getTime();
+ } catch (Exception e) {
+ log.warn("Exception getting core start time: {}", e.getMessage());
+ return false;
+ }
+ return restartTime > coreStartTime.get();
+ });
+ cluster.waitForActiveCollection(CollectionAdminParams.SYSTEM_COLL, 1, 2);
+
+ }
+
+ /** Deletes all collections and always closes the per-test client. */
+ @After
+ public void doAfter() throws Exception {
+ try {
+ cluster.deleteAllCollections();
+ } finally {
+ // close the client even when cleanup fails; it is null when setup failed before creating it
+ if (solrClient != null) {
+ solrClient.close();
+ }
+ }
+ }
+
+ /**
+ * Looks up a field definition by name in a schema response.
+ *
+ * @return the matching field attributes, or {@code null} when no field has that name
+ */
+ private Map<String, Object> getSchemaField(String name, SchemaResponse schemaResponse) {
+ List<Map<String, Object>> fields = schemaResponse.getSchemaRepresentation().getFields();
+ for (Map<String, Object> field : fields) {
+ if (name.equals(field.get("name"))) {
+ return field;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Stops the node that currently hosts the Overseer, waits for a different leader to be
+ * reported, and then scans the captured WARN log history for the expected Overseer messages.
+ */
+ @Test
+ public void testBackCompat() throws Exception {
+ CollectionAdminRequest.OverseerStatus status = new CollectionAdminRequest.OverseerStatus();
+ // use the cluster-wide client here; a distinct name avoids shadowing the solrClient field
+ CloudSolrClient clusterClient = cluster.getSolrClient();
+ CollectionAdminResponse adminResponse = status.process(clusterClient);
+ NamedList<Object> response = adminResponse.getResponse();
+ String leader = (String) response.get("leader");
+ JettySolrRunner overseerNode = null;
+ int index = -1;
+ List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+ for (int i = 0; i < jettySolrRunners.size(); i++) {
+ JettySolrRunner runner = jettySolrRunners.get(i);
+ if (runner.getNodeName().equals(leader)) {
+ overseerNode = runner;
+ index = i;
+ break;
+ }
+ }
+ assertNotNull("no node matches the reported Overseer leader " + leader, overseerNode);
+ LogWatcherConfig watcherCfg = new LogWatcherConfig(true, null, "WARN", 100);
+ LogWatcher watcher = LogWatcher.newRegisteredLogWatcher(watcherCfg, null);
+
+ watcher.reset();
+
+ // restart Overseer to trigger the back-compat check
+ cluster.stopJettySolrRunner(index);
+ TimeOut timeOut = new TimeOut(30, TimeUnit.SECONDS, cloudManager.getTimeSource());
+ while (!timeOut.hasTimedOut()) {
+ adminResponse = status.process(clusterClient);
+ response = adminResponse.getResponse();
+ String newLeader = (String) response.get("leader");
+ if (newLeader != null && !leader.equals(newLeader)) {
+ break;
+ }
+ timeOut.sleep(200);
+ }
+ if (timeOut.hasTimedOut()) {
+ fail("time out waiting for new Overseer leader");
+ }
+
+ TimeOut timeOut1 = new TimeOut(60, TimeUnit.SECONDS, cloudManager.getTimeSource());
+ boolean foundWarning = false;
+ boolean foundSchemaWarning = false;
+ while (!timeOut1.hasTimedOut()) {
+ timeOut1.sleep(1000);
+ SolrDocumentList history = watcher.getHistory(-1, null);
+ for (SolrDocument doc : history) {
+ if (!Overseer.class.getName().equals(doc.getFieldValue("logger"))) {
+ continue;
+ }
+ // String.valueOf tolerates an entry without a message instead of throwing
+ String message = String.valueOf(doc.getFieldValue("message"));
+ if (message.contains("re-indexing")) {
+ foundWarning = true;
+ }
+ if (message.contains("timestamp")) {
+ foundSchemaWarning = true;
+ }
+ }
+ if (foundWarning && foundSchemaWarning) {
+ break;
+ }
+ }
+ assertTrue("re-indexing warning not found", foundWarning);
+ assertTrue("timestamp field incompatibility warning not found", foundSchemaWarning);
+
+ }
+
+}
diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc
index c5f68cc..07e112f 100644
--- a/solr/solr-ref-guide/src/collections-api.adoc
+++ b/solr/solr-ref-guide/src/collections-api.adoc
@@ -198,6 +198,7 @@ The attributes that can be modified are:
See the <<create,CREATE action>> section above for details on these attributes.
+[[readonlymode]]
==== Read-only mode
Setting the `readOnly` attribute to `true` puts the collection in read-only mode,
in which any index update requests are rejected. Other collection-level actions (eg. adding /
@@ -218,6 +219,125 @@ NOTE: This may potentially take a long time if there are still major segment mer
Removing the `readOnly` property or setting it to false enables the
processing of updates and reloads the collection.
+[[reindexcollection]]
+== REINDEXCOLLECTION: Re-index a Collection
+
+`/admin/collections?action=REINDEXCOLLECTION&name=_name_`
+
+The REINDEXCOLLECTION command re-indexes a collection using existing data from the
+source collection.
+
+NOTE: Re-indexing is potentially a lossy operation - some of the existing indexed data that is not
+available as stored fields may be lost, so users should use this command
+with caution, evaluating the potential impact by using different source and target
+collection names first, and preserving the source collection until the evaluation is
+complete.
+
+The target collection must not exist (and may not be an alias). If the target
+collection name is the same as the source collection then first a unique sequential name
+will be generated for the target collection, and then after re-indexing is done an alias
+will be created that points from the source name to the actual sequentially-named target collection.
+
+When re-indexing is started the source collection is put in <<readonlymode,read-only mode>> to ensure that
+all source documents are properly processed.
+
+Using optional parameters a different index schema, collection shape (number of shards and replicas)
+or routing parameters can be requested for the target collection.
+
+Re-indexing is executed as a streaming expression daemon, which runs on one of the
+source collection's replicas. It is usually a time-consuming operation so it's recommended to execute
+it as an asynchronous request in order to avoid request timeouts. Only one re-indexing operation may
+execute concurrently for a given source collection. Long-running, erroneous or crashed re-indexing
+operations may be terminated by using the `abort` option, which also removes partial results.
+
+=== REINDEXCOLLECTION Parameters
+
+`name`::
+Source collection name, may be an alias. This parameter is required.
+
+`cmd`::
+Optional command. Default command is `start`. Currently supported commands are:
+* `start` - default, starts processing if not already running,
+* `abort` - aborts an already running re-indexing (or clears a left-over status after a crash),
+and deletes partial results,
+* `status` - returns detailed status of a running re-indexing command.
+
+`target`::
+Target collection name, optional. If not specified a unique name will be generated and
+after all documents have been copied an alias will be created that points from the source
+collection name to the unique sequentially-named collection, effectively "hiding"
+the original source collection from regular update and search operations.
+
+`q`::
+Optional query to select documents for re-indexing. Default value is `\*:*`.
+
+`fl`::
+Optional list of fields to re-index. Default value is `*`.
+
+`rows`::
+Batch size: documents are transferred in batches of this many rows. Depending on the average document size, large
+batch sizes may cause memory issues. Default value is 100.
+
+`configName`::
+`collection.configName`::
+Optional name of the configset for the target collection. Default is the same as the
+source collection.
+
+There are a number of optional parameters that determine the target collection layout. If they
+are not specified in the request then their values are copied from the source collection.
+The following parameters are currently supported (described in detail in the <<create,CREATE collection>> section):
+`numShards`, `replicationFactor`, `nrtReplicas`, `tlogReplicas`, `pullReplicas`, `maxShardsPerNode`,
+`autoAddReplicas`, `shards`, `policy`, `createNodeSet`, `createNodeSet.shuffle`, `router.*`.
+
+`removeSource`::
+Optional boolean. If true then after the processing is successfully finished the source collection will
+be deleted.
+
+`async`::
+Optional request ID to track this action which will be <<Asynchronous Calls,processed asynchronously>>.
+
+When the re-indexing process has completed the target collection is marked using
+`property.rx: "finished"`, and the source collection state is updated to become read-write.
+On any errors the command will delete any temporary and target collections and also reset the
+state of the source collection's read-only flag.
+
+=== Examples using REINDEXCOLLECTION
+
+*Input*
+
+[source,text]
+----
+http://localhost:8983/solr/admin/collections?action=REINDEXCOLLECTION&name=newCollection&numShards=3&configName=conf2&q=id:aa*&fl=id,string_s
+----
+This request specifies a different schema for the target collection, copies only some of the fields, selects only the documents
+matching a query, and also potentially re-shapes the collection by explicitly specifying 3 shards. Since the target collection
+hasn't been specified in the parameters a collection with a unique name eg. `.rx_newCollection_2` will be created and on success
+an alias pointing from `newCollection` to `.rx_newCollection_2` will be created, effectively replacing the source collection
+for the purpose of indexing and searching. The source collection is assumed to be small so a synchronous request was made.
+
+*Output*
+
+[source,json]
+----
+{
+ "responseHeader":{
+ "status":0,
+ "QTime":10757},
+ "reindexStatus":{
+ "phase":"done",
+ "inputDocs":13416,
+ "processedDocs":376,
+ "actualSourceCollection":".rx_newCollection_1",
+ "state":"finished",
+ "actualTargetCollection":".rx_newCollection_2",
+ "checkpointCollection":".rx_ck_newCollection"
+ }
+}
+----
+As a result a new collection `.rx_newCollection_2` has been created, with selected documents re-indexed to 3 shards, and
+with an alias pointing from `newCollection` to this one. The status also shows that the source collection
+was already an alias to `.rx_newCollection_1`, which was likely a result of a previous re-indexing.
+
[[reload]]
== RELOAD: Reload a Collection
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 9d02ec2..ec56bfe 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.StreamComparator;
@@ -51,7 +52,7 @@ public class DaemonStream extends TupleStream implements Expressible {
private ArrayBlockingQueue<Tuple> queue;
private int queueSize;
private boolean eatTuples;
- private long iterations;
+ private AtomicLong iterations = new AtomicLong();
private long startTime;
private long stopTime;
private Exception exception;
@@ -240,7 +241,7 @@ public class DaemonStream extends TupleStream implements Expressible {
tuple.put(ID, id);
tuple.put("startTime", startTime);
tuple.put("stopTime", stopTime);
- tuple.put("iterations", iterations);
+ tuple.put("iterations", iterations.get());
tuple.put("state", streamRunner.getState().toString());
if(exception != null) {
tuple.put("exception", exception.getMessage());
@@ -253,10 +254,6 @@ public class DaemonStream extends TupleStream implements Expressible {
this.daemons = daemons;
}
- private synchronized void incrementIterations() {
- ++iterations;
- }
-
private synchronized void setStartTime(long startTime) {
this.startTime = startTime;
}
@@ -332,7 +329,7 @@ public class DaemonStream extends TupleStream implements Expressible {
log.error("Error in DaemonStream:" + id, e);
++errors;
if (errors > 100) {
- log.error("Too many consectutive errors. Stopping DaemonStream:" + id);
+ log.error("Too many consecutive errors. Stopping DaemonStream:" + id);
break OUTER;
}
} catch (Throwable t) {
@@ -351,7 +348,7 @@ public class DaemonStream extends TupleStream implements Expressible {
}
}
}
- incrementIterations();
+ iterations.incrementAndGet();
if (sleepMillis > 0) {
try {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 1654689..ad1c6b7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -784,6 +784,90 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
}
/**
+ * Returns a SolrRequest to reindex a collection
+ *
+ * @param collection name of the source collection handed to the {@link ReindexCollection} request
+ * @return a new {@link ReindexCollection} request; optional settings are applied through its setters
+ */
+ public static ReindexCollection reindexCollection(String collection) {
+ return new ReindexCollection(collection);
+ }
+
+ /**
+ * Request for the {@code REINDEXCOLLECTION} action. Every option is optional; options are
+ * added to the request parameters through {@code setNonNull} in {@link #getParams()}.
+ */
+ public static class ReindexCollection extends AsyncCollectionSpecificAdminRequest {
+ // sent as "target"
+ String target;
+ // sent as CommonParams.Q
+ String query;
+ // sent as CommonParams.FL
+ String fields;
+ // sent as ZkStateReader.CONFIGNAME_PROP
+ String configName;
+ // sent as "removeSource"
+ Boolean removeSource;
+ // sent as "cmd"
+ String cmd;
+ // sent as CommonParams.ROWS
+ Integer batchSize;
+ // additional parameters, sent under their own keys
+ Map<String, Object> collectionParams = new HashMap<>();
+
+ private ReindexCollection(String collection) {
+ super(CollectionAction.REINDEXCOLLECTION, collection);
+ }
+
+ /** Name of the target collection, or null when it is the same as the source. */
+ public ReindexCollection setTarget(String target) {
+ this.target = target;
+ return this;
+ }
+
+ /** Optional command to execute, for example abort or status. */
+ public ReindexCollection setCommand(String command) {
+ this.cmd = command;
+ return this;
+ }
+
+ /** Query selecting the documents to reindex; the default is '*:*'. */
+ public ReindexCollection setQuery(String query) {
+ this.query = query;
+ return this;
+ }
+
+ /** Fields to reindex, in {@link CommonParams#FL} syntax; the default is '*'. */
+ public ReindexCollection setFields(String fields) {
+ this.fields = fields;
+ return this;
+ }
+
+ /** Whether to remove the source collection after success; the default is false. */
+ public ReindexCollection setRemoveSource(boolean removeSource) {
+ this.removeSource = removeSource;
+ return this;
+ }
+
+ /** Number of documents copied per batch; the default is 100. */
+ public ReindexCollection setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ /** Config name used for the target collection; the default is the same as the source. */
+ public ReindexCollection setConfigName(String configName) {
+ this.configName = configName;
+ return this;
+ }
+
+ /** Adds another supported collection CREATE parameter to the request. */
+ public ReindexCollection setCollectionParam(String key, Object value) {
+ this.collectionParams.put(key, value);
+ return this;
+ }
+
+ /** Builds the request parameters from the inherited ones plus the options set on this request. */
+ @Override
+ public SolrParams getParams() {
+ ModifiableSolrParams reindexParams = (ModifiableSolrParams) super.getParams();
+ reindexParams.setNonNull("target", target);
+ reindexParams.setNonNull("cmd", cmd);
+ reindexParams.setNonNull(ZkStateReader.CONFIGNAME_PROP, configName);
+ reindexParams.setNonNull(CommonParams.Q, query);
+ reindexParams.setNonNull(CommonParams.FL, fields);
+ reindexParams.setNonNull("removeSource", removeSource);
+ reindexParams.setNonNull(CommonParams.ROWS, batchSize);
+ for (Map.Entry<String, Object> extra : collectionParams.entrySet()) {
+ reindexParams.setNonNull(extra.getKey(), extra.getValue());
+ }
+ return reindexParams;
+ }
+ }
+
+ /**
* Return a SolrRequest for low-level detailed status of the collection.
*/
public static ColStatus collectionStatus(String collection) {
@@ -823,10 +907,10 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
@Override
public SolrParams getParams() {
ModifiableSolrParams params = (ModifiableSolrParams)super.getParams();
- params.setNonNull("segments", withSegments.toString());
- params.setNonNull("fieldInfo", withFieldInfo.toString());
- params.setNonNull("coreInfo", withCoreInfo.toString());
- params.setNonNull("sizeInfo", withSizeInfo.toString());
+ params.setNonNull("segments", withSegments);
+ params.setNonNull("fieldInfo", withFieldInfo);
+ params.setNonNull("coreInfo", withCoreInfo);
+ params.setNonNull("sizeInfo", withSizeInfo);
return params;
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
index 30778b8..c4dad33 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
@@ -100,6 +100,11 @@ public class CompositeIdRouter extends HashBasedRouter {
return targetSlices;
}
+ /** Returns the {@code NAME} constant that identifies this router. */
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
public List<Range> partitionRangeByKey(String key, Range range) {
List<Range> result = new ArrayList<>(3);
Range keyRange = keyHashRange(key);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
index 111c74b..335c86d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
@@ -223,6 +223,7 @@ public abstract class DocRouter {
public abstract boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection);
+ /** Returns the name that identifies this router implementation. */
+ public abstract String getName();
/** This method is consulted to determine what slices should be queried for a request when
* an explicit shards parameter was not used.
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
index 0b25fcb..7e51621 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
@@ -76,6 +76,11 @@ public class ImplicitDocRouter extends DocRouter {
}
@Override
+ public String getName() {
+ // identifies this router by its NAME constant
+ return NAME;
+ }
+
+ @Override
public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
if (shardKey == null) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PlainIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PlainIdRouter.java
index f1cea47..d63c5cd 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PlainIdRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PlainIdRouter.java
@@ -19,4 +19,9 @@ package org.apache.solr.common.cloud;
public class PlainIdRouter extends HashBasedRouter {
public static final String NAME = "plain";
+
+ /** Returns {@link #NAME}, the name that identifies this router. */
+ @Override
+ public String getName() {
+ return NAME;
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index de6b247..cfef82c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -123,7 +123,9 @@ public interface CollectionParams {
NONE(false, LockLevel.NONE),
// TODO: not implemented yet
MERGESHARDS(true, LockLevel.SHARD),
- COLSTATUS(true, LockLevel.NONE)
+ COLSTATUS(true, LockLevel.NONE),
+ // this command implements its own locking
+ REINDEXCOLLECTION(true, LockLevel.NONE)
;
public final boolean isWrite;
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
index faaddc8..3d8c6f9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
@@ -40,6 +40,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -572,7 +573,7 @@ public class Utils {
VersionedData data = null;
try {
data = distribStateManager.getData(path);
- } catch (KeeperException.NoNodeException e) {
+ } catch (KeeperException.NoNodeException | NoSuchElementException e) {
return Collections.emptyMap();
}
if (data == null || data.getData() == null || data.getData().length == 0) return Collections.emptyMap();