You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2019/08/06 18:09:43 UTC

[lucene-solr] branch branch_8x updated: SOLR-13399: ability to use id field for compositeId histogram

This is an automated email from the ASF dual-hosted git repository.

yonik pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new d8f99a9  SOLR-13399: ability to use id field for compositeId histogram
d8f99a9 is described below

commit d8f99a9986835507d19b70edf0ff280416104788
Author: yonik <yo...@apache.org>
AuthorDate: Tue Aug 6 14:09:54 2019 -0400

    SOLR-13399: ability to use id field for compositeId histogram
---
 solr/CHANGES.txt                                   |   4 +-
 .../solr/cloud/api/collections/SplitShardCmd.java  |  10 +-
 .../org/apache/solr/handler/admin/SplitOp.java     | 144 ++++++++++++++++++---
 .../cloud/api/collections/SplitByPrefixTest.java   |  18 ++-
 .../solr/handler/admin/SplitHandlerTest.java       |  70 ++++++++++
 .../solr/common/cloud/CompositeIdRouter.java       |   5 +-
 6 files changed, 217 insertions(+), 34 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 73e4750..0c71460 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -45,8 +45,8 @@ New Features
   of two fields. (Gus Heck)
 
 * SOLR-13399: SPLITSHARD implements a new splitByPrefix option that takes into account the actual document distribution
-  when using compositeIds.  The id prefix should be indexed into the "id_prefix" field for this feature to work.
-  (yonik)
+  when using compositeIds.  Document distribution is calculated using the "id_prefix" field (if it exists) containing
+  just the compositeId prefixes, or directly from the indexed "id" field otherwise. (yonik)
 
 * SOLR-13565: Node level runtime libs loaded from remote urls  (noble)
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 4d623be..6c5921e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -212,16 +212,14 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       if (message.getBool(CommonAdminParams.SPLIT_BY_PREFIX, true)) {
         t = timings.sub("getRanges");
 
-        log.info("Requesting split ranges from replica " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
-            + collectionName + " on " + parentShardLeader);
-
         ModifiableSolrParams params = new ModifiableSolrParams();
         params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
         params.set(CoreAdminParams.GET_RANGES, "true");
         params.set(CommonAdminParams.SPLIT_METHOD, splitMethod.toLower());
         params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
-        int numSubShards = message.getInt(NUM_SUB_SHARDS, DEFAULT_NUM_SUB_SHARDS);
-        params.set(NUM_SUB_SHARDS, Integer.toString(numSubShards));
+        // Only 2 is currently supported
+        // int numSubShards = message.getInt(NUM_SUB_SHARDS, DEFAULT_NUM_SUB_SHARDS);
+        // params.set(NUM_SUB_SHARDS, Integer.toString(numSubShards));
 
         {
           final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
@@ -236,7 +234,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
             NamedList shardRsp = (NamedList)successes.getVal(0);
             String splits = (String)shardRsp.get(CoreAdminParams.RANGES);
             if (splits != null) {
-              log.info("Resulting split range to be used is " + splits);
+              log.info("Resulting split ranges to be used: " + splits + " slice=" + slice + " leader=" + parentShardLeader);
               // change the message to use the recommended split ranges
               message = message.plus(CoreAdminParams.RANGES, splits);
             }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
index a37708f..d280b11 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
@@ -187,11 +187,11 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
   }
 
 
-
-
-  // This is called when splitByPrefix is used.
-  // The overseer called us to get recommended splits taking into
-  // account actual document distribution over the hash space.
+  /**
+   *   This is called when splitByPrefix is used.
+   *   The overseer called us to get recommended splits taking into
+   *   account actual document distribution over the hash space.
+   */
   private void handleGetRanges(CoreAdminHandler.CallInfo it, String coreName) throws Exception {
 
     SolrCore parentCore = it.handler.coreContainer.getCore(coreName);
@@ -205,7 +205,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
       if (!it.handler.coreContainer.isZooKeeperAware()) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Shard splitByPrefix requires SolrCloud mode.");
       } else {
-        String routeFieldName = "id";
+        SolrIndexSearcher searcher = searcherHolder.get();
+
+        String routeFieldName = null;
         String prefixField = "id_prefix";
 
         ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
@@ -221,8 +223,19 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
           Map routerProps = (Map) routerObj;
           routeFieldName = (String) routerProps.get("field");
         }
+        if (routeFieldName == null) {
+          routeFieldName = searcher.getSchema().getUniqueKeyField().getName();
+        }
+
+        Collection<RangeCount> counts = getHashHistogram(searcher, prefixField, router, collection);
+
+        if (counts.size() == 0) {
+          // How to determine if we should look at the id field to figure out the prefix buckets?
+          // There may legitimately be no indexed terms in id_prefix if no ids have a prefix yet.
+          // For now, avoid using splitByPrefix unless you are actually using prefixes.
+          counts = getHashHistogramFromId(searcher, searcher.getSchema().getUniqueKeyField().getName(), router, collection);
+        }
 
-        Collection<RangeCount> counts = getHashHistogram(searcherHolder.get(), prefixField, router, collection);
         Collection<DocRouter.Range> splits = getSplits(counts, currentRange);
         String splitString = toSplitString(splits);
 
@@ -290,7 +303,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
   }
 
 
