You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/09/06 14:36:04 UTC

[7/7] lucene-solr:jira/solr-12709: SOLR-12709: Modify bulk updates algorithm.

SOLR-12709: Modify bulk updates algorithm.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/05189103
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/05189103
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/05189103

Branch: refs/heads/jira/solr-12709
Commit: 051891036755aa853f3dcb2146c8a73429ace27b
Parents: 59cce0f
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Sep 6 16:34:56 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Sep 6 16:34:56 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/solr/cloud/CloudUtil.java   |   4 +-
 .../cloud/api/collections/AddReplicaCmd.java    |   2 +-
 .../solr/cloud/api/collections/Assign.java      |   3 +-
 .../cloud/api/collections/CreateShardCmd.java   |   2 +-
 .../cloud/autoscaling/ComputePlanAction.java    |   2 +-
 .../cloud/autoscaling/IndexSizeTrigger.java     |  11 +-
 .../cloud/autoscaling/IndexSizeTriggerTest.java |   4 +-
 .../cloud/autoscaling/sim/SimCloudManager.java  |  38 ++++--
 .../sim/SimClusterStateProvider.java            | 130 ++++++++++++-------
 .../autoscaling/sim/SimDistribStateManager.java |   7 +
 .../autoscaling/sim/TestSimAutoScaling.java     |  11 +-
 .../client/solrj/cloud/autoscaling/Policy.java  |  18 ++-
 .../solrj/cloud/autoscaling/PolicyHelper.java   |   4 +-
 13 files changed, 158 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
index 13734f6..55231f8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
@@ -144,9 +144,9 @@ public class CloudUtil {
 
   }
 
