You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2013/10/14 12:48:03 UTC

svn commit: r1531845 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/admin/ core/src/test/org/apache/solr/cloud/ core/src/test/org/apache/solr/update/ solrj/src/java/org/apache/solr/common/cloud/

Author: shalin
Date: Mon Oct 14 10:48:02 2013
New Revision: 1531845

URL: http://svn.apache.org/r1531845
Log:
SOLR-5338: Split shards by a route key using split.key parameter

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1531845&r1=1531844&r2=1531845&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Mon Oct 14 10:48:02 2013
@@ -95,6 +95,8 @@ New Features
 * SOLR-5324: Make sub shard replica recovery and shard state switch asynchronous.
   (Yago Riveiro, shalin)
 
+* SOLR-5338: Split shards by a route key using split.key parameter. (shalin)
+
 Bug Fixes
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1531845&r1=1531844&r2=1531845&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Mon Oct 14 10:48:02 2013
@@ -31,6 +31,7 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClosableThread;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
@@ -57,6 +58,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -487,8 +489,34 @@ public class OverseerCollectionProcessor
     log.info("Split shard invoked");
     String collectionName = message.getStr("collection");
     String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
-    Slice parentSlice = clusterState.getSlice(collectionName, slice);
-    
+    String splitKey = message.getStr("split.key");
+
+    DocCollection collection = clusterState.getCollection(collectionName);
+    DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
+
+    Slice parentSlice = null;
+
+    if (slice == null)  {
+      if (router instanceof CompositeIdRouter) {
+        Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
+        if (searchSlices.isEmpty()) {
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
+        }
+        if (searchSlices.size() > 1)  {
+          throw new SolrException(ErrorCode.BAD_REQUEST,
+              "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
+        }
+        parentSlice = searchSlices.iterator().next();
+        slice = parentSlice.getName();
+        log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
+      } else  {
+        throw new SolrException(ErrorCode.BAD_REQUEST,
+            "Split by route key can only be used with CompositeIdRouter or subclass. Found router: " + router.getClass().getName());
+      }
+    } else  {
+      parentSlice = clusterState.getSlice(collectionName, slice);
+    }
+
     if (parentSlice == null) {
       if(clusterState.getCollections().contains(collectionName)) {
         throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
@@ -499,8 +527,6 @@ public class OverseerCollectionProcessor
     
     // find the leader for the shard
     Replica parentShardLeader = clusterState.getLeader(collectionName, slice);
-    DocCollection collection = clusterState.getCollection(collectionName);
-    DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
     DocRouter.Range range = parentSlice.getRange();
     if (range == null) {
       range = new PlainIdRouter().fullRange();
@@ -527,6 +553,23 @@ public class OverseerCollectionProcessor
           }
         }
       }
+    } else if (splitKey != null)  {
+      if (router instanceof CompositeIdRouter) {
+        CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
+        subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
+        if (subRanges.size() == 1)  {
+          throw new SolrException(ErrorCode.BAD_REQUEST,
+              "The split.key: " + splitKey + " has a hash range that is exactly equal to hash range of shard: " + slice);
+        }
+        log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges);
+        rangesStr = "";
+        for (int i = 0; i < subRanges.size(); i++) {
+          DocRouter.Range subRange = subRanges.get(i);
+          rangesStr += subRange.toString();
+          if (i < subRanges.size() - 1)
+            rangesStr += ',';
+        }
+      }
     } else  {
       // todo: fixed to two partitions?
       subRanges = router.partitionRange(2, range);

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1531845&r1=1531844&r2=1531845&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Mon Oct 14 10:48:02 2013
@@ -369,13 +369,31 @@ public class CollectionsHandler extends 
     log.info("Splitting shard : " + req.getParamString());
     String name = req.getParams().required().get("collection");
     // TODO : add support for multiple shards
-    String shard = req.getParams().required().get("shard");
+    String shard = req.getParams().get("shard");
     String rangesStr = req.getParams().get(CoreAdminParams.RANGES);
+    String splitKey = req.getParams().get("split.key");
+
+    if (splitKey == null && shard == null) {
+      throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "Missing required parameter: shard");
+    }
+    if (splitKey != null && shard != null)  {
+      throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
+          "Only one of 'shard' or 'split.key' should be specified");
+    }
+    if (splitKey != null && rangesStr != null)  {
+      throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
+          "Only one of 'ranges' or 'split.key' should be specified");
+    }
 
     Map<String,Object> props = new HashMap<String,Object>();
     props.put(Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.SPLITSHARD);
     props.put("collection", name);
