You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/08/01 14:31:08 UTC
[2/2] lucene-solr:master: SOLR-12509: Improve SplitShardCmd
performance and reliability.
SOLR-12509: Improve SplitShardCmd performance and reliability.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/1133bf98
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/1133bf98
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/1133bf98
Branch: refs/heads/master
Commit: 1133bf98a5fd075173efecfb75a51493fceb62b3
Parents: c6e0c28
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Wed Aug 1 14:39:37 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Wed Aug 1 16:30:59 2018 +0200
----------------------------------------------------------------------
solr/CHANGES.txt | 4 +
.../cloud/api/collections/SplitShardCmd.java | 144 ++++--
.../solr/cloud/overseer/ReplicaMutator.java | 47 +-
.../solr/handler/admin/CollectionsHandler.java | 6 +-
.../org/apache/solr/handler/admin/SplitOp.java | 24 +-
.../solr/update/DirectUpdateHandler2.java | 5 +-
.../apache/solr/update/SolrIndexSplitter.java | 464 +++++++++++++++++--
.../apache/solr/update/SplitIndexCommand.java | 22 +-
.../solr/cloud/ChaosMonkeyShardSplitTest.java | 2 +-
.../cloud/api/collections/ShardSplitTest.java | 73 ++-
.../solr/update/SolrIndexSplitterTest.java | 112 ++++-
solr/solr-ref-guide/src/collections-api.adoc | 20 +-
.../solrj/request/CollectionAdminRequest.java | 11 +
.../solr/common/params/CommonAdminParams.java | 2 +
.../solr/common/params/CoreAdminParams.java | 2 +-
15 files changed, 773 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 37dd5a7..49fc7fe 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -205,6 +205,10 @@ Optimizations
* SOLR-12305: When a replica is applying updates, some kind of updates can skip buffering for faster recovery.
(Cao Manh Dat)
+* SOLR-12509: Improve SplitShardCmd performance and reliability. A new method of splitting has been
+ introduced (splitMethod=link) which uses hard-linking of index files when possible, resulting in
+ significant speedups and reduced CPU / IO load on shard leader. (ab)
+
Other Changes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index b5408f8..00488a3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -51,11 +52,14 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.RTimerTree;
import org.apache.solr.util.TestInjection;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@@ -87,13 +91,23 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
+ String methodStr = message.getStr(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
+ SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
+ if (splitMethod == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown value '" + CommonAdminParams.SPLIT_METHOD +
+ ": " + methodStr);
+ }
+ boolean withTiming = message.getBool(CommonParams.TIMING, false);
+
String collectionName = message.getStr(CoreAdminParams.COLLECTION);
- log.info("Split shard invoked");
+ log.debug("Split shard invoked: {}", message);
ZkStateReader zkStateReader = ocmh.zkStateReader;
zkStateReader.forceUpdateCollection(collectionName);
AtomicReference<String> slice = new AtomicReference<>();
slice.set(message.getStr(ZkStateReader.SHARD_ID_PROP));
+ Set<String> offlineSlices = new HashSet<>();
+ RTimerTree timings = new RTimerTree();
String splitKey = message.getStr("split.key");
DocCollection collection = clusterState.getCollection(collectionName);
@@ -101,6 +115,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
PolicyHelper.SessionWrapper sessionWrapper = null;
Slice parentSlice = getParentSlice(clusterState, collectionName, slice, splitKey);
+ if (parentSlice.getState() != Slice.State.ACTIVE) {
+ throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Parent slice is not active: " + parentSlice.getState());
+ }
// find the leader for the shard
Replica parentShardLeader = null;
@@ -111,7 +128,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted.");
}
+ RTimerTree t = timings.sub("checkDiskSpace");
checkDiskSpace(collectionName, slice.get(), parentShardLeader);
+ t.stop();
// let's record the ephemeralOwner of the parent leader node
Stat leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE + "/" + parentShardLeader.getNodeName(), null, true);
@@ -142,20 +161,22 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
});
int repFactor = numNrt.get() + numTlog.get() + numPull.get();
- // type of the first subreplica will be the same as leader
- boolean firstNrtReplica = parentShardLeader.getType() == Replica.Type.NRT;
- // verify that we indeed have the right number of correct replica types
- if ((firstNrtReplica && numNrt.get() < 1) || (!firstNrtReplica && numTlog.get() < 1)) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "aborting split - inconsistent replica types in collection " + collectionName +
- ": nrt=" + numNrt.get() + ", tlog=" + numTlog.get() + ", pull=" + numPull.get() + ", shard leader type is " +
- parentShardLeader.getType());
- }
-
- List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
+ boolean success = false;
+ try {
+ // type of the first subreplica will be the same as leader
+ boolean firstNrtReplica = parentShardLeader.getType() == Replica.Type.NRT;
+ // verify that we indeed have the right number of correct replica types
+ if ((firstNrtReplica && numNrt.get() < 1) || (!firstNrtReplica && numTlog.get() < 1)) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "aborting split - inconsistent replica types in collection " + collectionName +
+ ": nrt=" + numNrt.get() + ", tlog=" + numTlog.get() + ", pull=" + numPull.get() + ", shard leader type is " +
+ parentShardLeader.getType());
+ }
- String rangesStr = fillRanges(ocmh.cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames, firstNrtReplica);
+ List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
- try {
+ t = timings.sub("fillRanges");
+ String rangesStr = fillRanges(ocmh.cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames, firstNrtReplica);
+ t.stop();
boolean oldShardsDeleted = false;
for (String subSlice : subSlices) {
@@ -196,12 +217,13 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
Map<String, String> requestMap = new HashMap<>();
String nodeName = parentShardLeader.getNodeName();
+ t = timings.sub("createSubSlicesAndLeadersInState");
for (int i = 0; i < subRanges.size(); i++) {
String subSlice = subSlices.get(i);
String subShardName = subShardNames.get(i);
DocRouter.Range subRange = subRanges.get(i);
- log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
+ log.debug("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
@@ -210,7 +232,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
- propMap.put("shard_parent_node", parentShardLeader.getNodeName());
+ propMap.put("shard_parent_node", nodeName);
propMap.put("shard_parent_zk_session", leaderZnodeStat.getEphemeralOwner());
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
@@ -221,7 +243,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
// refresh cluster state
clusterState = zkStateReader.getClusterState();
- log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
+ log.debug("Adding first replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
+ " on " + nodeName);
propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
@@ -248,9 +270,11 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
+ t.stop();
+ t = timings.sub("waitForSubSliceLeadersAlive");
for (String subShardName : subShardNames) {
// wait for parent leader to acknowledge the sub-shard core
- log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
+ log.debug("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName);
CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
cmd.setCoreName(subShardName);
@@ -266,8 +290,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
asyncId, requestMap);
+ t.stop();
- log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
+ log.debug("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
+ " on: " + parentShardLeader);
log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
@@ -275,6 +300,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
+ params.set(CommonAdminParams.SPLIT_METHOD, splitMethod.toLower());
params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
for (int i = 0; i < subShardNames.size(); i++) {
String subShardName = subShardNames.get(i);
@@ -282,18 +308,22 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
params.set(CoreAdminParams.RANGES, rangesStr);
+ t = timings.sub("splitParentCore");
+
ocmh.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId,
requestMap);
+ t.stop();
- log.info("Index on shard: " + nodeName + " split into two successfully");
+ log.debug("Index on shard: " + nodeName + " split into two successfully");
+ t = timings.sub("applyBufferedUpdates");
// apply buffered updates on sub-shards
for (int i = 0; i < subShardNames.size(); i++) {
String subShardName = subShardNames.get(i);
- log.info("Applying buffered updates on : " + subShardName);
+ log.debug("Applying buffered updates on : " + subShardName);
params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
@@ -304,8 +334,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" +
" to apply buffered updates", asyncId, requestMap);
+ t.stop();
- log.info("Successfully applied buffered updates on : " + subShardNames);
+ log.debug("Successfully applied buffered updates on : " + subShardNames);
// Replica creation for the new Slices
// replica placement is controlled by the autoscaling policy framework
@@ -329,6 +360,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
numTlog.decrementAndGet();
}
+ t = timings.sub("identifyNodesForReplicas");
List<ReplicaPosition> replicaPositions = Assign.identifyNodes(ocmh.cloudManager,
clusterState,
new ArrayList<>(clusterState.getLiveNodes()),
@@ -336,13 +368,15 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
new ZkNodeProps(collection.getProperties()),
subSlices, numNrt.get(), numTlog.get(), numPull.get());
sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
+ t.stop();
+ t = timings.sub("createReplicaPlaceholders");
for (ReplicaPosition replicaPosition : replicaPositions) {
String sliceName = replicaPosition.shard;
String subShardNodeName = replicaPosition.node;
String solrCoreName = Assign.buildSolrCoreName(collectionName, sliceName, replicaPosition.type, replicaPosition.index);
- log.info("Creating replica shard " + solrCoreName + " as part of slice " + sliceName + " of collection "
+ log.debug("Creating replica shard " + solrCoreName + " as part of slice " + sliceName + " of collection "
+ collectionName + " on " + subShardNodeName);
// we first create all replicas in DOWN state without actually creating their cores in order to
@@ -384,7 +418,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
replicas.add(propMap);
}
-
+ t.stop();
assert TestInjection.injectSplitFailureBeforeReplicaCreation();
long ephemeralOwner = leaderZnodeStat.getEphemeralOwner();
@@ -414,12 +448,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
// we must set the slice state into recovery before actually creating the replica cores
- // this ensures that the logic inside Overseer to update sub-shard state to 'active'
+ // this ensures that the logic inside ReplicaMutator to update sub-shard state to 'active'
// always gets a chance to execute. See SOLR-7673
if (repFactor == 1) {
// switch sub shard states to 'active'
- log.info("Replication factor is 1 so switching shard states");
+ log.debug("Replication factor is 1 so switching shard states");
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
@@ -431,7 +465,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
ZkNodeProps m = new ZkNodeProps(propMap);
inQueue.offer(Utils.toJSON(m));
} else {
- log.info("Requesting shard state be set to 'recovery'");
+ log.debug("Requesting shard state be set to 'recovery'");
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
@@ -443,6 +477,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
inQueue.offer(Utils.toJSON(m));
}
+ t = timings.sub("createCoresForReplicas");
// now actually create replica cores on sub shard nodes
for (Map<String, Object> replica : replicas) {
ocmh.addReplica(clusterState, new ZkNodeProps(replica), results, null);
@@ -451,20 +486,28 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
assert TestInjection.injectSplitFailureAfterReplicaCreation();
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
+ t.stop();
log.info("Successfully created all replica shards for all sub-slices " + subSlices);
+ t = timings.sub("finalCommit");
ocmh.commit(results, slice.get(), parentShardLeader);
-
+ t.stop();
+ if (withTiming) {
+ results.add(CommonParams.TIMING, timings.asNamedList());
+ }
+ success = true;
return true;
} catch (SolrException e) {
- cleanupAfterFailure(zkStateReader, collectionName, parentSlice.getName(), subSlices);
throw e;
} catch (Exception e) {
log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e);
} finally {
if (sessionWrapper != null) sessionWrapper.release();
+ if (!success) {
+ cleanupAfterFailure(zkStateReader, collectionName, parentSlice.getName(), subSlices, offlineSlices);
+ }
}
}
@@ -505,13 +548,14 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
- private void cleanupAfterFailure(ZkStateReader zkStateReader, String collectionName, String parentShard, List<String> subSlices) {
- log.debug("- cleanup after failed split of " + collectionName + "/" + parentShard);
+ private void cleanupAfterFailure(ZkStateReader zkStateReader, String collectionName, String parentShard,
+ List<String> subSlices, Set<String> offlineSlices) {
+ log.info("Cleaning up after a failed split of " + collectionName + "/" + parentShard);
// get the latest state
try {
zkStateReader.forceUpdateCollection(collectionName);
} catch (KeeperException | InterruptedException e) {
- log.warn("Cleanup after failed split of " + collectionName + "/" + parentShard + ": (force update collection)", e);
+ log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (force update collection)", e);
return;
}
ClusterState clusterState = zkStateReader.getClusterState();
@@ -524,7 +568,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
// set already created sub shards states to CONSTRUCTION - this prevents them
// from entering into RECOVERY or ACTIVE (SOLR-9455)
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
- Map<String, Object> propMap = new HashMap<>();
+ final Map<String, Object> propMap = new HashMap<>();
+ boolean sendUpdateState = false;
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
for (Slice s : coll.getSlices()) {
@@ -532,20 +577,29 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
continue;
}
propMap.put(s.getName(), Slice.State.CONSTRUCTION.toString());
+ sendUpdateState = true;
}
// if parent is inactive activate it again
Slice parentSlice = coll.getSlice(parentShard);
if (parentSlice.getState() == Slice.State.INACTIVE) {
+ sendUpdateState = true;
propMap.put(parentShard, Slice.State.ACTIVE.toString());
}
+ // plus any other previously deactivated slices
+ for (String sliceName : offlineSlices) {
+ propMap.put(sliceName, Slice.State.ACTIVE.toString());
+ sendUpdateState = true;
+ }
- try {
- ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
- } catch (Exception e) {
- // don't give up yet - just log the error, we may still be able to clean up
- log.warn("Cleanup after failed split of " + collectionName + "/" + parentShard + ": (slice state changes)", e);
+ if (sendUpdateState) {
+ try {
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ inQueue.offer(Utils.toJSON(m));
+ } catch (Exception e) {
+ // don't give up yet - just log the error, we may still be able to clean up
+ log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (slice state changes)", e);
+ }
}
// delete existing subShards
@@ -554,16 +608,16 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
if (s == null) {
continue;
}
- log.info("Sub-shard: {} already exists therefore requesting its deletion", subSlice);
- propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
- propMap.put(COLLECTION_PROP, collectionName);
- propMap.put(SHARD_ID_PROP, subSlice);
- ZkNodeProps m = new ZkNodeProps(propMap);
+ log.debug("- sub-shard: {} exists therefore requesting its deletion", subSlice);
+ HashMap<String, Object> props = new HashMap<>();
+ props.put(Overseer.QUEUE_OPERATION, "deleteshard");
+ props.put(COLLECTION_PROP, collectionName);
+ props.put(SHARD_ID_PROP, subSlice);
+ ZkNodeProps m = new ZkNodeProps(props);
try {
ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList());
} catch (Exception e) {
- log.warn("Cleanup after failed split of " + collectionName + "/" + parentShard + ": (deleting existing sub shard " + subSlice + ")", e);
+ log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (deleting existing sub shard " + subSlice + ")", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index f897072..34843c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -25,6 +25,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
@@ -52,12 +53,12 @@ import static org.apache.solr.common.params.CommonParams.NAME;
public class ReplicaMutator {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- protected final SolrCloudManager dataProvider;
+ protected final SolrCloudManager cloudManager;
protected final DistribStateManager stateManager;
- public ReplicaMutator(SolrCloudManager dataProvider) {
- this.dataProvider = dataProvider;
- this.stateManager = dataProvider.getDistribStateManager();
+ public ReplicaMutator(SolrCloudManager cloudManager) {
+ this.cloudManager = cloudManager;
+ this.stateManager = cloudManager.getDistribStateManager();
}
protected Replica setProperty(Replica replica, String key, String value) {
@@ -96,11 +97,11 @@ public class ReplicaMutator {
}
public ZkWriteCommand addReplicaProperty(ClusterState clusterState, ZkNodeProps message) {
- if (checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) == false ||
- checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) == false ||
- checkKeyExistence(message, ZkStateReader.REPLICA_PROP) == false ||
- checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) == false ||
- checkKeyExistence(message, ZkStateReader.PROPERTY_VALUE_PROP) == false) {
+ if (!checkKeyExistence(message, ZkStateReader.COLLECTION_PROP) ||
+ !checkKeyExistence(message, ZkStateReader.SHARD_ID_PROP) ||
+ !checkKeyExistence(message, ZkStateReader.REPLICA_PROP) ||
+ !checkKeyExistence(message, ZkStateReader.PROPERTY_PROP) ||
+ !checkKeyExistence(message, ZkStateReader.PROPERTY_VALUE_PROP)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Overseer ADDREPLICAPROP requires " +
ZkStateReader.COLLECTION_PROP + " and " + ZkStateReader.SHARD_ID_PROP + " and " +
@@ -200,7 +201,7 @@ public class ReplicaMutator {
}
public ZkWriteCommand setState(ClusterState clusterState, ZkNodeProps message) {
- if (Overseer.isLegacy(dataProvider.getClusterStateProvider())) {
+ if (Overseer.isLegacy(cloudManager.getClusterStateProvider())) {
return updateState(clusterState, message);
} else {
return updateStateNew(clusterState, message);
@@ -224,7 +225,7 @@ public class ReplicaMutator {
ClusterStateMutator.getShardNames(numShards, shardNames);
Map<String, Object> createMsg = Utils.makeMap(NAME, cName);
createMsg.putAll(message.getProperties());
- writeCommand = new ClusterStateMutator(dataProvider).createCollection(prevState, new ZkNodeProps(createMsg));
+ writeCommand = new ClusterStateMutator(cloudManager).createCollection(prevState, new ZkNodeProps(createMsg));
DocCollection collection = writeCommand.collection;
newState = ClusterStateMutator.newState(prevState, cName, collection);
}
@@ -451,30 +452,34 @@ public class ReplicaMutator {
}
}
+ Map<String, Object> propMap = new HashMap<>();
+ propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+ propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
if (isLeaderSame) {
log.info("Sub-shard leader node is still the same one at {} with ZK session id {}. Preparing to switch shard states.", shardParentNode, shardParentZkSession);
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
propMap.put(parentSliceName, Slice.State.INACTIVE.toString());
propMap.put(sliceName, Slice.State.ACTIVE.toString());
+ long now = cloudManager.getTimeSource().getEpochTimeNs();
for (Slice subShardSlice : subShardSlices) {
propMap.put(subShardSlice.getName(), Slice.State.ACTIVE.toString());
+ String lastTimeStr = subShardSlice.getStr(ZkStateReader.STATE_TIMESTAMP_PROP);
+ if (lastTimeStr != null) {
+ long start = Long.parseLong(lastTimeStr);
+ log.info("TIMINGS: Sub-shard " + subShardSlice.getName() + " recovered in " +
+ TimeUnit.MILLISECONDS.convert(now - start, TimeUnit.NANOSECONDS) + " ms");
+ } else {
+ log.info("TIMINGS Sub-shard " + subShardSlice.getName() + " not available: " + subShardSlice);
+ }
}
- propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
- ZkNodeProps m = new ZkNodeProps(propMap);
- return new SliceMutator(dataProvider).updateShardState(prevState, m).collection;
} else {
// we must mark the shard split as failed by switching sub-shards to recovery_failed state
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
propMap.put(sliceName, Slice.State.RECOVERY_FAILED.toString());
for (Slice subShardSlice : subShardSlices) {
propMap.put(subShardSlice.getName(), Slice.State.RECOVERY_FAILED.toString());
}
- propMap.put(ZkStateReader.COLLECTION_PROP, collection.getName());
- ZkNodeProps m = new ZkNodeProps(propMap);
- return new SliceMutator(dataProvider).updateShardState(prevState, m).collection;
}
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ return new SliceMutator(cloudManager).updateShardState(prevState, m).collection;
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 8d7cdbf..3a46b2b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -144,8 +144,10 @@ import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTIO
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE;
+import static org.apache.solr.common.params.CommonAdminParams.SPLIT_METHOD;
import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.TIMING;
import static org.apache.solr.common.params.CommonParams.VALUE_LONG;
import static org.apache.solr.common.params.CoreAdminParams.DATA_DIR;
import static org.apache.solr.common.params.CoreAdminParams.DELETE_DATA_DIR;
@@ -662,7 +664,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
SHARD_ID_PROP,
"split.key",
CoreAdminParams.RANGES,
- WAIT_FOR_FINAL_STATE);
+ WAIT_FOR_FINAL_STATE,
+ TIMING,
+ SPLIT_METHOD);
return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
}),
DELETESHARD_OP(DELETESHARD, (req, rsp, h) -> {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
index 9dda6d4..31382c3 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
@@ -30,11 +30,13 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.update.SplitIndexCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,9 +80,14 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
}
log.info("Invoked split action for core: " + cname);
- SolrCore core = it.handler.coreContainer.getCore(cname);
- SolrQueryRequest req = new LocalSolrQueryRequest(core, params);
+ String methodStr = params.get(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
+ SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
+ if (splitMethod == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unsupported value of '" + CommonAdminParams.SPLIT_METHOD + "': " + methodStr);
+ }
+ SolrCore parentCore = it.handler.coreContainer.getCore(cname);
List<SolrCore> newCores = null;
+ SolrQueryRequest req = null;
try {
// TODO: allow use of rangesStr in the future
@@ -91,9 +98,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
String routeFieldName = null;
if (it.handler.coreContainer.isZooKeeperAware()) {
ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
- String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ String collectionName = parentCore.getCoreDescriptor().getCloudDescriptor().getCollectionName();
DocCollection collection = clusterState.getCollection(collectionName);
- String sliceName = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
+ String sliceName = parentCore.getCoreDescriptor().getCloudDescriptor().getShardId();
Slice slice = collection.getSlice(sliceName);
router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
if (ranges == null) {
@@ -101,7 +108,7 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
ranges = currentRange != null ? router.partitionRange(partitions, currentRange) : null;
}
Object routerObj = collection.get(DOC_ROUTER); // for back-compat with Solr 4.4
- if (routerObj != null && routerObj instanceof Map) {
+ if (routerObj instanceof Map) {
Map routerProps = (Map) routerObj;
routeFieldName = (String) routerProps.get("field");
}
@@ -131,9 +138,10 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
paths = Arrays.asList(pathsArr);
}
+ req = new LocalSolrQueryRequest(parentCore, params);
- SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges, router, routeFieldName, splitKey);
- core.getUpdateHandler().split(cmd);
+ SplitIndexCommand cmd = new SplitIndexCommand(req, it.rsp, paths, newCores, ranges, router, routeFieldName, splitKey, splitMethod);
+ parentCore.getUpdateHandler().split(cmd);
if (it.handler.coreContainer.isZooKeeperAware()) {
for (SolrCore newcore : newCores) {
@@ -150,7 +158,7 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
throw e;
} finally {
if (req != null) req.close();
- if (core != null) core.close();
+ if (parentCore != null) parentCore.close();
if (newCores != null) {
for (SolrCore newCore : newCores) {
newCore.close();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index bcc97eb..e64ee8a 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -47,6 +47,7 @@ import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.metrics.SolrMetricManager;
@@ -902,8 +903,10 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
commit(new CommitUpdateCommand(cmd.req, false));
SolrIndexSplitter splitter = new SolrIndexSplitter(cmd);
splitCommands.mark();
+ NamedList<Object> results = new NamedList<>();
try {
- splitter.split();
+ splitter.split(results);
+ cmd.rsp.addResponse(results);
} catch (IOException e) {
numErrors.increment();
numErrorsCumulative.mark();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
index aadbe74..75234fa 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
@@ -18,31 +18,59 @@ package org.apache.solr.update;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.FilterCodecReader;
+import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.SlowCodecReaderWrapper;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.search.ConstantScoreScorer;
+import org.apache.lucene.search.ConstantScoreWeight;
import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.HardlinkCopyDirectoryWrapper;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.util.BitSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.IOUtils;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.HashBasedRouter;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.IndexFetcher;
+import org.apache.solr.handler.SnapShooter;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.BitsFilteredPostingsEnum;
import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RTimerTree;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,26 +78,47 @@ import org.slf4j.LoggerFactory;
public class SolrIndexSplitter {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final String INDEX_PREFIX = "index.";
+
+ public enum SplitMethod {
+ REWRITE,
+ LINK;
+
+ public static SplitMethod get(String p) {
+ if (p != null) {
+ try {
+ return SplitMethod.valueOf(p.toUpperCase(Locale.ROOT));
+ } catch (Exception ex) {
+ return null;
+ }
+ }
+ return null;
+ }
+
+ public String toLower() {
+ return toString().toLowerCase(Locale.ROOT);
+ }
+ }
+
SolrIndexSearcher searcher;
SchemaField field;
List<DocRouter.Range> ranges;
DocRouter.Range[] rangesArr; // same as ranges list, but an array for extra speed in inner loops
List<String> paths;
List<SolrCore> cores;
- DocRouter router;
HashBasedRouter hashRouter;
int numPieces;
- int currPartition = 0;
String routeFieldName;
String splitKey;
+ SplitMethod splitMethod;
+ RTimerTree timings = new RTimerTree();
public SolrIndexSplitter(SplitIndexCommand cmd) {
searcher = cmd.getReq().getSearcher();
ranges = cmd.ranges;
paths = cmd.paths;
cores = cmd.cores;
- router = cmd.router;
- hashRouter = router instanceof HashBasedRouter ? (HashBasedRouter)router : null;
+ hashRouter = cmd.router instanceof HashBasedRouter ? (HashBasedRouter)cmd.router : null;
if (ranges == null) {
numPieces = paths != null ? paths.size() : cores.size();
@@ -86,83 +135,413 @@ public class SolrIndexSplitter {
if (cmd.splitKey != null) {
splitKey = getRouteKey(cmd.splitKey);
}
+ if (cores == null) {
+ this.splitMethod = SplitMethod.REWRITE;
+ } else {
+ this.splitMethod = cmd.splitMethod;
+ }
}
- public void split() throws IOException {
+ public void split(NamedList<Object> results) throws IOException {
+ SolrCore parentCore = searcher.getCore();
+ Directory parentDirectory = searcher.getRawReader().directory();
+ Lock parentDirectoryLock = null;
+ UpdateLog ulog = parentCore.getUpdateHandler().getUpdateLog();
+ if (ulog == null && splitMethod == SplitMethod.LINK) {
+ log.warn("No updateLog in parent core, switching to use potentially slower 'splitMethod=rewrite'");
+ splitMethod = SplitMethod.REWRITE;
+ }
+ if (splitMethod == SplitMethod.LINK) {
+ RTimerTree t = timings.sub("closeParentIW");
+ try {
+ // start buffering updates
+ ulog.bufferUpdates();
+ parentCore.getSolrCoreState().closeIndexWriter(parentCore, false);
+ // make sure we can lock the directory for our exclusive use
+ parentDirectoryLock = parentDirectory.obtainLock(IndexWriter.WRITE_LOCK_NAME);
+ log.info("Splitting in 'link' mode: closed parent IndexWriter...");
+ t.stop();
+ } catch (Exception e) {
+ if (parentDirectoryLock != null) {
+ IOUtils.closeWhileHandlingException(parentDirectoryLock);
+ }
+ try {
+ parentCore.getSolrCoreState().openIndexWriter(parentCore);
+ ulog.applyBufferedUpdates();
+ } catch (Exception e1) {
+ log.error("Error reopening IndexWriter after failed close", e1);
+ log.error("Original error closing IndexWriter:", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reopening IndexWriter after failed close", e1);
+ }
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error closing current IndexWriter, aborting offline split...", e);
+ }
+ }
+ boolean success = false;
+ try {
+ RTimerTree t = timings.sub("doSplit");
+ doSplit();
+ t.stop();
+ success = true;
+ } catch (Exception e) {
+ results.add("failed", e.toString());
+ throw e;
+ } finally {
+ if (splitMethod == SplitMethod.LINK) {
+ IOUtils.closeWhileHandlingException(parentDirectoryLock);
+ RTimerTree t = timings.sub("reopenParentIW");
+ parentCore.getSolrCoreState().openIndexWriter(parentCore);
+ t.stop();
+ t = timings.sub("parentApplyBufferedUpdates");
+ ulog.applyBufferedUpdates();
+ t.stop();
+ log.info("Splitting in 'offline' mode " + (success? "finished" : "FAILED") +
+ ": re-opened parent IndexWriter.");
+ }
+ }
+ results.add(CommonParams.TIMING, timings.asNamedList());
+ }
+
+ public void doSplit() throws IOException {
List<LeafReaderContext> leaves = searcher.getRawReader().leaves();
+ Directory parentDirectory = searcher.getRawReader().directory();
List<FixedBitSet[]> segmentDocSets = new ArrayList<>(leaves.size());
-
- log.info("SolrIndexSplitter: partitions=" + numPieces + " segments="+leaves.size());
-
- for (LeafReaderContext readerContext : leaves) {
- assert readerContext.ordInParent == segmentDocSets.size(); // make sure we're going in order
- FixedBitSet[] docSets = split(readerContext);
- segmentDocSets.add( docSets );
+ SolrIndexConfig parentConfig = searcher.getCore().getSolrConfig().indexConfig;
+ String timestamp = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
+
+ log.info("SolrIndexSplitter: partitions=" + numPieces + " segments=" + leaves.size());
+ RTimerTree t;
+
+ if (splitMethod != SplitMethod.LINK) {
+ t = timings.sub("findDocSetsPerLeaf");
+ for (LeafReaderContext readerContext : leaves) {
+ assert readerContext.ordInParent == segmentDocSets.size(); // make sure we're going in order
+ FixedBitSet[] docSets = split(readerContext, numPieces, field, rangesArr, splitKey, hashRouter, false);
+ segmentDocSets.add(docSets);
+ }
+ t.stop();
}
+ Map<IndexReader.CacheKey, FixedBitSet[]> docsToDeleteCache = new ConcurrentHashMap<>();
+
// would it be more efficient to write segment-at-a-time to each new index?
// - need to worry about number of open descriptors
// - need to worry about if IW.addIndexes does a sync or not...
// - would be more efficient on the read side, but prob less efficient merging
-
for (int partitionNumber=0; partitionNumber<numPieces; partitionNumber++) {
- log.info("SolrIndexSplitter: partition #" + partitionNumber + " partitionCount=" + numPieces + (ranges != null ? " range=" + ranges.get(partitionNumber) : ""));
+ String partitionName = "SolrIndexSplitter:partition=" + partitionNumber + ",partitionCount=" + numPieces + (ranges != null ? ",range=" + ranges.get(partitionNumber) : "");
+ log.info(partitionName);
boolean success = false;
RefCounted<IndexWriter> iwRef = null;
- IndexWriter iw = null;
- if (cores != null) {
+ IndexWriter iw;
+ if (cores != null && splitMethod != SplitMethod.LINK) {
SolrCore subCore = cores.get(partitionNumber);
iwRef = subCore.getUpdateHandler().getSolrCoreState().getIndexWriter(subCore);
iw = iwRef.get();
} else {
- SolrCore core = searcher.getCore();
- String path = paths.get(partitionNumber);
- iw = SolrIndexWriter.create(core, "SplittingIndexWriter"+partitionNumber + (ranges != null ? " " + ranges.get(partitionNumber) : ""), path,
- core.getDirectoryFactory(), true, core.getLatestSchema(),
- core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec());
+ if (splitMethod == SplitMethod.LINK) {
+ SolrCore subCore = cores.get(partitionNumber);
+ String path = subCore.getDataDir() + INDEX_PREFIX + timestamp;
+ t = timings.sub("hardLinkCopy");
+ t.resume();
+ // copy by hard-linking
+ Directory splitDir = subCore.getDirectoryFactory().get(path, DirectoryFactory.DirContext.DEFAULT, subCore.getSolrConfig().indexConfig.lockType);
+ // the wrapper doesn't hold any resources itself so it doesn't need closing
+ HardlinkCopyDirectoryWrapper hardLinkedDir = new HardlinkCopyDirectoryWrapper(splitDir);
+ boolean copiedOk = false;
+ try {
+ for (String file : parentDirectory.listAll()) {
+ // we've closed the IndexWriter, so ignore write.lock
+ // its file may be present even when IndexWriter is closed but
+ // we've already checked that the lock is not held by anyone else
+ if (file.equals(IndexWriter.WRITE_LOCK_NAME)) {
+ continue;
+ }
+ hardLinkedDir.copyFrom(parentDirectory, file, file, IOContext.DEFAULT);
+ }
+ copiedOk = true;
+ } finally {
+ if (!copiedOk) {
+ subCore.getDirectoryFactory().doneWithDirectory(splitDir);
+ subCore.getDirectoryFactory().remove(splitDir);
+ }
+ }
+ t.pause();
+ IndexWriterConfig iwConfig = parentConfig.toIndexWriterConfig(subCore);
+ // don't run merges at this time
+ iwConfig.setMergePolicy(NoMergePolicy.INSTANCE);
+ t = timings.sub("createSubIW");
+ t.resume();
+ iw = new SolrIndexWriter(partitionName, splitDir, iwConfig);
+ t.pause();
+ } else {
+ SolrCore core = searcher.getCore();
+ String path = paths.get(partitionNumber);
+ t = timings.sub("createSubIW");
+ t.resume();
+ iw = SolrIndexWriter.create(core, partitionName, path,
+ core.getDirectoryFactory(), true, core.getLatestSchema(),
+ core.getSolrConfig().indexConfig, core.getDeletionPolicy(), core.getCodec());
+ t.pause();
+ }
}
try {
- // This removes deletions but optimize might still be needed because sub-shards will have the same number of segments as the parent shard.
- for (int segmentNumber = 0; segmentNumber<leaves.size(); segmentNumber++) {
- log.info("SolrIndexSplitter: partition #" + partitionNumber + " partitionCount=" + numPieces + (ranges != null ? " range=" + ranges.get(partitionNumber) : "") + " segment #"+segmentNumber + " segmentCount=" + leaves.size());
- CodecReader subReader = SlowCodecReaderWrapper.wrap(leaves.get(segmentNumber).reader());
- iw.addIndexes(new LiveDocsReader(subReader, segmentDocSets.get(segmentNumber)[partitionNumber]));
+ if (splitMethod == SplitMethod.LINK) {
+ t = timings.sub("deleteDocuments");
+ t.resume();
+ // apply deletions specific to this partition. As a side-effect on the first call this also populates
+ // a cache of docsets to delete per leaf reader per partition, which is reused for subsequent partitions.
+ iw.deleteDocuments(new SplittingQuery(partitionNumber, field, rangesArr, hashRouter, splitKey, docsToDeleteCache));
+ t.pause();
+ } else {
+ // This removes deletions but optimize might still be needed because sub-shards will have the same number of segments as the parent shard.
+ t = timings.sub("addIndexes");
+ t.resume();
+ for (int segmentNumber = 0; segmentNumber<leaves.size(); segmentNumber++) {
+ log.info("SolrIndexSplitter: partition #" + partitionNumber + " partitionCount=" + numPieces + (ranges != null ? " range=" + ranges.get(partitionNumber) : "") + " segment #"+segmentNumber + " segmentCount=" + leaves.size());
+ CodecReader subReader = SlowCodecReaderWrapper.wrap(leaves.get(segmentNumber).reader());
+ iw.addIndexes(new LiveDocsReader(subReader, segmentDocSets.get(segmentNumber)[partitionNumber]));
+ }
+ t.pause();
}
// we commit explicitly instead of sending a CommitUpdateCommand through the processor chain
// because the sub-shard cores will just ignore such a commit because the update log is not
// in active state at this time.
//TODO no commitUpdateCommand
SolrIndexWriter.setCommitData(iw, -1);
+ t = timings.sub("subIWCommit");
+ t.resume();
iw.commit();
+ t.pause();
success = true;
} finally {
if (iwRef != null) {
iwRef.decref();
} else {
if (success) {
+ t = timings.sub("subIWClose");
+ t.resume();
iw.close();
+ t.pause();
} else {
IOUtils.closeWhileHandlingException(iw);
}
+ if (splitMethod == SplitMethod.LINK) {
+ SolrCore subCore = cores.get(partitionNumber);
+ subCore.getDirectoryFactory().release(iw.getDirectory());
+ }
+ }
+ }
+ }
+ // all sub-indexes created ok
+ // when using hard-linking switch directories & refresh cores
+ if (splitMethod == SplitMethod.LINK && cores != null) {
+ boolean switchOk = true;
+ t = timings.sub("switchSubIndexes");
+ for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
+ SolrCore subCore = cores.get(partitionNumber);
+ String indexDirPath = subCore.getIndexDir();
+
+ log.debug("Switching directories");
+ String hardLinkPath = subCore.getDataDir() + INDEX_PREFIX + timestamp;
+ subCore.modifyIndexProps(INDEX_PREFIX + timestamp);
+ try {
+ subCore.getUpdateHandler().newIndexWriter(false);
+ openNewSearcher(subCore);
+ } catch (Exception e) {
+ log.error("Failed to switch sub-core " + indexDirPath + " to " + hardLinkPath + ", split will fail.", e);
+ switchOk = false;
+ break;
}
}
+ t.stop();
+ if (!switchOk) {
+ t = timings.sub("rollbackSubIndexes");
+ // rollback the switch
+ for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
+ SolrCore subCore = cores.get(partitionNumber);
+ Directory dir = null;
+ try {
+ dir = subCore.getDirectoryFactory().get(subCore.getDataDir(), DirectoryFactory.DirContext.META_DATA,
+ subCore.getSolrConfig().indexConfig.lockType);
+ dir.deleteFile(IndexFetcher.INDEX_PROPERTIES);
+ } finally {
+ if (dir != null) {
+ subCore.getDirectoryFactory().release(dir);
+ }
+ }
+ // switch back if necessary and remove the hardlinked dir
+ String hardLinkPath = subCore.getDataDir() + INDEX_PREFIX + timestamp;
+ try {
+ dir = subCore.getDirectoryFactory().get(hardLinkPath, DirectoryFactory.DirContext.DEFAULT,
+ subCore.getSolrConfig().indexConfig.lockType);
+ subCore.getDirectoryFactory().doneWithDirectory(dir);
+ subCore.getDirectoryFactory().remove(dir);
+ } finally {
+ if (dir != null) {
+ subCore.getDirectoryFactory().release(dir);
+ }
+ }
+ subCore.getUpdateHandler().newIndexWriter(false);
+ try {
+ openNewSearcher(subCore);
+ } catch (Exception e) {
+ log.warn("Error rolling back failed split of " + hardLinkPath, e);
+ }
+ }
+ t.stop();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "There were errors during index split");
+ } else {
+ // complete the switch - remove original index
+ t = timings.sub("cleanSubIndex");
+ for (int partitionNumber = 0; partitionNumber < numPieces; partitionNumber++) {
+ SolrCore subCore = cores.get(partitionNumber);
+ String oldIndexPath = subCore.getDataDir() + "index";
+ Directory indexDir = null;
+ try {
+ indexDir = subCore.getDirectoryFactory().get(oldIndexPath,
+ DirectoryFactory.DirContext.DEFAULT, subCore.getSolrConfig().indexConfig.lockType);
+ subCore.getDirectoryFactory().doneWithDirectory(indexDir);
+ subCore.getDirectoryFactory().remove(indexDir);
+ } finally {
+ if (indexDir != null) {
+ subCore.getDirectoryFactory().release(indexDir);
+ }
+ }
+ }
+ t.stop();
+ }
+ }
+ }
+
+ private void openNewSearcher(SolrCore core) throws Exception {
+ Future[] waitSearcher = new Future[1];
+ core.getSearcher(true, false, waitSearcher, true);
+ if (waitSearcher[0] != null) {
+ waitSearcher[0].get();
+ }
+ }
+
+ private class SplittingQuery extends Query {
+ private final int partition;
+ private final SchemaField field;
+ private final DocRouter.Range[] rangesArr;
+ private final HashBasedRouter hashRouter;
+ private final String splitKey;
+ private final Map<IndexReader.CacheKey, FixedBitSet[]> docsToDelete;
+
+ SplittingQuery(int partition, SchemaField field, DocRouter.Range[] rangesArr, HashBasedRouter hashRouter, String splitKey,
+ Map<IndexReader.CacheKey, FixedBitSet[]> docsToDelete) {
+ this.partition = partition;
+ this.field = field;
+ this.rangesArr = rangesArr;
+ this.hashRouter = hashRouter;
+ this.splitKey = splitKey;
+ this.docsToDelete = docsToDelete;
+ }
+
+ @Override
+ public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException {
+ return new ConstantScoreWeight(this, boost) {
+
+ @Override
+ public Scorer scorer(LeafReaderContext context) throws IOException {
+ RTimerTree t = timings.sub("findDocsToDelete");
+ t.resume();
+ FixedBitSet set = findDocsToDelete(context);
+ t.pause();
+ log.info("### partition=" + partition + ", leaf=" + context + ", maxDoc=" + context.reader().maxDoc() +
+ ", numDels=" + context.reader().numDeletedDocs() + ", setLen=" + set.length() + ", setCard=" + set.cardinality());
+ Bits liveDocs = context.reader().getLiveDocs();
+ if (liveDocs != null) {
+ // check that we don't delete already deleted docs
+ FixedBitSet dels = FixedBitSet.copyOf(liveDocs);
+ dels.flip(0, dels.length());
+ dels.and(set);
+ if (dels.cardinality() > 0) {
+ log.error("### INVALID DELS " + dels.cardinality());
+ }
+ }
+ return new ConstantScoreScorer(this, score(), new BitSetIterator(set, set.length()));
+ }
+ @Override
+ public boolean isCacheable(LeafReaderContext ctx) {
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "weight(shardSplittingQuery,part" + partition + ")";
+ }
+ };
}
+ private FixedBitSet findDocsToDelete(LeafReaderContext readerContext) throws IOException {
+ // check whether a cached copy of bitsets already exists for this reader
+ FixedBitSet[] perPartition = docsToDelete.get(readerContext.reader().getCoreCacheHelper().getKey());
+ if (perPartition != null) {
+ return perPartition[partition];
+ }
+ synchronized (docsToDelete) {
+ perPartition = docsToDelete.get(readerContext.reader().getCoreCacheHelper().getKey());
+ if (perPartition != null) {
+ return perPartition[partition];
+ }
+
+ perPartition = split(readerContext, numPieces, field, rangesArr, splitKey, hashRouter, true);
+ docsToDelete.put(readerContext.reader().getCoreCacheHelper().getKey(), perPartition);
+ return perPartition[partition];
+ }
+ }
+
+ @Override
+ public String toString(String field) {
+ return "shardSplittingQuery";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof SplittingQuery)) {
+ return false;
+ }
+ SplittingQuery q = (SplittingQuery)obj;
+ return partition == q.partition;
+ }
+
+ @Override
+ public int hashCode() {
+ return partition;
+ }
}
- FixedBitSet[] split(LeafReaderContext readerContext) throws IOException {
+ static FixedBitSet[] split(LeafReaderContext readerContext, int numPieces, SchemaField field, DocRouter.Range[] rangesArr,
+ String splitKey, HashBasedRouter hashRouter, boolean delete) throws IOException {
LeafReader reader = readerContext.reader();
FixedBitSet[] docSets = new FixedBitSet[numPieces];
for (int i=0; i<docSets.length; i++) {
docSets[i] = new FixedBitSet(reader.maxDoc());
+ if (delete) {
+ docSets[i].set(0, reader.maxDoc());
+ }
}
Bits liveDocs = reader.getLiveDocs();
+ if (liveDocs != null && delete) {
+ FixedBitSet liveDocsSet = FixedBitSet.copyOf(liveDocs);
+ for (FixedBitSet set : docSets) {
+ set.and(liveDocsSet);
+ }
+ }
Terms terms = reader.terms(field.getName());
TermsEnum termsEnum = terms==null ? null : terms.iterator();
@@ -172,11 +551,12 @@ public class SolrIndexSplitter {
PostingsEnum postingsEnum = null;
int[] docsMatchingRanges = null;
- if (ranges != null) {
+ if (rangesArr != null) {
// +1 because documents can belong to *zero*, one, several or all ranges in rangesArr
docsMatchingRanges = new int[rangesArr.length+1];
}
+ int partition = 0;
CharsRefBuilder idRef = new CharsRefBuilder();
for (;;) {
term = termsEnum.next();
@@ -209,14 +589,22 @@ public class SolrIndexSplitter {
for (;;) {
int doc = postingsEnum.nextDoc();
if (doc == DocIdSetIterator.NO_MORE_DOCS) break;
- if (ranges == null) {
- docSets[currPartition].set(doc);
- currPartition = (currPartition + 1) % numPieces;
+ if (rangesArr == null) {
+ if (delete) {
+ docSets[partition].clear(doc);
+ } else {
+ docSets[partition].set(doc);
+ }
+ partition = (partition + 1) % numPieces;
} else {
int matchingRangesCount = 0;
for (int i=0; i<rangesArr.length; i++) { // inner-loop: use array here for extra speed.
if (rangesArr[i].includes(hash)) {
- docSets[i].set(doc);
+ if (delete) {
+ docSets[i].clear(doc);
+ } else {
+ docSets[i].set(doc);
+ }
++matchingRangesCount;
}
}
@@ -256,12 +644,10 @@ public class SolrIndexSplitter {
if (idx <= 0) return null;
String part1 = idString.substring(0, idx);
int commaIdx = part1.indexOf(CompositeIdRouter.bitsSeparator);
- if (commaIdx > 0) {
- if (commaIdx + 1 < part1.length()) {
- char ch = part1.charAt(commaIdx + 1);
- if (ch >= '0' && ch <= '9') {
- part1 = part1.substring(0, commaIdx);
- }
+ if (commaIdx > 0 && commaIdx + 1 < part1.length()) {
+ char ch = part1.charAt(commaIdx + 1);
+ if (ch >= '0' && ch <= '9') {
+ part1 = part1.substring(0, commaIdx);
}
}
return part1;
@@ -273,7 +659,7 @@ public class SolrIndexSplitter {
final FixedBitSet liveDocs;
final int numDocs;
- public LiveDocsReader(CodecReader in, FixedBitSet liveDocs) throws IOException {
+ public LiveDocsReader(CodecReader in, FixedBitSet liveDocs) {
super(in);
this.liveDocs = liveDocs;
this.numDocs = liveDocs.cardinality();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java b/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
index eaa1e59..7ea8a2a 100644
--- a/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/SplitIndexCommand.java
@@ -19,6 +19,7 @@ package org.apache.solr.update;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
import java.util.List;
@@ -29,22 +30,26 @@ import java.util.List;
*
*/
public class SplitIndexCommand extends UpdateCommand {
- // public List<Directory> dirs;
- public List<String> paths;
- public List<SolrCore> cores; // either paths or cores should be specified
- public List<DocRouter.Range> ranges;
- public DocRouter router;
- public String routeFieldName;
- public String splitKey;
+ public final SolrQueryResponse rsp;
+ public final List<String> paths;
+ public final List<SolrCore> cores; // either paths or cores should be specified
+ public final List<DocRouter.Range> ranges;
+ public final DocRouter router;
+ public final String routeFieldName;
+ public final String splitKey;
+ public final SolrIndexSplitter.SplitMethod splitMethod;
- public SplitIndexCommand(SolrQueryRequest req, List<String> paths, List<SolrCore> cores, List<DocRouter.Range> ranges, DocRouter router, String routeFieldName, String splitKey) {
+ public SplitIndexCommand(SolrQueryRequest req, SolrQueryResponse rsp, List<String> paths, List<SolrCore> cores, List<DocRouter.Range> ranges,
+ DocRouter router, String routeFieldName, String splitKey, SolrIndexSplitter.SplitMethod splitMethod) {
super(req);
+ this.rsp = rsp;
this.paths = paths;
this.cores = cores;
this.ranges = ranges;
this.router = router;
this.routeFieldName = routeFieldName;
this.splitKey = splitKey;
+ this.splitMethod = splitMethod;
}
@Override
@@ -65,6 +70,7 @@ public class SplitIndexCommand extends UpdateCommand {
if (splitKey != null) {
sb.append(",split.key=" + splitKey);
}
+ sb.append(",method=" + splitMethod.toLower());
sb.append('}');
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
index 22862b4..dd5f9f3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
@@ -134,7 +134,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
killerThread.start();
killCounter.incrementAndGet();
- splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, null, null);
+ splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, null, null, false);
log.info("Layout after split: \n");
printLayout();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1133bf98/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 92fd4d5..f6ee7b4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -62,7 +62,10 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TestInjection;
import org.junit.Test;
import org.slf4j.Logger;
@@ -72,6 +75,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+@LogLevel("org.apache.solr.cloud.api.collections=DEBUG")
@Slow
public class ShardSplitTest extends BasicDistributedZkTest {
@@ -116,14 +120,24 @@ public class ShardSplitTest extends BasicDistributedZkTest {
Creates a collection with replicationFactor=1, splits a shard. Restarts the sub-shard leader node.
Add a replica. Ensure count matches in leader and replica.
*/
+ @Test
public void testSplitStaticIndexReplication() throws Exception {
+ doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod.REWRITE);
+ }
+
+ @Test
+ public void testSplitStaticIndexReplicationLink() throws Exception {
+ doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod.LINK);
+ }
+
+ private void doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
waitForThingsToLevelOut(15);
DocCollection defCol = cloudClient.getZkStateReader().getClusterState().getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
Replica replica = defCol.getReplicas().get(0);
String nodeName = replica.getNodeName();
- String collectionName = "testSplitStaticIndexReplication";
+ String collectionName = "testSplitStaticIndexReplication_" + splitMethod.toLower();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
create.setCreateNodeSet(nodeName); // we want to create the leader on a fixed node so that we know which one to restart later
@@ -141,6 +155,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
splitShard.setShardName(SHARD1);
+ splitShard.setSplitMethod(splitMethod.toLower());
String asyncId = splitShard.processAsync(client);
RequestStatusState state = CollectionAdminRequest.requestStatus(asyncId).waitFor(client, 120);
if (state == RequestStatusState.COMPLETED) {
@@ -351,16 +366,31 @@ public class ShardSplitTest extends BasicDistributedZkTest {
@Test
public void testSplitMixedReplicaTypes() throws Exception {
+ doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod.REWRITE);
+ }
+
+ @Test
+ public void testSplitMixedReplicaTypesLink() throws Exception {
+ doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod.LINK);
+ }
+
+ private void doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
waitForThingsToLevelOut(15);
- String collectionName = "testSplitMixedReplicaTypes";
+ String collectionName = "testSplitMixedReplicaTypes_" + splitMethod.toLower();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2, 2, 2);
create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
create.process(cloudClient);
waitForRecoveriesToFinish(collectionName, false);
+ for (int i = 0; i < 100; i++) {
+ cloudClient.add(collectionName, getDoc("id", "id-" + i, "foo_s", "bar " + i));
+ }
+ cloudClient.commit(collectionName);
+
CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
splitShard.setShardName(SHARD1);
- splitShard.process(cloudClient);
+ splitShard.setSplitMethod(splitMethod.toLower());
+ CollectionAdminResponse rsp = splitShard.process(cloudClient);
waitForThingsToLevelOut(15);
cloudClient.getZkStateReader().forceUpdateCollection(collectionName);
@@ -393,7 +423,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
assertEquals("actual PULL", numPull, actualPull.get());
}
- @Test
+ @Test
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
public void testSplitWithChaosMonkey() throws Exception {
waitForThingsToLevelOut(15);
@@ -600,6 +630,15 @@ public class ShardSplitTest extends BasicDistributedZkTest {
@Test
public void testSplitShardWithRule() throws Exception {
+ doSplitShardWithRule(SolrIndexSplitter.SplitMethod.REWRITE);
+ }
+
+ @Test
+ public void testSplitShardWithRuleLink() throws Exception {
+ doSplitShardWithRule(SolrIndexSplitter.SplitMethod.LINK);
+ }
+
+ private void doSplitShardWithRule(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
waitForThingsToLevelOut(15);
if (usually()) {
@@ -609,14 +648,14 @@ public class ShardSplitTest extends BasicDistributedZkTest {
}
log.info("Starting testSplitShardWithRule");
- String collectionName = "shardSplitWithRule";
+ String collectionName = "shardSplitWithRule_" + splitMethod.toLower();
CollectionAdminRequest.Create createRequest = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2)
.setRule("shard:*,replica:<2,node:*");
CollectionAdminResponse response = createRequest.process(cloudClient);
assertEquals(0, response.getStatus());
CollectionAdminRequest.SplitShard splitShardRequest = CollectionAdminRequest.splitShard(collectionName)
- .setShardName("shard1");
+ .setShardName("shard1").setSplitMethod(splitMethod.toLower());
response = splitShardRequest.process(cloudClient);
assertEquals(String.valueOf(response.getErrorMessages()), 0, response.getStatus());
}
@@ -633,7 +672,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
// test with only one range
subRanges.add(ranges.get(0));
try {
- splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
+ splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
fail("Shard splitting with just one custom hash range should not succeed");
} catch (HttpSolrClient.RemoteSolrException e) {
log.info("Expected exception:", e);
@@ -644,7 +683,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
subRanges.add(ranges.get(3)); // order shouldn't matter
subRanges.add(ranges.get(0));
try {
- splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
+ splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
fail("Shard splitting with missing hashes in between given ranges should not succeed");
} catch (HttpSolrClient.RemoteSolrException e) {
log.info("Expected exception:", e);
@@ -657,7 +696,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
subRanges.add(ranges.get(2));
subRanges.add(new DocRouter.Range(ranges.get(3).min - 15, ranges.get(3).max));
try {
- splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
+ splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
fail("Shard splitting with overlapping ranges should not succeed");
} catch (HttpSolrClient.RemoteSolrException e) {
log.info("Expected exception:", e);
@@ -683,6 +722,9 @@ public class ShardSplitTest extends BasicDistributedZkTest {
final int[] docCounts = new int[ranges.size()];
int numReplicas = shard1.getReplicas().size();
+ cloudClient.getZkStateReader().forceUpdateCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
+ clusterState = cloudClient.getZkStateReader().getClusterState();
+ log.debug("-- COLLECTION: {}", clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION));
del("*:*");
for (int id = 0; id <= 100; id++) {
String shardKey = "" + (char)('a' + (id % 26)); // See comment in ShardRoutingTest for hash distribution
@@ -725,7 +767,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
try {
for (int i = 0; i < 3; i++) {
try {
- splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
+ splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null, false);
log.info("Layout after split: \n");
printLayout();
break;
@@ -807,7 +849,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
for (int i = 0; i < 3; i++) {
try {
- splitShard(collectionName, SHARD1, null, null);
+ splitShard(collectionName, SHARD1, null, null, false);
break;
} catch (HttpSolrClient.RemoteSolrException e) {
if (e.code() != 500) {
@@ -889,7 +931,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
for (int i = 0; i < 3; i++) {
try {
- splitShard(collectionName, null, null, splitKey);
+ splitShard(collectionName, null, null, splitKey, false);
break;
} catch (HttpSolrClient.RemoteSolrException e) {
if (e.code() != 500) {
@@ -992,9 +1034,11 @@ public class ShardSplitTest extends BasicDistributedZkTest {
}
}
- protected void splitShard(String collection, String shardId, List<DocRouter.Range> subRanges, String splitKey) throws SolrServerException, IOException {
+ protected void splitShard(String collection, String shardId, List<DocRouter.Range> subRanges, String splitKey, boolean offline) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.SPLITSHARD.toString());
+ params.set("timing", "true");
+ params.set("offline", String.valueOf(offline));
params.set("collection", collection);
if (shardId != null) {
params.set("shard", shardId);
@@ -1019,7 +1063,8 @@ public class ShardSplitTest extends BasicDistributedZkTest {
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
try (HttpSolrClient baseServer = getHttpSolrClient(baseUrl, 30000, 60000 * 5)) {
- baseServer.request(request);
+ NamedList<Object> rsp = baseServer.request(request);
+ log.info("Shard split response: {}", Utils.toJSONString(rsp));
}
}