-  // Returns a list of range counts sorted by the range lower bound
+  /*
+   * Returns a list of range counts sorted by the range lower bound
+   */
   static Collection<RangeCount> getHashHistogram(SolrIndexSearcher searcher, String prefixField, DocRouter router, DocCollection collection) throws IOException {
     RTimer timer = new RTimer();
     TreeMap<DocRouter.Range,RangeCount> counts = new TreeMap<>();
@@ -306,9 +321,8 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     long sumBuckets = 0;
 
     TermsEnum termsEnum = terms.iterator();
-    for (;;) {
-      BytesRef term = termsEnum.next();
-      if (term == null) break;
+    BytesRef term;
+    while ((term = termsEnum.next()) != null) {
       numPrefixes++;
 
       String termStr = term.utf8ToString();
@@ -340,8 +354,102 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
     return counts.values();
   }
 
+  /**
+   *   Returns a list of range counts sorted by the range lower bound, using the indexed "id" field (i.e. the terms are full IDs, not just prefixes)
+   */
+  static Collection<RangeCount> getHashHistogramFromId(SolrIndexSearcher searcher, String idField, DocRouter router, DocCollection collection) throws IOException {
+    RTimer timer = new RTimer();
+
+    TreeMap<DocRouter.Range, RangeCount> counts = new TreeMap<>();
+
+    Terms terms = MultiTerms.getTerms(searcher.getIndexReader(), idField);
+    if (terms == null) {
+      return counts.values();
+    }
+
+    int numPrefixes = 0;
+    int numCollisions = 0;
+    long sumBuckets = 0;
+
+
+    byte sep = (byte) CompositeIdRouter.SEPARATOR.charAt(0);
+    TermsEnum termsEnum = terms.iterator();
+    BytesRef currPrefix = new BytesRef();  // prefix of the previous "id" term
+    int bucketCount = 0; // count of the number of docs in the current bucket
+
+    // We're going to iterate over all terms, so do the minimum amount of work per term.
+    // Terms are sorted, so all terms sharing a prefix will be grouped together.  The extra work
+    // is really just limited to stepping over all the terms in the id field.
+    for (;;) {
+      BytesRef term = termsEnum.next();
 
-  // returns the list of recommended splits, or null if there is not enough information
+      // compare to current prefix bucket and see if this new term shares the same prefix
+      if (term != null && term.length >= currPrefix.length && currPrefix.length > 0) {
+        int i = 0;
+        for (; i < currPrefix.length; i++) {
+          if (currPrefix.bytes[i] != term.bytes[term.offset + i]) {
+            break;
+          }
+        }
+
+        if (i == currPrefix.length) {
+          // prefix was the same (common-case fast path)
+          // int count = termsEnum.docFreq();
+          bucketCount++;  // use 1 since we are dealing with unique ids
+          continue;
+        }
+      }
+
+      // At this point the prefix did not match, so if we had a bucket we were working on, record it.
+      if (currPrefix.length > 0) {
+        numPrefixes++;
+        sumBuckets += bucketCount;
+        String currPrefixStr = currPrefix.utf8ToString();
+        DocRouter.Range range = router.getSearchRangeSingle(currPrefixStr, null, collection);
+
+        RangeCount rangeCount = new RangeCount(range, bucketCount);
+        bucketCount = 0;
+
+        RangeCount prev = counts.put(rangeCount.range, rangeCount);
+        if (prev != null) {
+          // we hit a hash collision, so add the buckets together.
+          rangeCount.count += prev.count;
+          numCollisions++;
+        }
+      }
+
+      // if the current term is null, we ran out of values
+      if (term == null) break;
+
+      // find the new prefix (if any)
+
+      // resize if needed
+      if (currPrefix.length < term.length) {
+        currPrefix.bytes = new byte[term.length+10];
+      }
+
+      // Copy the bytes up to and including the separator, and set the length if the separator is found.
+      // If there was no separator, then length remains 0 and it's the indicator that we have no prefix bucket
+      currPrefix.length = 0;
+      for (int i=0; i<term.length; i++) {
+        byte b = term.bytes[i + term.offset];
+        currPrefix.bytes[i] = b;
+        if (b == sep) {
+          currPrefix.length = i + 1;
+          bucketCount++;
+          break;
+        }
+      }
+    }
+
+    log.info("Split histogram from idField {}: ms={}, numBuckets={} sumBuckets={} numPrefixes={}numCollisions={}", idField, timer.getTime(), counts.size(), sumBuckets, numPrefixes, numCollisions);
+
+    return counts.values();
+  }
+
+  /*
+   * Returns the list of recommended splits, or null if there is not enough information
+   */
   static Collection<DocRouter.Range> getSplits(Collection<RangeCount> rawCounts, DocRouter.Range currentRange) throws Exception {
     int totalCount = 0;
     RangeCount biggest = null; // keep track of the largest in case we need to split it out into it's own shard
@@ -371,6 +479,9 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
 
     if (counts.size() == 1) {
       // We have a single range, so we should split it.
+      // Currently, we only split a prefix/bucket when we have just one, but this could be changed/controlled
+      // in the future via a allowedSizeDifference parameter (i.e. if just separating prefix buckets results in
+      // too large of an imbalanced, allow splitting within a prefix)
 
       // It may already be a partial range, so figure that out
       int lower = Math.max(last.range.min, currentRange.min);
@@ -392,6 +503,7 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
 
     // We have at least two ranges, so we want to partition the ranges
     // and avoid splitting any individual range.
+    // The "middle" bucket we are going to find will be included with the lower range and excluded from the upper range.
 
     int targetCount = totalCount / 2;
     RangeCount middle = null;
@@ -413,10 +525,10 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
       middle = prev;
     }
 
-    // if the middle range turned out to be the last one, pick the one before it instead
-    if (middle == last) {
-      middle = prev;
-    }
+    // The middle should never be the last, since that means that we won't actually do a split.
+    // Minimising the error (above) should already ensure this never happens.
+    assert middle != last;
+
 
     // Make sure to include the shard's current range in the new ranges so we don't create useless empty shards.
     DocRouter.Range lowerRange = new DocRouter.Range(currentRange.min, middle.range.max);
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/SplitByPrefixTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/SplitByPrefixTest.java
index f3ef230..ca2aefc 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/SplitByPrefixTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/SplitByPrefixTest.java
@@ -53,8 +53,12 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
   public static void setupCluster() throws Exception {
     System.setProperty("managed.schema.mutable", "true");  // needed by cloud-managed config set
 
+    // clould-managed has the copyField from ID to id_prefix
+    // cloud-minimal does not and thus histogram should be driven from the "id" field directly
+    String configSetName = random().nextBoolean() ? "cloud-minimal" : "cloud-managed";
+
     configureCluster(1)
-        .addConfig("conf", configset("cloud-managed"))  // cloud-managed has the id copyfield to id_prefix
+        .addConfig("conf", configset(configSetName))  // cloud-managed has the id copyfield to id_prefix
         .configure();
   }
 
@@ -71,9 +75,9 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
     cluster.deleteAllCollections();
   }
 
-  static class Prefix implements Comparable<Prefix> {
-    String key;
-    DocRouter.Range range;
+  public static class Prefix implements Comparable<Prefix> {
+    public String key;
+    public DocRouter.Range range;
 
     @Override
     public int compareTo(Prefix o) {
@@ -87,7 +91,7 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
   }
 
   // find prefixes (shard keys) matching certain criteria
-  public List<Prefix> findPrefixes(int numToFind, int lowerBound, int upperBound) {
+  public static List<Prefix> findPrefixes(int numToFind, int lowerBound, int upperBound) {
     CompositeIdRouter router = new CompositeIdRouter();
 
     ArrayList<Prefix> prefixes = new ArrayList<>();
@@ -112,7 +116,7 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
   }
 
   // remove duplicate prefixes from the sorted prefix list
-  List<Prefix> removeDups(List<Prefix> prefixes) {
+  public static List<Prefix> removeDups(List<Prefix> prefixes) {
     ArrayList<Prefix> result = new ArrayList<>();
     Prefix last = null;
     for (Prefix prefix : prefixes) {
@@ -198,7 +202,7 @@ public class SplitByPrefixTest extends SolrCloudTestCase {
 
 
     //
-    // now lets add enough documents to the first prefix to get it split out on it's own
+    // now lets add enough documents to the first prefix to get it split out on its own
     //
     for (int i=0; i<uniquePrefixes.size(); i++) {
       client.add(  getDoc(uniquePrefixes.get(0).key, "doc"+(i+100)));
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/SplitHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/SplitHandlerTest.java
index e720af8..02174b7 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/SplitHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/SplitHandlerTest.java
@@ -18,11 +18,17 @@ package org.apache.solr.handler.admin;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cloud.api.collections.SplitByPrefixTest;
+import org.apache.solr.cloud.api.collections.SplitByPrefixTest.Prefix;
+import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.request.SolrQueryRequest;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 // test low level splitByPrefix range recommendations.
@@ -30,6 +36,11 @@ import org.junit.Test;
 // See SplitByPrefixTest for cloud level tests of SPLITSHARD that use this by passing getRanges with the SPLIT command
 public class SplitHandlerTest extends SolrTestCaseJ4 {
 
+  @BeforeClass
+  public static void beforeTests() throws Exception {
+    System.setProperty("managed.schema.mutable", "true");  // needed by cloud-managed config set
+    initCore("solrconfig.xml","schema_latest.xml");
+  }
 
   void verifyContiguous(Collection<DocRouter.Range> results, DocRouter.Range currentRange) {
     if (results == null) return;
@@ -215,4 +226,63 @@ public class SplitHandlerTest extends SolrTestCaseJ4 {
     verifyContiguous(results, curr);
   }
 
+  @Test
+  public void testHistoramBuilding() throws Exception {
+    List<Prefix> prefixes = SplitByPrefixTest.findPrefixes(20, 0, 0x00ffffff);
+    List<Prefix> uniquePrefixes = SplitByPrefixTest.removeDups(prefixes);
+    assertTrue(prefixes.size() > uniquePrefixes.size());  // make sure we have some duplicates to test hash collisions
+
+    String prefixField = "id_prefix_s";
+    String idField = "id";
+    DocRouter router = new CompositeIdRouter();
+
+
+    for (int i=0; i<100; i++) {
+      SolrQueryRequest req = req("myquery");
+      try {
+        // the first time through the loop we do this before adding docs to test an empty index
+        Collection<SplitOp.RangeCount> counts1 = SplitOp.getHashHistogram(req.getSearcher(), prefixField, router, null);
+        Collection<SplitOp.RangeCount> counts2 = SplitOp.getHashHistogramFromId(req.getSearcher(), idField, router, null);
+        assertTrue(eqCount(counts1, counts2));
+
+        if (i>0) {
+          assertTrue(counts1.size() > 0);  // make sure we are testing something
+        }
+
+
+        // index a few random documents
+        int ndocs = random().nextInt(10) + 1;
+        for (int j=0; j<ndocs; j++) {
+          String prefix = prefixes.get( random().nextInt(prefixes.size()) ).key;
+          if (random().nextBoolean()) {
+            prefix = prefix + Integer.toString(random().nextInt(3)) + "!";
+          }
+          String id = prefix + "doc" + i + "_" + j;
+          updateJ(jsonAdd(sdoc(idField, id, prefixField, prefix)), null);
+        }
+
+        assertU(commit());
+
+
+      } finally {
+        req.close();
+      }
+
+    }
+
+  }
+
+  private boolean eqCount(Collection<SplitOp.RangeCount> a, Collection<SplitOp.RangeCount> b) {
+    Iterator<SplitOp.RangeCount> it1 = a.iterator();
+    Iterator<SplitOp.RangeCount> it2 = b.iterator();
+    while (it1.hasNext()) {
+      SplitOp.RangeCount r1 = it1.next();
+      SplitOp.RangeCount r2 = it2.next();
+      if (!r1.range.equals(r2.range) || r1.count != r2.count) {
+        return false;
+      }
+    }
+    return true;
+  }
+
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
index 974b9eb..d700464 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
@@ -79,15 +79,14 @@ public class CompositeIdRouter extends HashBasedRouter {
       // search across whole range
       return fullRange();
     }
-    String id = shardKey;
 
     if (shardKey.indexOf(SEPARATOR) < 0) {
       // shardKey is a simple id, so don't do a range
-      int hash = Hash.murmurhash3_x86_32(id, 0, id.length(), 0);
+      int hash = Hash.murmurhash3_x86_32(shardKey, 0, shardKey.length(), 0);
       return new Range(hash, hash);
     }
 
-    return new KeyParser(id).getRange();
+    return new KeyParser(shardKey).getRange();
   }
 
   @Override