-    props.put(ZkStateReader.SHARD_ID_PROP, shard);
+    if (shard != null)  {
+      props.put(ZkStateReader.SHARD_ID_PROP, shard);
+    }
+    if (splitKey != null) {
+      props.put("split.key", splitKey);
+    }
     if (rangesStr != null)  {
       props.put(CoreAdminParams.RANGES, rangesStr);
     }

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java?rev=1531845&r1=1531844&r2=1531845&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java Mon Oct 14 10:48:02 2013
@@ -135,7 +135,7 @@ public class ChaosMonkeyShardSplitTest e
       killerThread.start();
       killCounter.incrementAndGet();
 
-      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, null);
+      splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, null, null);
 
       log.info("Layout after split: \n");
       printLayout();

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java?rev=1531845&r1=1531844&r2=1531845&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java Mon Oct 14 10:48:02 2013
@@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.requ
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.HashBasedRouter;
 import org.apache.solr.common.cloud.Replica;
@@ -110,6 +111,7 @@ public class ShardSplitTest extends Basi
 
     splitByUniqueKeyTest();
     splitByRouteFieldTest();
+    splitByRouteKeyTest();
 
     // todo can't call waitForThingsToLevelOut because it looks for jettys of all shards
     // and the new sub-shards don't have any.
