You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2013/04/08 17:19:42 UTC
svn commit: r1465663 - in /lucene/dev/branches/branch_4x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/cloud/
solr/core/src/java/org/apache/solr/handler/admin/
solr/core/src/java/org/apache/solr/servlet/
solr/core/src/java/org/apache/solr/upd...
Author: shalin
Date: Mon Apr 8 15:19:41 2013
New Revision: 1465663
URL: http://svn.apache.org/r1465663
Log:
SOLR-3755: A new collections api to add additional shards dynamically by splitting existing shards
Added:
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
- copied unchanged from r1465661, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
- copied unchanged from r1465661, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/solr/ (props changed)
lucene/dev/branches/branch_4x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_4x/solr/core/ (props changed)
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/AssignShard.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java
lucene/dev/branches/branch_4x/solr/solrj/ (props changed)
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Mon Apr 8 15:19:41 2013
@@ -111,6 +111,9 @@ New Features
and add additional configuration parameters. See the javadocs for more details and
examples. (Robert Muir)
+* SOLR-3755: A new collections api to add additional shards dynamically by splitting
+ existing shards. (yonik, Anshum Gupta, shalin)
+
Bug Fixes
----------------------
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/AssignShard.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/AssignShard.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/AssignShard.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/AssignShard.java Mon Apr 8 15:19:41 2013
@@ -39,7 +39,7 @@ public class AssignShard {
numShards = 1;
}
String returnShardId = null;
- Map<String, Slice> sliceMap = state.getSlicesMap(collection);
+ Map<String, Slice> sliceMap = state.getActiveSlicesMap(collection);
// TODO: now that we create shards ahead of time, is this code needed? Esp since hash ranges aren't assigned when creating via this method?
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java Mon Apr 8 15:19:41 2013
@@ -18,6 +18,7 @@ package org.apache.solr.cloud;
*/
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.SolrParams;
public class CloudDescriptor {
@@ -28,6 +29,11 @@ public class CloudDescriptor {
private Integer numShards;
private String nodeName = null;
+ /* shardRange and shardState are used only once, during sub-shard creation for shard splits.
+ * Thereafter, use the values from {@link Slice} instead. */
+ volatile String shardRange = null;
+ volatile String shardState = Slice.ACTIVE;
+
volatile boolean isLeader = false;
volatile String lastPublished = ZkStateReader.ACTIVE;
@@ -89,4 +95,19 @@ public class CloudDescriptor {
this.nodeName = nodeName;
}
+ public String getShardRange() {
+ return shardRange;
+ }
+
+ public void setShardRange(String shardRange) {
+ this.shardRange = shardRange;
+ }
+
+ public String getShardState() {
+ return shardState;
+ }
+
+ public void setShardState(String shardState) {
+ this.shardState = shardState;
+ }
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Mon Apr 8 15:19:41 2013
@@ -268,6 +268,10 @@ final class ShardLeaderElectionContext e
.getClusterState();
Map<String,Slice> slices = clusterState.getSlicesMap(collection);
Slice slice = slices.get(shardId);
+ if (!slice.getState().equals(Slice.ACTIVE)) {
+ //Return false if the Slice is not active yet.
+ return false;
+ }
Map<String,Replica> replicasMap = slice.getReplicasMap();
for (Map.Entry<String,Replica> shard : replicasMap.entrySet()) {
String state = shard.getValue().getStr(ZkStateReader.STATE_PROP);
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Apr 8 15:19:41 2013
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -27,6 +28,7 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -190,6 +192,10 @@ public class Overseer {
message.getStr(ZkStateReader.SHARD_ID_PROP),
sb.length() > 0 ? sb.toString() : null);
+ } else if ("createshard".equals(operation)) {
+ clusterState = createShard(clusterState, message);
+ } else if ("updateshardstate".equals(operation)) {
+ clusterState = updateShardState(clusterState, message);
} else {
throw new RuntimeException("unknown operation:" + operation
+ " contents:" + message.getProperties());
@@ -197,6 +203,46 @@ public class Overseer {
return clusterState;
}
+ private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
+ String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+ log.info("Update shard state invoked for collection: " + collection);
+ for (String key : message.keySet()) {
+ if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
+ if (QUEUE_OPERATION.equals(key)) continue;
+
+ Slice slice = clusterState.getSlice(collection, key);
+ if (slice == null) {
+ throw new RuntimeException("Overseer.updateShardState unknown collection: " + collection + " slice: " + key);
+ }
+ log.info("Update shard state " + key + " to " + message.getStr(key));
+ Map<String, Object> props = slice.shallowCopy();
+ props.put(Slice.STATE, message.getStr(key));
+ Slice newSlice = new Slice(slice.getName(), slice.getReplicasCopy(), props);
+ clusterState = updateSlice(clusterState, collection, newSlice);
+ }
+
+ return clusterState;
+ }
+
+ private ClusterState createShard(ClusterState clusterState, ZkNodeProps message) {
+ String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+ String shardId = message.getStr(ZkStateReader.SHARD_ID_PROP);
+ Slice slice = clusterState.getSlice(collection, shardId);
+ if (slice == null) {
+ Map<String, Replica> replicas = Collections.EMPTY_MAP;
+ Map<String, Object> sliceProps = new HashMap<String, Object>();
+ String shardRange = message.getStr(ZkStateReader.SHARD_RANGE_PROP);
+ String shardState = message.getStr(ZkStateReader.SHARD_STATE_PROP);
+ sliceProps.put(Slice.RANGE, shardRange);
+ sliceProps.put(Slice.STATE, shardState);
+ slice = new Slice(shardId, replicas, sliceProps);
+ clusterState = updateSlice(clusterState, collection, slice);
+ } else {
+ log.error("Unable to create Shard: " + shardId + " because it already exists in collection: " + collection);
+ }
+ return clusterState;
+ }
+
private boolean amILeader() {
try {
ZkNodeProps props = ZkNodeProps.load(zkClient.getData("/overseer_elect/leader", null, null, true));
@@ -211,6 +257,7 @@ public class Overseer {
log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
return false;
}
+
/**
* Try to assign core to the cluster.
*/
@@ -247,15 +294,24 @@ public class Overseer {
log.info("Collection already exists with " + ZkStateReader.NUM_SHARDS_PROP + "=" + numShards);
}
sliceName = AssignShard.assignShard(collection, state, numShards);
- log.info("Assigning new node to shard shard=" + sliceName);
+ log.info("Assigning new node to shard=" + sliceName);
}
Slice slice = state.getSlice(collection, sliceName);
+
Map<String,Object> replicaProps = new LinkedHashMap<String,Object>();
replicaProps.putAll(message.getProperties());
// System.out.println("########## UPDATE MESSAGE: " + JSONUtil.toJSON(message));
if (slice != null) {
+ String sliceState = slice.getState();
+
+ // The check below, which would throw an exception if the slice is not yet active, is currently disabled.
+
+ //if(!sliceState.equals(Slice.ACTIVE)) {
+ // throw new SolrException(ErrorCode.BAD_REQUEST, "Can not assign core to a non-active slice [" + slice.getName() + "]");
+ //}
+
Replica oldReplica = slice.getReplicasMap().get(coreNodeName);
if (oldReplica != null && oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
@@ -278,6 +334,9 @@ public class Overseer {
replicaProps.remove(removeKey);
}
replicaProps.remove(ZkStateReader.CORE_NODE_NAME_PROP);
+ // remove shard specific properties
+ String shardRange = (String) replicaProps.remove(ZkStateReader.SHARD_RANGE_PROP);
+ String shardState = (String) replicaProps.remove(ZkStateReader.SHARD_STATE_PROP);
Replica replica = new Replica(coreNodeName, replicaProps);
@@ -292,6 +351,9 @@ public class Overseer {
replicas = slice.getReplicasCopy();
} else {
replicas = new HashMap<String, Replica>(1);
+ sliceProps = new HashMap<String, Object>();
+ sliceProps.put(Slice.RANGE, shardRange);
+ sliceProps.put(Slice.STATE, shardState);
}
replicas.put(replica.getName(), replica);
@@ -399,7 +461,11 @@ public class Overseer {
Slice slice = slices.get(sliceName);
if (slice == null) {
- log.error("Could not mark leader for non existing slice:" + sliceName);
+ slice = coll.getSlice(sliceName);
+ }
+
+ if (slice == null) {
+ log.error("Could not mark leader for non existing/active slice:" + sliceName);
return state;
} else {
// TODO: consider just putting the leader property on the shard, not on individual replicas
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Mon Apr 8 15:19:41 2013
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -32,6 +33,8 @@ import org.apache.solr.common.cloud.Alia
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.PlainIdRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -70,6 +73,8 @@ public class OverseerCollectionProcessor
public static final String DELETEALIAS = "deletealias";
+ public static final String SPLITSHARD = "splitshard";
+
// TODO: use from Overseer?
private static final String QUEUE_OPERATION = "operation";
@@ -171,6 +176,8 @@ public class OverseerCollectionProcessor
createAlias(zkStateReader.getAliases(), message);
} else if (DELETEALIAS.equals(operation)) {
deleteAlias(zkStateReader.getAliases(), message);
+ } else if (SPLITSHARD.equals(operation)) {
+ splitShard(zkStateReader.getClusterState(), message, results);
} else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation);
@@ -290,6 +297,265 @@ public class OverseerCollectionProcessor
}
+ private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
+ log.info("Split shard invoked");
+ String collection = message.getStr("collection");
+ String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
+ Slice parentSlice = clusterState.getSlice(collection, slice);
+
+ if (parentSlice == null) {
+ if(clusterState.getCollections().contains(collection)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
+ } else {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collection);
+ }
+ }
+
+ // find the leader for the shard
+ Replica parentShardLeader = clusterState.getLeader(collection, slice);
+
+ DocRouter.Range range = parentSlice.getRange();
+ if (range == null) {
+ range = new PlainIdRouter().fullRange();
+ }
+
+ // todo: fixed to two partitions?
+ // todo: accept the range as a param to api?
+ // todo: handle randomizing subshard name in case a shard with the same name already exists.
+ List<DocRouter.Range> subRanges = new PlainIdRouter().partitionRange(2, range);
+ try {
+ List<String> subSlices = new ArrayList<String>(subRanges.size());
+ List<String> subShardNames = new ArrayList<String>(subRanges.size());
+ String nodeName = parentShardLeader.getNodeName();
+ for (int i = 0; i < subRanges.size(); i++) {
+ String subSlice = slice + "_" + i;
+ subSlices.add(subSlice);
+ String subShardName = collection + "_" + subSlice + "_replica1";
+ subShardNames.add(subShardName);
+
+ Slice oSlice = clusterState.getSlice(collection, subSlice);
+ if (oSlice != null) {
+ if (Slice.ACTIVE.equals(oSlice.getState())) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
+ } else if (Slice.CONSTRUCTION.equals(oSlice.getState())) {
+ for (Replica replica : oSlice.getReplicas()) {
+ String core = replica.getStr("core");
+ log.info("Unloading core: " + core + " from node: " + replica.getNodeName());
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
+ params.set(CoreAdminParams.CORE, core);
+ sendShardRequest(replica.getNodeName(), params);
+ }
+ }
+ }
+ }
+
+ ShardResponse srsp;
+ do {
+ srsp = shardHandler.takeCompletedOrError();
+ if (srsp != null) {
+ processResponse(results, srsp);
+ }
+ } while (srsp != null);
+
+ for (int i=0; i<subRanges.size(); i++) {
+ String subSlice = subSlices.get(i);
+ String subShardName = subShardNames.get(i);
+ DocRouter.Range subRange = subRanges.get(i);
+
+ log.info("Creating shard " + subShardName + " as part of slice "
+ + subSlice + " of collection " + collection + " on "
+ + nodeName);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+
+ params.set(CoreAdminParams.NAME, subShardName);
+ params.set(CoreAdminParams.COLLECTION, collection);
+ params.set(CoreAdminParams.SHARD, subSlice);
+ params.set(CoreAdminParams.SHARD_RANGE, subRange.toString());
+ params.set(CoreAdminParams.SHARD_STATE, Slice.CONSTRUCTION);
+ //params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices); todo: is it necessary, we're not creating collections?
+
+ sendShardRequest(nodeName, params);
+
+ // wait for parent leader to acknowledge the sub-shard core
+ log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
+ CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+ cmd.setCoreName(subShardName);
+ cmd.setNodeName(nodeName);
+ cmd.setCoreNodeName(nodeName + "_" + subShardName);
+ cmd.setState(ZkStateReader.ACTIVE);
+ cmd.setCheckLive(true);
+ cmd.setOnlyIfLeader(true);
+ sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
+ }
+
+ do {
+ srsp = shardHandler.takeCompletedOrError();
+ if (srsp != null) {
+ processResponse(results, srsp);
+ }
+ } while (srsp != null);
+
+ log.info("Successfully created all sub-shards for collection "
+ + collection + " parent shard: " + slice + " on: " + parentShardLeader);
+
+ log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice "
+ + slice + " of collection " + collection + " on "
+ + parentShardLeader);
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
+ params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
+ for (int i = 0; i < subShardNames.size(); i++) {
+ String subShardName = subShardNames.get(i);
+ params.add(CoreAdminParams.TARGET_CORE, subShardName);
+ }
+
+ sendShardRequest(parentShardLeader.getNodeName(), params);
+ do {
+ srsp = shardHandler.takeCompletedOrError();
+ if (srsp != null) {
+ processResponse(results, srsp);
+ }
+ } while (srsp != null);
+
+ log.info("Index on shard: " + nodeName + " split into two successfully");
+
+ // apply buffered updates on sub-shards
+ for (int i = 0; i < subShardNames.size(); i++) {
+ String subShardName = subShardNames.get(i);
+
+ log.info("Applying buffered updates on : " + subShardName);
+
+ params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
+ params.set(CoreAdminParams.NAME, subShardName);
+
+ sendShardRequest(nodeName, params);
+ }
+
+ do {
+ srsp = shardHandler.takeCompletedOrError();
+ if (srsp != null) {
+ processResponse(results, srsp);
+ }
+ } while (srsp != null);
+
+ log.info("Successfully applied buffered updates on : " + subShardNames);
+
+ // Replica creation for the new Slices
+
+ // look at the replication factor and see if it matches reality
+ // if it does not, find best nodes to create more cores
+
+ // TODO: Have replication factor decided in some other way instead of the number of replicas of the parent shard
+
+ int repFactor = clusterState.getSlice(collection, slice).getReplicas().size();
+
+ // we need to look at every node and see how many cores it serves
+ // add our new cores to existing nodes serving the least number of cores
+ // but (for now) require that each core goes on a distinct node.
+
+ // TODO: add smarter options that look at the current number of cores per
+ // node?
+ // for now we just go random
+ Set<String> nodes = clusterState.getLiveNodes();
+ List<String> nodeList = new ArrayList<String>(nodes.size());
+ nodeList.addAll(nodes);
+
+ Collections.shuffle(nodeList);
+
+ // TODO: Have maxShardsPerNode param for this operation?
+
+ // Remove the node that hosts the parent shard for replica creation.
+ nodeList.remove(nodeName);
+
+ // TODO: change this to handle sharding a slice into > 2 sub-shards.
+
+ for (int i = 1; i <= subSlices.size(); i++) {
+ Collections.shuffle(nodeList);
+ String sliceName = subSlices.get(i - 1);
+ for (int j = 2; j <= repFactor; j++) {
+ String subShardNodeName = nodeList.get((repFactor * (i - 1) + (j - 2)) % nodeList.size());
+ String shardName = collection + "_" + sliceName + "_replica" + (j);
+
+ log.info("Creating replica shard " + shardName + " as part of slice "
+ + sliceName + " of collection " + collection + " on "
+ + subShardNodeName);
+
+ // Need to create new params for each request
+ params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
+
+ params.set(CoreAdminParams.NAME, shardName);
+ params.set(CoreAdminParams.COLLECTION, collection);
+ params.set(CoreAdminParams.SHARD, sliceName);
+ // TODO: Figure the config used by the parent shard and use it.
+ //params.set("collection.configName", configName);
+
+ //Not using this property. Do we really need to use it?
+ //params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+
+ sendShardRequest(subShardNodeName, params);
+
+ // wait for the replicas to be seen as active on sub shard leader
+ log.info("Asking sub shard leader to wait for: " + shardName + " to be alive on: " + subShardNodeName);
+ CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+ cmd.setCoreName(subShardNames.get(i-1));
+ cmd.setNodeName(subShardNodeName);
+ cmd.setCoreNodeName(subShardNodeName + "_" + shardName);
+ cmd.setState(ZkStateReader.ACTIVE);
+ cmd.setCheckLive(true);
+ cmd.setOnlyIfLeader(true);
+ sendShardRequest(nodeName, new ModifiableSolrParams(cmd.getParams()));
+ }
+ }
+
+ do {
+ srsp = shardHandler.takeCompletedOrError();
+ if (srsp != null) {
+ processResponse(results, srsp);
+ }
+ } while (srsp != null);
+ log.info("Successfully created all replica shards for all sub-slices "
+ + subSlices);
+
+ log.info("Requesting update shard state");
+ DistributedQueue inQueue = Overseer.getInQueue(zkStateReader.getZkClient());
+ Map<String, Object> propMap = new HashMap<String, Object>();
+ propMap.put(Overseer.QUEUE_OPERATION, "updateshardstate");
+ propMap.put(slice, Slice.INACTIVE);
+ for (String subSlice : subSlices) {
+ propMap.put(subSlice, Slice.ACTIVE);
+ }
+ propMap.put(ZkStateReader.COLLECTION_PROP, collection);
+ ZkNodeProps m = new ZkNodeProps(propMap);
+ inQueue.offer(ZkStateReader.toJSON(m));
+
+ return true;
+ } catch (SolrException e) {
+ throw e;
+ } catch (Exception e) {
+ log.error("Error executing split operation for collection: " + collection + " parent shard: " + slice, e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
+ }
+ }
+
+ private void sendShardRequest(String nodeName, ModifiableSolrParams params) {
+ ShardRequest sreq = new ShardRequest();
+ params.set("qt", adminPath);
+ sreq.purpose = 1;
+ String replica = zkStateReader.getZkClient().getBaseUrlForNodeName(nodeName);
+ if (replica.startsWith("http://")) replica = replica.substring(7);
+ sreq.shards = new String[]{replica};
+ sreq.actualShards = sreq.shards;
+ sreq.params = params;
+
+ shardHandler.submit(sreq, replica, sreq.params);
+ }
+
private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) {
String collectionName = message.getStr("name");
if (clusterState.getCollections().contains(collectionName)) {
@@ -384,7 +650,6 @@ public class OverseerCollectionProcessor
params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
ShardRequest sreq = new ShardRequest();
- sreq.nodeName = nodeName;
params.set("qt", adminPath);
sreq.purpose = 1;
String replica = zkStateReader.getZkClient()
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Mon Apr 8 15:19:41 2013
@@ -792,6 +792,13 @@ public final class ZkController {
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (!core.isReloaded() && ulog != null) {
+ // disable recovery in case shard is in construction state (for shard splits)
+ Slice slice = getClusterState().getSlice(collection, shardId);
+ if (Slice.CONSTRUCTION.equals(slice.getState())) {
+ core.getUpdateHandler().getUpdateLog().bufferUpdates();
+ publish(desc, ZkStateReader.ACTIVE);
+
+ } else {
Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
.getUpdateLog().recoverFromLog();
if (recoveryFuture != null) {
@@ -802,12 +809,13 @@ public final class ZkController {
} else {
log.info("No LogReplay needed for core="+core.getName() + " baseURL=" + baseUrl);
}
- }
boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
collection, coreZkNodeName, shardId, leaderProps, core, cc);
if (!didRecovery) {
publish(desc, ZkStateReader.ACTIVE);
}
+ }
+ }
} finally {
if (core != null) {
core.close();
@@ -817,7 +825,6 @@ public final class ZkController {
// make sure we have an update cluster state right away
zkStateReader.updateClusterState(true);
-
return shardId;
}
@@ -993,6 +1000,8 @@ public final class ZkController {
ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles(),
ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId(),
+ ZkStateReader.SHARD_RANGE_PROP, cd.getCloudDescriptor().getShardRange(),
+ ZkStateReader.SHARD_STATE_PROP, cd.getCloudDescriptor().getShardState(),
ZkStateReader.COLLECTION_PROP, cd.getCloudDescriptor()
.getCollectionName(),
ZkStateReader.NUM_SHARDS_PROP, numShards != null ? numShards.toString()
@@ -1274,6 +1283,9 @@ public final class ZkController {
// before becoming available, make sure we are not live and active
// this also gets us our assigned shard id if it was not specified
publish(cd, ZkStateReader.DOWN, false);
+ // shardState and shardRange are for one-time use only, thereafter the actual values in the Slice should be used
+ cd.getCloudDescriptor().setShardState(null);
+ cd.getCloudDescriptor().setShardRange(null);
String coreNodeName = getCoreNodeName(cd);
// make sure the node name is set on the descriptor
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Mon Apr 8 15:19:41 2013
@@ -132,9 +132,13 @@ public class CollectionsHandler extends
this.handleDeleteAliasAction(req, rsp);
break;
}
+ case SPLITSHARD: {
+ this.handleSplitShardAction(req, rsp);
+ break;
+ }
+
default: {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown action: "
- + action);
+ throw new RuntimeException("Unknown action: " + action);
}
}
@@ -277,6 +281,26 @@ public class CollectionsHandler extends
handleResponse(OverseerCollectionProcessor.CREATECOLLECTION, m, rsp);
}
+ private void handleSplitShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+ log.info("Splitting shard : " + req.getParamString());
+ String name = req.getParams().required().get("collection");
+ // TODO : add support for multiple shards
+ String shard = req.getParams().required().get("shard");
+ // TODO : add support for shard range
+
+ Map<String,Object> props = new HashMap<String,Object>();
+ props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.SPLITSHARD);
+ props.put("collection", name);
+ props.put(ZkStateReader.SHARD_ID_PROP, shard);
+
+ ZkNodeProps m = new ZkNodeProps(props);
+
+ // todo remove this hack
+ DEFAULT_ZK_TIMEOUT *= 5;
+ handleResponse(OverseerCollectionProcessor.SPLITSHARD, m, rsp);
+ DEFAULT_ZK_TIMEOUT /= 5;
+ }
+
public static ModifiableSolrParams params(String... params) {
ModifiableSolrParams msp = new ModifiableSolrParams();
for (int i=0; i<params.length; i+=2) {
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Mon Apr 8 15:19:41 2013
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.Future;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
@@ -40,6 +41,7 @@ import org.apache.solr.cloud.ZkControlle
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -64,6 +66,7 @@ import org.apache.solr.response.SolrQuer
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.SplitIndexCommand;
+import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.util.NumberUtils;
@@ -198,6 +201,12 @@ public class CoreAdminHandler extends Re
break;
}
+ // todo : Can this be done by the regular RecoveryStrategy route?
+ case REQUESTAPPLYUPDATES: {
+ this.handleRequestApplyUpdatesAction(req, rsp);
+ break;
+ }
+
default: {
doPersist = this.handleCustomAction(req, rsp);
break;
@@ -214,6 +223,12 @@ public class CoreAdminHandler extends Re
rsp.setHttpCaching(false);
}
+
+ /**
+ * Handle the core admin SPLIT action.
+ * @return true if a modification has resulted that requires persistence
+ * of the CoreContainer configuration.
+ */
protected boolean handleSplitAction(SolrQueryRequest adminReq, SolrQueryResponse rsp) throws IOException {
SolrParams params = adminReq.getParams();
// partitions=N (split into N partitions, leaving it up to solr what the ranges are and where to put them)
@@ -228,6 +243,7 @@ public class CoreAdminHandler extends Re
String[] newCoreNames = params.getParams("targetCore");
String cname = params.get(CoreAdminParams.CORE, "");
+ log.info("Invoked split action for core: " + cname);
SolrCore core = coreContainer.getCore(cname);
SolrQueryRequest req = new LocalSolrQueryRequest(core, params);
List<SolrCore> newCores = null;
@@ -237,14 +253,17 @@ public class CoreAdminHandler extends Re
List<String> paths = null;
int partitions = pathsArr != null ? pathsArr.length : params.getInt("partitions", 2);
-
// TODO: if we don't know the real range of the current core, we should just
// split on every other doc rather than hash.
+ ClusterState clusterState = coreContainer.getZkController().getClusterState();
+ String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
+ DocCollection collection = clusterState.getCollection(collectionName);
+ String sliceName = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
+ Slice slice = clusterState.getSlice(collectionName, sliceName);
+ DocRouter.Range currentRange = slice.getRange() == null ?
+ new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE) : slice.getRange();
- // TODO (cloud): get from the current core
- DocRouter.Range currentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
-
- DocRouter hp = DocRouter.DEFAULT; // TODO: get actual doc router for collection if available
+ DocRouter hp = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
ranges = hp.partitionRange(partitions, currentRange);
if (pathsArr == null) {
@@ -257,9 +276,6 @@ public class CoreAdminHandler extends Re
throw new SolrException(ErrorCode.BAD_REQUEST, "Core with core name " + newCoreName + " expected but doesn't exist.");
}
}
- // TODO (cloud): cores should be registered, should be in recovery / buffering-updates mode, and the shard
- // leader should be forwarding updates to the new shards *before* we split the current shard
- // into the new shards.
} else {
paths = Arrays.asList(pathsArr);
}
@@ -462,6 +478,14 @@ public class CoreAdminHandler extends Re
if (opts != null)
cd.setShardId(opts);
+ opts = params.get(CoreAdminParams.SHARD_RANGE);
+ if (opts != null)
+ cd.setShardRange(opts);
+
+ opts = params.get(CoreAdminParams.SHARD_STATE);
+ if (opts != null)
+ cd.setShardState(opts);
+
opts = params.get(CoreAdminParams.ROLES);
if (opts != null)
cd.setRoles(opts);
@@ -848,6 +872,9 @@ public class CoreAdminHandler extends Re
Boolean checkLive = params.getBool("checkLive");
Boolean onlyIfLeader = params.getBool("onlyIfLeader");
+ log.info("Going to wait for coreNodeName: " + coreNodeName + ", state: " + waitForState
+ + ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader);
+
String state = null;
boolean live = false;
int retry = 0;
@@ -942,6 +969,47 @@ public class CoreAdminHandler extends Re
Thread.sleep(1000);
}
+ log.info("Waited coreNodeName: " + coreNodeName + ", state: " + waitForState
+ + ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader + " for: " + retry + " seconds.");
+ }
+
+ /**
+ * Handles the REQUESTAPPLYUPDATES core admin action.
+ * Looks up the core named by {@link CoreAdminParams#NAME}, requires its update log
+ * to be in the BUFFERING state and applies the buffered updates. When there is
+ * nothing to apply the response status is "EMPTY_BUFFER"; after a successful
+ * replay the core is published as ACTIVE and the status is "BUFFER_APPLIED".
+ * The request and the core are closed before returning.
+ *
+ * @param req the admin request carrying the core name
+ * @param rsp receives the "core" and "status" entries
+ * @throws SolrException BAD_REQUEST if the core does not exist; SERVER_ERROR if the
+ * core is not buffering, the replay failed, or any other error occurred
+ */
+ private void handleRequestApplyUpdatesAction(SolrQueryRequest req, SolrQueryResponse rsp) {
+ SolrParams params = req.getParams();
+ String cname = params.get(CoreAdminParams.NAME, "");
+ SolrCore core = coreContainer.getCore(cname);
+ try {
+ if (core == null) {
+ // fail with a meaningful message instead of a NullPointerException wrapped as SERVER_ERROR
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Core [" + cname + "] not found");
+ }
+ UpdateLog updateLog = core.getUpdateHandler().getUpdateLog();
+ // a core without an update log cannot be buffering either
+ if (updateLog == null || updateLog.getState() != UpdateLog.State.BUFFERING) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Core " + cname + " not in buffering state");
+ }
+ Future<UpdateLog.RecoveryInfo> future = updateLog.applyBufferedUpdates();
+ if (future == null) {
+ log.info("No buffered updates available. core=" + cname);
+ rsp.add("core", cname);
+ rsp.add("status", "EMPTY_BUFFER");
+ return;
+ }
+ // block until the replay has finished
+ UpdateLog.RecoveryInfo report = future.get();
+ if (report.failed) {
+ SolrException.log(log, "Replay failed");
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
+ }
+ coreContainer.getZkController().publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
+ rsp.add("core", cname);
+ rsp.add("status", "BUFFER_APPLIED");
+ } catch (InterruptedException e) {
+ // keep the interrupt flag set for the caller; no status is added to the response
+ Thread.currentThread().interrupt();
+ log.warn("Recovery was interrupted", e);
+ } catch (Throwable e) {
+ if (e instanceof SolrException)
+ throw (SolrException)e;
+ else
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not apply buffered updates", e);
+ } finally {
+ if (req != null) req.close();
+ if (core != null)
+ core.close();
+ }
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Mon Apr 8 15:19:41 2013
@@ -486,7 +486,7 @@ public class SolrDispatchFilter implemen
private String getRemotCoreUrl(CoreContainer cores, String collectionName, String origCorename) {
ClusterState clusterState = cores.getZkController().getClusterState();
- Collection<Slice> slices = clusterState.getSlices(collectionName);
+ Collection<Slice> slices = clusterState.getActiveSlices(collectionName);
boolean byCoreName = false;
if (slices == null) {
// look by core name
@@ -494,7 +494,7 @@ public class SolrDispatchFilter implemen
Set<String> collections = clusterState.getCollections();
for (String collection : collections) {
slices = new ArrayList<Slice>();
- slices.addAll(clusterState.getSlices(collection));
+ slices.addAll(clusterState.getActiveSlices(collection));
}
}
@@ -541,7 +541,7 @@ public class SolrDispatchFilter implemen
ZkStateReader zkStateReader = cores.getZkController().getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
- Map<String,Slice> slices = clusterState.getSlicesMap(collection);
+ Map<String,Slice> slices = clusterState.getActiveSlicesMap(collection);
if (slices == null) {
return null;
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java Mon Apr 8 15:19:41 2013
@@ -744,7 +744,7 @@ public class DirectUpdateHandler2 extend
@Override
public void split(SplitIndexCommand cmd) throws IOException {
- // TODO: do a commit first?
+ commit(new CommitUpdateCommand(cmd.req, false));
SolrIndexSplitter splitter = new SolrIndexSplitter(cmd);
splitter.split();
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java Mon Apr 8 15:19:41 2013
@@ -28,6 +28,7 @@ import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.CharsRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.OpenBitSet;
import org.apache.solr.common.cloud.DocRouter;
@@ -149,8 +150,10 @@ public class SolrIndexSplitter {
// figure out the hash for the term
// TODO: hook in custom hashes (or store hashes)
- int hash = Hash.murmurhash3_x86_32(term.bytes, term.offset, term.length, 0);
-
+ // TODO: performance implications of using indexedToReadable?
+ CharsRef ref = new CharsRef(term.length);
+ ref = field.getType().indexedToReadable(term, ref);
+ int hash = Hash.murmurhash3_x86_32(ref, ref.offset, ref.length, 0);
docsEnum = termsEnum.docs(liveDocs, docsEnum, DocsEnum.FLAG_NONE);
for (;;) {
int doc = docsEnum.nextDoc();
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Mon Apr 8 15:19:41 2013
@@ -39,6 +39,7 @@ import org.apache.solr.common.SolrInputD
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -137,6 +138,7 @@ public class DistributedUpdateProcessor
// method in this update processor
private boolean isLeader = true;
private boolean forwardToLeader = false;
+ private boolean forwardToSubShard = false;
private List<Node> nodes;
private int numNodes;
@@ -239,8 +241,12 @@ public class DistributedUpdateProcessor
List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
.getReplicaProps(collection, shardId, coreNodeName,
coreName, null, ZkStateReader.DOWN);
+
+ nodes = addSubShardLeaders(coll, shardId, id, doc, nodes);
if (replicaProps != null) {
+ if (nodes == null) {
nodes = new ArrayList<Node>(replicaProps.size());
+ }
// check for test param that lets us miss replicas
String[] skipList = req.getParams().getParams(TEST_DISTRIB_SKIP_SERVERS);
Set<String> skipListSet = null;
@@ -280,17 +286,55 @@ public class DistributedUpdateProcessor
return nodes;
}
+ /**
+ * Appends the leaders of sub-shards that are under construction to the given node list.
+ * A slice qualifies when its state is {@link Slice#CONSTRUCTION}, its hash range is a
+ * subset of the range of <code>shardId</code> (the full integer range is assumed when
+ * that slice has no range) and, if a document id is given, the router reports the
+ * slice as a target for that document. Sets <code>forwardToSubShard</code> whenever a
+ * leader is added.
+ *
+ * @param coll the collection whose slices are inspected
+ * @param shardId name of the shard handling the update
+ * @param docId id of the document, or null (e.g. deletes) to match every sub-shard
+ * @param doc the document handed to the router; may be null
+ * @param nodes list to append to; may be null, in which case one is created on demand
+ * @return the (possibly newly created) node list, or null if none was given and nothing was added
+ */
+ private List<Node> addSubShardLeaders(DocCollection coll, String shardId, String docId, SolrInputDocument doc, List<Node> nodes) {
+ // resolved on first use so collections without sub-shards never look up the parent slice
+ DocRouter.Range parentRange = null;
+ for (Slice candidate : coll.getSlices()) {
+ if (!Slice.CONSTRUCTION.equals(candidate.getState())) {
+ continue;
+ }
+ if (parentRange == null) {
+ parentRange = coll.getSlice(shardId).getRange();
+ if (parentRange == null) {
+ parentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
+ }
+ }
+ DocRouter.Range childRange = candidate.getRange();
+ if (childRange == null || !childRange.isSubsetOf(parentRange)) {
+ continue;
+ }
+ // a null docId (deletes) is sent to every sub-shard
+ if (docId != null && !coll.getRouter().isTargetSlice(docId, doc, req.getParams(), candidate.getName(), coll)) {
+ continue;
+ }
+ Replica leader = candidate.getLeader();
+ // the slice can exist in ZooKeeper before its leader has been elected
+ if (leader == null) {
+ continue;
+ }
+ if (nodes == null) {
+ nodes = new ArrayList<Node>();
+ }
+ nodes.add(new StdNode(new ZkCoreNodeProps(leader)));
+ forwardToSubShard = true;
+ }
+ return nodes;
+ }
private void doDefensiveChecks(DistribPhase phase) {
boolean isReplayOrPeersync = (updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.REPLAY)) != 0;
if (isReplayOrPeersync) return;
String from = req.getParams().get("distrib.from");
- boolean localIsLeader = req.getCore().getCoreDescriptor().getCloudDescriptor().isLeader();
+ ClusterState clusterState = zkController.getClusterState();
+ CloudDescriptor cloudDescriptor = req.getCore().getCoreDescriptor().getCloudDescriptor();
+ Slice mySlice = clusterState.getSlice(collection, cloudDescriptor.getShardId());
+ boolean localIsLeader = cloudDescriptor.isLeader();
if (DistribPhase.FROMLEADER == phase && localIsLeader && from != null) { // from will be null on log replay
+ String fromShard = req.getParams().get("distrib.from.parent");
+ if (fromShard != null) {
+ // shard splitting case -- check ranges to see if we are a sub-shard
+ Slice fromSlice = zkController.getClusterState().getCollection(collection).getSlice(fromShard);
+ DocRouter.Range parentRange = fromSlice.getRange();
+ if (parentRange == null) parentRange = new DocRouter.Range(Integer.MIN_VALUE, Integer.MAX_VALUE);
+ if (mySlice.getRange() != null && !mySlice.getRange().isSubsetOf(parentRange)) {
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+ "Request says it is coming from parent shard leader but parent hash range is not superset of my range");
+ }
+ } else {
log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
}
+ }
if (isLeader && !localIsLeader) {
log.error("ClusterState says we are the leader, but locally we don't think so");
@@ -324,6 +368,8 @@ public class DistributedUpdateProcessor
nodes.add(new StdNode(props));
}
}
+
+ nodes = addSubShardLeaders(zkController.getClusterState().getCollection(collection), shardId, null, null, nodes);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
@@ -367,6 +413,9 @@ public class DistributedUpdateProcessor
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
}
+ if (forwardToSubShard) {
+ params.set("distrib.from.parent", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
+ }
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java Mon Apr 8 15:19:41 2013
@@ -171,7 +171,7 @@ public class SliceStateUpdateTest extend
Map<String, Slice> slices = null;
for (int i = 75; i > 0; i--) {
clusterState2 = zkController2.getClusterState();
- slices = clusterState2.getAllSlicesMap("collection1");
+ slices = clusterState2.getSlicesMap("collection1");
if (slices != null && slices.containsKey("shard1")
&& slices.get("shard1").getState().equals("inactive")) {
break;
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Mon Apr 8 15:19:41 2013
@@ -228,7 +228,7 @@ public class CloudSolrServer extends Sol
// add it to the Map of slices.
Map<String,Slice> slices = new HashMap<String,Slice>();
for (String collectionName : collectionsList) {
- Collection<Slice> colSlices = clusterState.getSlices(collectionName);
+ Collection<Slice> colSlices = clusterState.getActiveSlices(collectionName);
if (colSlices == null) {
throw new SolrServerException("Could not find collection:" + collectionName);
}
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java Mon Apr 8 15:19:41 2013
@@ -115,10 +115,10 @@ public class ClusterState implements JSO
return coll.getSlicesMap();
}
- public Map<String, Slice> getAllSlicesMap(String collection) {
+ public Map<String, Slice> getActiveSlicesMap(String collection) {
DocCollection coll = collectionStates.get(collection);
if (coll == null) return null;
- return coll.getAllSlicesMap();
+ return coll.getActiveSlicesMap();
}
public Collection<Slice> getSlices(String collection) {
@@ -127,6 +127,12 @@ public class ClusterState implements JSO
return coll.getSlices();
}
+ /**
+ * Returns the active slices of the named collection, or null if the collection is unknown.
+ */
+ public Collection<Slice> getActiveSlices(String collection) {
+ final DocCollection docCollection = collectionStates.get(collection);
+ return docCollection == null ? null : docCollection.getActiveSlices();
+ }
+
/**
* Get the named DocCollection object, or throw an exception if it doesn't exist.
*/
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java Mon Apr 8 15:19:41 2013
@@ -91,7 +91,7 @@ public class CompositeIdRouter extends H
if (shardKey == null) {
// search across whole collection
// TODO: this may need modification in the future when shard splitting could cause an overlap
- return collection.getSlices();
+ return collection.getActiveSlices();
}
String id = shardKey;
@@ -132,7 +132,7 @@ public class CompositeIdRouter extends H
Range completeRange = new Range(lowerBound, upperBound);
List<Slice> targetSlices = new ArrayList<Slice>(1);
- for (Slice slice : collection.getSlices()) {
+ for (Slice slice : collection.getActiveSlices()) {
Range range = slice.getRange();
if (range != null && range.overlaps(completeRange)) {
targetSlices.add(slice);
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java Mon Apr 8 15:19:41 2013
@@ -36,7 +36,7 @@ public class DocCollection extends ZkNod
private final String name;
private final Map<String, Slice> slices;
- private final Map<String, Slice> allSlices;
+ private final Map<String, Slice> activeSlices;
private final DocRouter router;
/**
@@ -48,15 +48,15 @@ public class DocCollection extends ZkNod
super( props==null ? Collections.<String,Object>emptyMap() : props);
this.name = name;
- this.allSlices = slices;
- this.slices = new HashMap<String, Slice>();
+ this.slices = slices;
+ this.activeSlices = new HashMap<String, Slice>();
Iterator<Map.Entry<String, Slice>> iter = slices.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, Slice> slice = iter.next();
if (slice.getValue().getState().equals(Slice.ACTIVE))
- this.slices.put(slice.getKey(), slice.getValue());
+ this.activeSlices.put(slice.getKey(), slice.getValue());
}
this.router = router;
@@ -72,11 +72,11 @@ public class DocCollection extends ZkNod
}
public Slice getSlice(String sliceName) {
- return allSlices.get(sliceName);
+ return slices.get(sliceName);
}
/**
- * Gets the list of active slices for this collection.
+ * Gets the list of all slices for this collection.
*/
public Collection<Slice> getSlices() {
return slices.values();
@@ -84,24 +84,24 @@ public class DocCollection extends ZkNod
/**
- * Return the list of all slices for this collection.
+ * Return the list of active slices for this collection.
*/
- public Collection<Slice> getAllSlices() {
- return allSlices.values();
+ public Collection<Slice> getActiveSlices() {
+ return activeSlices.values();
}
/**
- * Get the map of active slices (sliceName->Slice) for this collection.
+ * Get the map of all slices (sliceName->Slice) for this collection.
*/
public Map<String, Slice> getSlicesMap() {
return slices;
}
/**
- * Get the map of all slices (sliceName->Slice) for this collection.
+ * Get the map of active slices (sliceName->Slice) for this collection.
*/
- public Map<String, Slice> getAllSlicesMap() {
- return allSlices;
+ public Map<String, Slice> getActiveSlicesMap() {
+ return activeSlices;
}
public DocRouter getRouter() {
@@ -115,9 +115,9 @@ public class DocCollection extends ZkNod
@Override
public void write(JSONWriter jsonWriter) {
- LinkedHashMap<String, Object> all = new LinkedHashMap<String, Object>(allSlices.size() + 1);
+ LinkedHashMap<String, Object> all = new LinkedHashMap<String, Object>(slices.size() + 1);
all.putAll(propMap);
- all.put(SHARDS, allSlices);
+ all.put(SHARDS, slices);
jsonWriter.write(all);
}
}
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/DocRouter.java Mon Apr 8 15:19:41 2013
@@ -164,6 +164,8 @@ public abstract class DocRouter {
**/
public abstract Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection);
+ public abstract boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection);
+
/** This method is consulted to determine what slices should be queried for a request when
* an explicit shards parameter was not used.
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java Mon Apr 8 15:19:41 2013
@@ -34,6 +34,14 @@ public abstract class HashBasedRouter ex
return hashToSlice(hash, collection);
}
+ /**
+ * Tells whether the hash of the given document falls into the range of the named slice.
+ * The id is taken from the document/params when <code>id</code> is null; a slice
+ * without a range never matches.
+ */
+ @Override
+ public boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection) {
+ final String docId = (id != null) ? id : getId(sdoc, params);
+ final int docHash = sliceHash(docId, sdoc, params);
+ final Range sliceRange = collection.getSlice(shardId).getRange();
+ if (sliceRange == null) {
+ return false;
+ }
+ return sliceRange.includes(docHash);
+ }
+
protected int sliceHash(String id, SolrInputDocument sdoc, SolrParams params) {
return Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
}
@@ -45,11 +53,11 @@ public abstract class HashBasedRouter ex
}
protected Slice hashToSlice(int hash, DocCollection collection) {
- for (Slice slice : collection.getSlices()) {
+ for (Slice slice : collection.getActiveSlices()) {
Range range = slice.getRange();
if (range != null && range.includes(hash)) return slice;
}
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No slice servicing hash code " + Integer.toHexString(hash) + " in " + collection);
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No active slice servicing hash code " + Integer.toHexString(hash) + " in " + collection);
}
@@ -58,7 +66,7 @@ public abstract class HashBasedRouter ex
if (shardKey == null) {
// search across whole collection
// TODO: this may need modification in the future when shard splitting could cause an overlap
- return collection.getSlices();
+ return collection.getActiveSlices();
}
// use the shardKey as an id for plain hashing
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ImplicitDocRouter.java Mon Apr 8 15:19:41 2013
@@ -55,9 +55,15 @@ public class ImplicitDocRouter extends D
}
@Override
+ public boolean isTargetSlice(String id, SolrInputDocument sdoc, SolrParams params, String shardId, DocCollection collection) {
+ // todo : how to handle this? Until that is decided, no slice is ever reported as a target, whatever the arguments.
+ return false;
+ }
+
+ @Override
public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
if (shardKey == null) {
- return collection.getSlices();
+ return collection.getActiveSlices();
}
// assume the shardKey is just a slice name
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java Mon Apr 8 15:19:41 2013
@@ -34,6 +34,8 @@ public class Slice extends ZkNodeProps {
public static String STATE = "state";
public static String LEADER = "leader"; // FUTURE: do we want to record the leader as a slice property in the JSON (as opposed to isLeader as a replica property?)
public static String ACTIVE = "active";
+ public static String INACTIVE = "inactive";
+ public static String CONSTRUCTION = "construction";
private final String name;
private final DocRouter.Range range;
@@ -52,10 +54,10 @@ public class Slice extends ZkNodeProps {
this.name = name;
Object rangeObj = propMap.get(RANGE);
- if (propMap.containsKey(STATE))
- state = (String) propMap.get(STATE);
+ if (propMap.containsKey(STATE) && propMap.get(STATE) != null)
+ this.state = (String) propMap.get(STATE);
else {
- state = ACTIVE; //Default to ACTIVE
+ this.state = ACTIVE; //Default to ACTIVE
propMap.put(STATE, this.state);
}
DocRouter.Range tmpRange = null;
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Mon Apr 8 15:19:41 2013
@@ -57,6 +57,8 @@ public class ZkStateReader {
public static final String CORE_NAME_PROP = "core";
public static final String COLLECTION_PROP = "collection";
public static final String SHARD_ID_PROP = "shard";
+ public static final String SHARD_RANGE_PROP = "shard_range";
+ public static final String SHARD_STATE_PROP = "shard_state";
public static final String NUM_SHARDS_PROP = "numShards";
public static final String LEADER_PROP = "leader";
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Mon Apr 8 15:19:41 2013
@@ -28,7 +28,7 @@ public interface CollectionParams
public enum CollectionAction {
- CREATE, DELETE, RELOAD, SYNCSHARD, CREATEALIAS, DELETEALIAS;
+ CREATE, DELETE, RELOAD, SYNCSHARD, CREATEALIAS, DELETEALIAS, SPLITSHARD;
public static CollectionAction get( String p )
{
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java?rev=1465663&r1=1465662&r2=1465663&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java Mon Apr 8 15:19:41 2013
@@ -74,6 +74,16 @@ public interface CoreAdminParams
/** The shard id in solr cloud */
public final static String SHARD = "shard";
+ /** The shard range in solr cloud */
+ public final static String SHARD_RANGE = "shard.range";
+
+ /** The shard state in solr cloud */
+ public final static String SHARD_STATE = "shard.state";
+
+ /** The target core to which a split index should be written.
+ * Multiple targetCores can be specified by multiple targetCore parameters */
+ public final static String TARGET_CORE = "targetCore";
+
public static final String ROLES = "roles";
public static final String CORE_NODE_NAME = "coreNodeName";
@@ -108,6 +118,7 @@ public interface CoreAdminParams
REQUESTSYNCSHARD,
CREATEALIAS,
DELETEALIAS,
+ REQUESTAPPLYUPDATES,
LOAD_ON_STARTUP,
TRANSIENT;