-  public static boolean usePolicyFramework(DocCollection collection, SolrCloudManager cloudManager)
+  public static boolean usePolicyFramework(String collection, SolrCloudManager cloudManager)
       throws IOException, InterruptedException {
     AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
-    return !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || collection.getPolicyName() != null;
+    return !autoScalingConfig.getPolicy().isEmpty();
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index c9dbaec..1a46da3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -258,7 +258,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
 
     // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
     if (!skipCreateReplicaInClusterState) {
-      if (CloudUtil.usePolicyFramework(coll, cloudManager)) {
+      if (CloudUtil.usePolicyFramework(collection, cloudManager)) {
         if (node == null) {
           if(coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName());
           node = Assign.identifyNodes(cloudManager,

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index d323510..1998ace 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -38,6 +38,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.cloud.CloudUtil;
 import org.apache.solr.cloud.rule.ReplicaAssigner;
 import org.apache.solr.cloud.rule.Rule;
 import org.apache.solr.common.SolrException;
@@ -255,7 +256,7 @@ public class Assign {
     String policyName = message.getStr(POLICY);
     AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
 
-    if (rulesMap == null && policyName == null && autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
+    if (rulesMap == null && !CloudUtil.usePolicyFramework(collectionName, cloudManager)) {
       log.debug("Identify nodes using default");
       int i = 0;
       List<ReplicaPosition> result = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 802583c..59c0c9a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -155,7 +155,7 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
     Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
 
-    boolean usePolicyFramework = CloudUtil.usePolicyFramework(collection, cloudManager);
+    boolean usePolicyFramework = CloudUtil.usePolicyFramework(collectionName, cloudManager);
     List<ReplicaPosition> positions;
     if (usePolicyFramework) {
       if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index 36ef524..16a0d97 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -134,7 +134,7 @@ public class ComputePlanAction extends TriggerActionBase {
               continue;
             }
           }
-          log.info("Computed Plan: {}", operation.getParams());
+          log.debug("Computed Plan: {}", operation.getParams());
           if (!collections.isEmpty()) {
             String coll = operation.getParams().get(CoreAdminParams.COLLECTION);
             if (coll != null && !collections.contains(coll)) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
index 3f2ea8a..967582c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
@@ -431,8 +431,15 @@ public class IndexSizeTrigger extends TriggerBase {
                           Map<String, List<ReplicaInfo>> belowSize) {
       super(TriggerEventType.INDEXSIZE, source, eventTime, null);
       properties.put(TriggerEvent.REQUESTED_OPS, ops);
-      properties.put(ABOVE_SIZE_PROP, aboveSize);
-      properties.put(BELOW_SIZE_PROP, belowSize);
+      // avoid passing very large amounts of data here - just use core names (ReplicaInfo.getCore())
+      Set<String> above = new HashSet<>();
+      aboveSize.forEach((coll, replicas) ->
+          replicas.forEach(r -> above.add(r.getCore())));
+      properties.put(ABOVE_SIZE_PROP, above);
+      Set<String> below = new HashSet<>();
+      belowSize.forEach((coll, replicas) ->
+          replicas.forEach(r -> below.add(r.getCore())));
+      properties.put(BELOW_SIZE_PROP, below);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
index 0b4cffc..3226cb0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
@@ -361,7 +361,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
     CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
         CloudTestUtils.clusterShape(2, 2, false, true));
 
-    for (int i = 0; i < 10; i++) {
+    for (int i = 0; i < 20; i++) {
       SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100));
       solrClient.add(collectionName, doc);
     }
@@ -412,7 +412,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
     assertEquals(response.get("result").toString(), "success");
 
     // delete some docs to trigger a merge
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < 15; i++) {
       solrClient.deleteById(collectionName, "id-" + (i * 100));
     }
     solrClient.commit(collectionName);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index 51e3db4..365c488 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -31,6 +31,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -51,6 +52,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.NodeStateProvider;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -119,6 +121,7 @@ public class SimCloudManager implements SolrCloudManager {
   private final String metricTag;
 
   private final List<SolrInputDocument> systemColl = Collections.synchronizedList(new ArrayList<>());
+  private final Map<String, Map<String, AtomicInteger>> eventCounts = new ConcurrentHashMap<>();
   private final MockSearchableSolrClient solrClient;
   private final Map<String, AtomicLong> opCounts = new ConcurrentSkipListMap<>();
 
@@ -130,9 +133,11 @@ public class SimCloudManager implements SolrCloudManager {
   private MetricsHandler metricsHandler;
   private MetricsHistoryHandler metricsHistoryHandler;
   private TimeSource timeSource;
+  private boolean useSystemCollection = true;
 
   private static int nodeIdPort = 10000;
-  public static int DEFAULT_DISK = 1024; // 1000 GiB
+  public static int DEFAULT_FREE_DISK = 1024; // 1 TiB (value in GiB)
+  public static int DEFAULT_TOTAL_DISK = 10240; // 10 TiB
   public static long DEFAULT_IDX_SIZE_BYTES = 10240; // 10 kiB
 
   /**
@@ -307,7 +312,8 @@ public class SimCloudManager implements SolrCloudManager {
     values.put(ImplicitSnitch.PORT, port);
     values.put(ImplicitSnitch.NODE, nodeId);
     values.put(ImplicitSnitch.CORES, 0);
-    values.put(ImplicitSnitch.DISK, DEFAULT_DISK);
+    values.put(ImplicitSnitch.DISK, DEFAULT_FREE_DISK);
+    values.put(Variable.Type.TOTALDISK.tagName, DEFAULT_TOTAL_DISK);
     values.put(ImplicitSnitch.SYSLOADAVG, 1.0);
     values.put(ImplicitSnitch.HEAPUSAGE, 123450000);
     values.put("sysprop.java.version", System.getProperty("java.version"));
@@ -456,6 +462,10 @@ public class SimCloudManager implements SolrCloudManager {
     }
   }
 
+  public void simSetUseSystemCollection(boolean useSystemCollection) {
+    this.useSystemCollection = useSystemCollection;
+  }
+
   /**
    * Clear the (simulated) .system collection.
    */
@@ -472,17 +482,7 @@ public class SimCloudManager implements SolrCloudManager {
   }
 
   public Map<String, Map<String, AtomicInteger>> simGetEventCounts() {
-    TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>();
-    synchronized (systemColl) {
-      for (SolrInputDocument d : systemColl) {
-        if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
-          continue;
-        }
-        counts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new TreeMap<>())
-            .computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
-            .incrementAndGet();
-      }
-    }
+    TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>(eventCounts);
     return counts;
   }
 
@@ -723,7 +723,17 @@ public class SimCloudManager implements SolrCloudManager {
       if (collection == null || collection.equals(CollectionAdminParams.SYSTEM_COLL)) {
         List<SolrInputDocument> docs = ureq.getDocuments();
         if (docs != null) {
-          systemColl.addAll(docs);
+          if (useSystemCollection) {
+            systemColl.addAll(docs);
+          }
+          for (SolrInputDocument d : docs) {
+            if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
+              continue;
+            }
+            eventCounts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new ConcurrentSkipListMap<>())
+                .computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
+                .incrementAndGet(); // inner map must be concurrent too: eventCounts is a ConcurrentHashMap, so updates may race
+          }
         }
         return new UpdateResponse();
       } else {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index e6bebec..3f31031 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -149,6 +149,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
   private AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
   private AtomicBoolean saveClusterState = new AtomicBoolean();
 
+  private Random bulkUpdateRandom = new Random(0);
+
   private transient boolean closed;
 
   /**
@@ -515,7 +517,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
       cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores + 1);
       Integer disk = (Integer)values.get(ImplicitSnitch.DISK);
       if (disk == null) {
-        disk = SimCloudManager.DEFAULT_DISK;
+        disk = SimCloudManager.DEFAULT_FREE_DISK;
       }
       cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk - 1);
       // fake metrics
@@ -1186,7 +1188,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     // delay it once again to better simulate replica recoveries
     //opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());
 
-    CloudTestUtils.waitForState(cloudManager, collectionName, 20, TimeUnit.SECONDS, (liveNodes, state) -> {
+    CloudTestUtils.waitForState(cloudManager, collectionName, 30, TimeUnit.SECONDS, (liveNodes, state) -> {
       for (String subSlice : subSlices) {
         Slice s = state.getSlice(subSlice);
         if (s.getLeader() == null) {
@@ -1349,7 +1351,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     if (deletes != null && !deletes.isEmpty()) {
       for (String id : deletes) {
         Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
-        // NOTE: we don't use getProperty because it uses PROPERTY_PROP_PREFIX
         Replica leader = s.getLeader();
         if (leader == null) {
           log.debug("-- no leader in " + s);
@@ -1428,64 +1429,105 @@ public class SimClusterStateProvider implements ClusterStateProvider {
       }
     }
     List<SolrInputDocument> docs = req.getDocuments();
-    Iterator<SolrInputDocument> it;
+    int docCount = 0;
+    Iterator<SolrInputDocument> it = null;
     if (docs != null) {
-      it = docs.iterator();
+      docCount = docs.size();
     } else {
       it = req.getDocIterator();
+      if (it != null) {
+        for (; it.hasNext(); it.next()) { // advance the iterator, otherwise hasNext() never turns false
+          docCount++;
+        }
+      }
     }
-    if (it != null) {
+    if (docCount > 0) {
       // this approach to updating counters and metrics drastically increases performance
       // of bulk updates, because simSetShardValue is relatively costly
 
-      // also, skip the hash-based selection of slices in favor of a simple random
-      // start + round-robin assignment, because we don't keep individual id-s anyway
       Map<String, AtomicLong> docUpdates = new HashMap<>();
       Map<String, Map<String, AtomicLong>> metricUpdates = new HashMap<>();
+
+      // XXX don't add more than 2bln docs in one request
+      boolean modified = false;
       Slice[] slices = coll.getActiveSlicesArr();
       if (slices.length == 0) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Update sent to a collection without slices: " + coll);
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection without slices");
       }
-      // TODO: we don't use DocRouter so we should verify that active slices cover the whole hash range
-
-      long docCount = 0;
-      long[] perSlice = new long[slices.length];
-      while (it.hasNext()) {
-        SolrInputDocument doc = it.next();
-        String id = (String) doc.getFieldValue("id");
-        if (id == null) {
-          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
+      int[] perSlice = new int[slices.length];
+
+      if (it != null) {
+        // BULK UPDATE: simulate random doc assignment without actually calling DocRouter,
+        // which adds significant overhead
+
+        int totalAdded = 0;
+        for (int i = 0; i < slices.length; i++) {
+          Slice s = slices[i];
+          long count = (long) docCount * ((long) s.getRange().max - (long) s.getRange().min) / 0x100000000L;
+          perSlice[i] = (int) count;
+          totalAdded += perSlice[i];
         }
-        docCount++;
-      }
-      int initialSlice = cloudManager.getRandom().nextInt(slices.length);
-      for (int i = 0; i < slices.length; i++) {
-        long addDocs = perSlice;
-        if (i == 0) {
-          addDocs += remainder;
-        }
-        int sliceNum = (initialSlice + i) % slices.length;
-        Slice s = slices[sliceNum];
-        if (s.getState() != Slice.State.ACTIVE) {
-          log.debug("-- slice not active: {}", s);
+        // loss of precision due to integer math
+        int diff = docCount - totalAdded;
+        if (diff > 0) {
+          // spread the remainder more or less equally
+          int perRemain = diff / slices.length;
+          int remainder = diff % slices.length;
+          int remainderSlice = slices.length > 1 ? bulkUpdateRandom.nextInt(slices.length) : 0;
+          for (int i = 0; i < slices.length; i++) {
+            perSlice[i] += perRemain;
+            if (i == remainderSlice) {
+              perSlice[i] += remainder;
+            }
+          }
         }
-        Replica leader = s.getLeader();
-        if (leader == null) {
-          log.debug("-- no leader in " + s);
-          continue;
+        for (int i = 0; i < slices.length; i++) {
+          Slice s = slices[i];
+          Replica leader = s.getLeader();
+          if (leader == null) {
+            log.debug("-- no leader in " + s);
+            continue;
+          }
+          metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
+              .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
+              .addAndGet(perSlice[i]);
+          modified = true;
+          AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
+          if (bufferedUpdates != null) {
+            bufferedUpdates.addAndGet(perSlice[i]);
+            continue;
+          }
+          docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
+              .addAndGet(perSlice[i]);
         }
-        metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
-            .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
-            .addAndGet(addDocs);
-        AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
-        if (bufferedUpdates != null) {
-          bufferedUpdates.addAndGet(addDocs);
-          continue;
+      } else {
+        // SMALL UPDATE: use exact assignment via DocRouter
+        for (SolrInputDocument doc : docs) {
+          String id = (String) doc.getFieldValue("id");
+          if (id == null) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
+          }
+          Slice s = coll.getRouter().getTargetSlice(id, doc, null, null, coll);
+          Replica leader = s.getLeader();
+          if (leader == null) {
+            log.debug("-- no leader in " + s);
+            continue;
+          }
+          metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
+              .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
+              .incrementAndGet();
+          modified = true;
+          AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
+          if (bufferedUpdates != null) {
+            bufferedUpdates.incrementAndGet();
+            continue;
+          }
+          docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
+              .incrementAndGet();
         }
-        docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
-            .addAndGet(addDocs);
       }
-      if (docCount > 0) {
+
+      if (modified) {
         lock.lockInterruptibly();
         try {
           docUpdates.forEach((sh, count) -> {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
index 1e99ff2..2b8940a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
@@ -214,6 +214,8 @@ public class SimDistribStateManager implements DistribStateManager {
   private final String id;
   private final Node root;
 
+  private int juteMaxbuffer = 0xfffff;
+
   public SimDistribStateManager() {
     this(null);
   }
@@ -226,6 +228,8 @@ public class SimDistribStateManager implements DistribStateManager {
     this.id = IdUtils.timeRandomId();
     this.root = root != null ? root : createNewRootNode();
     watchersPool = ExecutorUtil.newMDCAwareFixedThreadPool(10, new DefaultSolrThreadFactory("sim-watchers"));
+    String bufferSize = System.getProperty("jute.maxbuffer", Integer.toString(0xfffff)); // match ZooKeeper's default (~1 MB) and the field initializer
+    juteMaxbuffer = Integer.parseInt(bufferSize);
   }
 
   public SimDistribStateManager(ActionThrottle actionThrottle, ActionError actionError) {
@@ -493,6 +497,9 @@ public class SimDistribStateManager implements DistribStateManager {
 
   @Override
   public void setData(String path, byte[] data, int version) throws NoSuchElementException, BadVersionException, IOException {
+    if (data.length > juteMaxbuffer) {
+      throw new IOException("Len error " + data.length);
+    }
     multiLock.lock();
     Node n = null;
     try {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
index d6bf3ce..369ebc8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
@@ -32,11 +32,11 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static final int SPEED = 50;
-  private static final int NUM_NODES = 10;
+  private static final int NUM_NODES = 50;
 
-  private static final long BATCH_SIZE = 200000;
-  private static final long NUM_BATCHES = 1000;
-  private static final long ABOVE_SIZE = 300000;
+  private static final long BATCH_SIZE = 8000000;
+  private static final long NUM_BATCHES = 100000;
+  private static final long ABOVE_SIZE = 2000000;
 
 
   private static TimeSource timeSource;
@@ -47,6 +47,7 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
     configureCluster(NUM_NODES, TimeSource.get("simTime:" + SPEED));
     timeSource = cluster.getTimeSource();
     solrClient = cluster.simGetSolrClient();
+    cluster.simSetUseSystemCollection(false);
   }
 
   @Test
@@ -104,8 +105,8 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
       this.count = count;
       current = start;
       max = start + count;
-      idField.setValue("foo");
       doc.put("id", idField);
+      idField.setValue("foo");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index 4a2b880..711b4c3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -94,6 +94,7 @@ public class Policy implements MapWriter {
   final List<Pair<String, Type>> params;
   final List<String> perReplicaAttributes;
   final int zkVersion;
+  final boolean empty;
 
   public Policy() {
     this(Collections.emptyMap());
@@ -104,6 +105,7 @@ public class Policy implements MapWriter {
   }
   @SuppressWarnings("unchecked")
   public Policy(Map<String, Object> jsonMap, int version) {
+    this.empty = jsonMap.get(CLUSTER_PREFERENCES) == null && jsonMap.get(CLUSTER_POLICY) == null && jsonMap.get(POLICIES) == null;
     this.zkVersion = version;
     int[] idx = new int[1];
     List<Preference> initialClusterPreferences = ((List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_PREFERENCES, emptyList())).stream()
@@ -156,6 +158,7 @@ public class Policy implements MapWriter {
   }
 
   private Policy(Map<String, List<Clause>> policies, List<Clause> clusterPolicy, List<Preference> clusterPreferences, int version) {
+    this.empty = policies == null && clusterPolicy == null && clusterPreferences == null;
     this.zkVersion = version;
     this.policies = policies != null ? Collections.unmodifiableMap(policies) : Collections.emptyMap();
     this.clusterPolicy = clusterPolicy != null ? Collections.unmodifiableList(clusterPolicy) : Collections.emptyList();
@@ -281,11 +284,16 @@ public class Policy implements MapWriter {
             return p.compare(r1, r2, false);
           });
         } catch (Exception e) {
+//          log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
+//              clusterPreferences,
+//              lastComparison[0],
+//              lastComparison[1],
+//              Utils.toJSONString(Utils.getDeepCopy(tmpMatrix, 6, false)));
           log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
               clusterPreferences,
-              lastComparison[0],
-              lastComparison[1],
-              Utils.toJSONString(Utils.getDeepCopy(tmpMatrix, 6, false)));
+              lastComparison[0] == null ? null : lastComparison[0].node, // never throw from here: it would mask e
+              lastComparison[1] == null ? null : lastComparison[1].node,
+              matrix.size());
           throw e;
         }
         p.setApproxVal(tmpMatrix);
@@ -461,6 +469,10 @@ public class Policy implements MapWriter {
     return Utils.toJSONString(this);
   }
 
+  public boolean isEmpty() {
+    return empty;
+  }
+
   /*This stores the logical state of the system, given a policy and
    * a cluster state.
    *

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index d052d6f..33fb78d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -312,7 +312,7 @@ public class PolicyHelper {
       TimeSource timeSource = sessionWrapper.session != null ? sessionWrapper.session.cloudManager.getTimeSource() : TimeSource.NANO_TIME;
       synchronized (lockObj) {
         sessionWrapper.status = Status.EXECUTING;
-        log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(timeSource, MILLISECONDS),
+        log.debug("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(timeSource, MILLISECONDS),
             sessionWrapper.createTime,
             this.sessionWrapper.createTime);
         if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
@@ -323,7 +323,7 @@ public class PolicyHelper {
           //one thread who is waiting for this need to be notified.
           lockObj.notify();
         } else {
-          log.info("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime);
+          log.debug("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime);
           //else just ignore it
         }
       }