@@ -176,7 +178,7 @@ public class ShardSplitTest extends Basi
     try {
       for (int i = 0; i < 3; i++) {
         try {
-          splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges);
+          splitShard(AbstractDistribZkTestBase.DEFAULT_COLLECTION, SHARD1, subRanges, null);
           log.info("Layout after split: \n");
           printLayout();
           break;
@@ -262,7 +264,7 @@ public class ShardSplitTest extends Basi
 
     for (int i = 0; i < 3; i++) {
       try {
-        splitShard(collectionName, SHARD1, null);
+        splitShard(collectionName, SHARD1, null, null);
         break;
       } catch (HttpSolrServer.RemoteSolrException e) {
         if (e.code() != 500) {
@@ -281,6 +283,96 @@ public class ShardSplitTest extends Basi
     assertEquals(docCounts[1], collectionClient.query(new SolrQuery("*:*").setParam("shards", "shard1_1")).getResults().getNumFound());
   }
 
+  private void splitByRouteKeyTest() throws Exception {
+    log.info("Starting splitByRouteKeyTest");
+    String collectionName = "splitByRouteKeyTest";
+    int numShards = 4;
+    int replicationFactor = 2;
+    int maxShardsPerNode = (((numShards * replicationFactor) / getCommonCloudSolrServer()
+        .getZkStateReader().getClusterState().getLiveNodes().size())) + 1;
+
+    HashMap<String, List<Integer>> collectionInfos = new HashMap<String, List<Integer>>();
+    CloudSolrServer client = null;
+    try {
+      client = createCloudClient(null);
+      Map<String, Object> props = ZkNodeProps.makeMap(
+          REPLICATION_FACTOR, replicationFactor,
+          MAX_SHARDS_PER_NODE, maxShardsPerNode,
+          NUM_SLICES, numShards);
+
+      createCollection(collectionInfos, collectionName,props,client);
+    } finally {
+      if (client != null) client.shutdown();
+    }
+
+    List<Integer> list = collectionInfos.get(collectionName);
+    checkForCollection(collectionName, list, null);
+
+    waitForRecoveriesToFinish(false);
+
+    String url = CustomCollectionTest.getUrlFromZk(getCommonCloudSolrServer().getZkStateReader().getClusterState(), collectionName);
+
+    HttpSolrServer collectionClient = new HttpSolrServer(url);
+
+    String splitKey = "b!";
+
+    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+    final DocRouter router = clusterState.getCollection(collectionName).getRouter();
+    Slice shard1 = clusterState.getSlice(collectionName, SHARD1);
+    DocRouter.Range shard1Range = shard1.getRange() != null ? shard1.getRange() : router.fullRange();
+    final List<DocRouter.Range> ranges = ((CompositeIdRouter) router).partitionRangeByKey(splitKey, shard1Range);
+    final int[] docCounts = new int[ranges.size()];
+
+    int uniqIdentifier = (1<<12);
+    int splitKeyDocCount = 0;
+    for (int i = 100; i <= 200; i++) {
+      String shardKey = "" + (char)('a' + (i % 26)); // See comment in ShardRoutingTest for hash distribution
+
+      String idStr = shardKey + "!" + i;
+      collectionClient.add(getDoc(id, idStr, "n_ti", (shardKey + "!").equals(splitKey) ? uniqIdentifier : i));
+      int idx = getHashRangeIdx(router, ranges, idStr);
+      if (idx != -1)  {
+        docCounts[idx]++;
+      }
+      if (splitKey.equals(shardKey + "!"))
+        splitKeyDocCount++;
+    }
+
+    for (int i = 0; i < docCounts.length; i++) {
+      int docCount = docCounts[i];
+      log.info("Shard {} docCount = {}", "shard1_" + i, docCount);
+    }
+    log.info("Route key doc count = {}", splitKeyDocCount);
+
+    collectionClient.commit();
+
+    for (int i = 0; i < 3; i++) {
+      try {
+        splitShard(collectionName, null, null, splitKey);
+        break;
+      } catch (HttpSolrServer.RemoteSolrException e) {
+        if (e.code() != 500) {
+          throw e;
+        }
+        log.error("SPLITSHARD failed. " + (i < 2 ? " Retring split" : ""), e);
+        if (i == 2) {
+          fail("SPLITSHARD was not successful even after three tries");
+        }
+      }
+    }
+
+    waitForRecoveriesToFinish(collectionName, false);
+    SolrQuery solrQuery = new SolrQuery("*:*");
+    assertEquals("DocCount on shard1_0 does not match", docCounts[0], collectionClient.query(solrQuery.setParam("shards", "shard1_0")).getResults().getNumFound());
+    assertEquals("DocCount on shard1_1 does not match", docCounts[1], collectionClient.query(solrQuery.setParam("shards", "shard1_1")).getResults().getNumFound());
+    assertEquals("DocCount on shard1_2 does not match", docCounts[2], collectionClient.query(solrQuery.setParam("shards", "shard1_2")).getResults().getNumFound());
+
+    solrQuery = new SolrQuery("n_ti:" + uniqIdentifier);
+    assertEquals("shard1_0 must have 0 docs for route key: " + splitKey, 0, collectionClient.query(solrQuery.setParam("shards", "shard1_0")).getResults().getNumFound());
+    assertEquals("Wrong number of docs on shard1_1 for route key: " + splitKey, splitKeyDocCount, collectionClient.query(solrQuery.setParam("shards", "shard1_1")).getResults().getNumFound());
+    assertEquals("shard1_2 must have 0 docs for route key: " + splitKey, 0, collectionClient.query(solrQuery.setParam("shards", "shard1_2")).getResults().getNumFound());
+  }
+
   protected void checkDocCountsAndShardStates(int[] docCounts, int numReplicas) throws Exception {
     ClusterState clusterState = null;
     Slice slice1_0 = null, slice1_1 = null;
@@ -351,11 +443,13 @@ public class ShardSplitTest extends Basi
     }
   }
 
-  protected void splitShard(String collection, String shardId, List<DocRouter.Range> subRanges) throws SolrServerException, IOException {
+  protected void splitShard(String collection, String shardId, List<DocRouter.Range> subRanges, String splitKey) throws SolrServerException, IOException {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set("action", CollectionParams.CollectionAction.SPLITSHARD.toString());
     params.set("collection", collection);
-    params.set("shard", shardId);
+    if (shardId != null)  {
+      params.set("shard", shardId);
+    }
     if (subRanges != null)  {
       StringBuilder ranges = new StringBuilder();
       for (int i = 0; i < subRanges.size(); i++) {
@@ -366,6 +460,9 @@ public class ShardSplitTest extends Basi
       }
       params.set("ranges", ranges.toString());
     }
+    if (splitKey != null) {
+      params.set("split.key", splitKey);
+    }
     SolrRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
 

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java?rev=1531845&r1=1531844&r2=1531845&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/update/SolrIndexSplitterTest.java Mon Oct 14 10:48:02 2013
@@ -273,6 +273,12 @@ public class SolrIndexSplitterTest exten
     bytes = id2.getBytes("UTF-8");
     int maxHash = Hash.murmurhash3_x86_32(bytes, 0, bytes.length, 0);
 
+    if (minHash > maxHash)  {
+      int temp = maxHash;
+      maxHash = minHash;
+      minHash = temp;
+    }
+
     PlainIdRouter router = new PlainIdRouter();
     DocRouter.Range fullRange = new DocRouter.Range(minHash, maxHash);
     return router.partitionRange(2, fullRange);

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java?rev=1531845&r1=1531844&r2=1531845&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java Mon Oct 14 10:48:02 2013
@@ -93,6 +93,31 @@ public class CompositeIdRouter extends H
     return (hash1 & m1) | (hash2 & m2);
   }
 
+  public Range keyHashRange(String routeKey) {
+    int idx = routeKey.indexOf(separator);
+    if (idx < 0) {
+      throw new IllegalArgumentException("Route key must be a composite id");
+    }
+    String part1 = routeKey.substring(0, idx);
+    int commaIdx = part1.indexOf(bitsSeparator);
+    int m1 = mask1;
+    int m2 = mask2;
+
+    if (commaIdx > 0) {
+      int firstBits = getBits(part1, commaIdx);
+      if (firstBits >= 0) {
+        m1 = firstBits==0 ? 0 : (-1 << (32-firstBits));
+        m2 = firstBits==32 ? 0 : (-1 >>> firstBits);
+        part1 = part1.substring(0, commaIdx);
+      }
+    }
+
+    int hash = Hash.murmurhash3_x86_32(part1, 0, part1.length(), 0);
+    int min = hash & m1;
+    int max = min | m2;
+    return new Range(min, max);
+  }
+
   @Override
   public Collection<Slice> getSearchSlicesSingle(String shardKey, SolrParams params, DocCollection collection) {
     if (shardKey == null) {
@@ -149,6 +174,27 @@ public class CompositeIdRouter extends H
     return targetSlices;
   }
 
+  public List<Range> partitionRangeByKey(String key, Range range) {
+    List<Range> result = new ArrayList<Range>(3);
+    Range keyRange = keyHashRange(key);
+    if (!keyRange.overlaps(range)) {
+      throw new IllegalArgumentException("Key range does not overlap given range");
+    }
+    if (keyRange.equals(range)) {
+      return Collections.singletonList(keyRange);
+    } else if (keyRange.isSubsetOf(range)) {
+      result.add(new Range(range.min, keyRange.min - 1));
+      result.add(keyRange);
+      result.add((new Range(keyRange.max + 1, range.max)));
+    } else if (range.includes(keyRange.max))  {
+      result.add(new Range(range.min, keyRange.max));
+      result.add(new Range(keyRange.max + 1, range.max));
+    } else  {
+      result.add(new Range(range.min, keyRange.min - 1));
+      result.add(new Range(keyRange.min, range.max));
+    }
+    return result;
+  }
 
   @Override
   public List<Range> partitionRange(int partitions, Range range) {