You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/09/06 14:36:04 UTC
[7/7] lucene-solr:jira/solr-12709: SOLR-12709: Modify bulk updates
algorithm.
SOLR-12709: Modify bulk updates algorithm.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/05189103
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/05189103
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/05189103
Branch: refs/heads/jira/solr-12709
Commit: 051891036755aa853f3dcb2146c8a73429ace27b
Parents: 59cce0f
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Sep 6 16:34:56 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Sep 6 16:34:56 2018 +0200
----------------------------------------------------------------------
.../java/org/apache/solr/cloud/CloudUtil.java | 4 +-
.../cloud/api/collections/AddReplicaCmd.java | 2 +-
.../solr/cloud/api/collections/Assign.java | 3 +-
.../cloud/api/collections/CreateShardCmd.java | 2 +-
.../cloud/autoscaling/ComputePlanAction.java | 2 +-
.../cloud/autoscaling/IndexSizeTrigger.java | 11 +-
.../cloud/autoscaling/IndexSizeTriggerTest.java | 4 +-
.../cloud/autoscaling/sim/SimCloudManager.java | 38 ++++--
.../sim/SimClusterStateProvider.java | 130 ++++++++++++-------
.../autoscaling/sim/SimDistribStateManager.java | 7 +
.../autoscaling/sim/TestSimAutoScaling.java | 11 +-
.../client/solrj/cloud/autoscaling/Policy.java | 18 ++-
.../solrj/cloud/autoscaling/PolicyHelper.java | 4 +-
13 files changed, 158 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
index 13734f6..55231f8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
@@ -144,9 +144,9 @@ public class CloudUtil {
}
- public static boolean usePolicyFramework(DocCollection collection, SolrCloudManager cloudManager)
+ public static boolean usePolicyFramework(String collection, SolrCloudManager cloudManager)
throws IOException, InterruptedException {
AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
- return !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || collection.getPolicyName() != null;
+ return !autoScalingConfig.getPolicy().isEmpty();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index c9dbaec..1a46da3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -258,7 +258,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
// Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
if (!skipCreateReplicaInClusterState) {
- if (CloudUtil.usePolicyFramework(coll, cloudManager)) {
+ if (CloudUtil.usePolicyFramework(collection, cloudManager)) {
if (node == null) {
if(coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName());
node = Assign.identifyNodes(cloudManager,
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index d323510..1998ace 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -38,6 +38,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.rule.ReplicaAssigner;
import org.apache.solr.cloud.rule.Rule;
import org.apache.solr.common.SolrException;
@@ -255,7 +256,7 @@ public class Assign {
String policyName = message.getStr(POLICY);
AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
- if (rulesMap == null && policyName == null && autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
+ if (rulesMap == null && !CloudUtil.usePolicyFramework(collectionName, cloudManager)) {
log.debug("Identify nodes using default");
int i = 0;
List<ReplicaPosition> result = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 802583c..59c0c9a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -155,7 +155,7 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
- boolean usePolicyFramework = CloudUtil.usePolicyFramework(collection, cloudManager);
+ boolean usePolicyFramework = CloudUtil.usePolicyFramework(collectionName, cloudManager);
List<ReplicaPosition> positions;
if (usePolicyFramework) {
if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index 36ef524..16a0d97 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -134,7 +134,7 @@ public class ComputePlanAction extends TriggerActionBase {
continue;
}
}
- log.info("Computed Plan: {}", operation.getParams());
+ log.debug("Computed Plan: {}", operation.getParams());
if (!collections.isEmpty()) {
String coll = operation.getParams().get(CoreAdminParams.COLLECTION);
if (coll != null && !collections.contains(coll)) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
index 3f2ea8a..967582c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
@@ -431,8 +431,15 @@ public class IndexSizeTrigger extends TriggerBase {
Map<String, List<ReplicaInfo>> belowSize) {
super(TriggerEventType.INDEXSIZE, source, eventTime, null);
properties.put(TriggerEvent.REQUESTED_OPS, ops);
- properties.put(ABOVE_SIZE_PROP, aboveSize);
- properties.put(BELOW_SIZE_PROP, belowSize);
+ // avoid passing very large amounts of data here - just use replica names
+ Set<String> above = new HashSet<>();
+ aboveSize.forEach((coll, replicas) ->
+ replicas.forEach(r -> above.add(r.getCore())));
+ properties.put(ABOVE_SIZE_PROP, above);
+ Set<String> below = new HashSet<>();
+ belowSize.forEach((coll, replicas) ->
+ replicas.forEach(r -> below.add(r.getCore())));
+ properties.put(BELOW_SIZE_PROP, below);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
index 0b4cffc..3226cb0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
@@ -361,7 +361,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
CloudTestUtils.clusterShape(2, 2, false, true));
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 20; i++) {
SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100));
solrClient.add(collectionName, doc);
}
@@ -412,7 +412,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
assertEquals(response.get("result").toString(), "success");
// delete some docs to trigger a merge
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 15; i++) {
solrClient.deleteById(collectionName, "id-" + (i * 100));
}
solrClient.commit(collectionName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index 51e3db4..365c488 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -31,6 +31,7 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -51,6 +52,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -119,6 +121,7 @@ public class SimCloudManager implements SolrCloudManager {
private final String metricTag;
private final List<SolrInputDocument> systemColl = Collections.synchronizedList(new ArrayList<>());
+ private final Map<String, Map<String, AtomicInteger>> eventCounts = new ConcurrentHashMap<>();
private final MockSearchableSolrClient solrClient;
private final Map<String, AtomicLong> opCounts = new ConcurrentSkipListMap<>();
@@ -130,9 +133,11 @@ public class SimCloudManager implements SolrCloudManager {
private MetricsHandler metricsHandler;
private MetricsHistoryHandler metricsHistoryHandler;
private TimeSource timeSource;
+ private boolean useSystemCollection = true;
private static int nodeIdPort = 10000;
- public static int DEFAULT_DISK = 1024; // 1000 GiB
+ public static int DEFAULT_FREE_DISK = 1024; // 1000 GiB
+ public static int DEFAULT_TOTAL_DISK = 10240; // 10 TiB
public static long DEFAULT_IDX_SIZE_BYTES = 10240; // 10 kiB
/**
@@ -307,7 +312,8 @@ public class SimCloudManager implements SolrCloudManager {
values.put(ImplicitSnitch.PORT, port);
values.put(ImplicitSnitch.NODE, nodeId);
values.put(ImplicitSnitch.CORES, 0);
- values.put(ImplicitSnitch.DISK, DEFAULT_DISK);
+ values.put(ImplicitSnitch.DISK, DEFAULT_FREE_DISK);
+ values.put(Variable.Type.TOTALDISK.tagName, DEFAULT_TOTAL_DISK);
values.put(ImplicitSnitch.SYSLOADAVG, 1.0);
values.put(ImplicitSnitch.HEAPUSAGE, 123450000);
values.put("sysprop.java.version", System.getProperty("java.version"));
@@ -456,6 +462,10 @@ public class SimCloudManager implements SolrCloudManager {
}
}
+ public void simSetUseSystemCollection(boolean useSystemCollection) {
+ this.useSystemCollection = useSystemCollection;
+ }
+
/**
* Clear the (simulated) .system collection.
*/
@@ -472,17 +482,7 @@ public class SimCloudManager implements SolrCloudManager {
}
public Map<String, Map<String, AtomicInteger>> simGetEventCounts() {
- TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>();
- synchronized (systemColl) {
- for (SolrInputDocument d : systemColl) {
- if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
- continue;
- }
- counts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new TreeMap<>())
- .computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
- .incrementAndGet();
- }
- }
+ TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>(eventCounts);
return counts;
}
@@ -723,7 +723,17 @@ public class SimCloudManager implements SolrCloudManager {
if (collection == null || collection.equals(CollectionAdminParams.SYSTEM_COLL)) {
List<SolrInputDocument> docs = ureq.getDocuments();
if (docs != null) {
- systemColl.addAll(docs);
+ if (useSystemCollection) {
+ systemColl.addAll(docs);
+ }
+ for (SolrInputDocument d : docs) {
+ if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
+ continue;
+ }
+ eventCounts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new TreeMap<>())
+ .computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
+ .incrementAndGet();
+ }
}
return new UpdateResponse();
} else {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index e6bebec..3f31031 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -149,6 +149,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
private AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
private AtomicBoolean saveClusterState = new AtomicBoolean();
+ private Random bulkUpdateRandom = new Random(0);
+
private transient boolean closed;
/**
@@ -515,7 +517,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores + 1);
Integer disk = (Integer)values.get(ImplicitSnitch.DISK);
if (disk == null) {
- disk = SimCloudManager.DEFAULT_DISK;
+ disk = SimCloudManager.DEFAULT_FREE_DISK;
}
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk - 1);
// fake metrics
@@ -1186,7 +1188,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
// delay it once again to better simulate replica recoveries
//opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());
- CloudTestUtils.waitForState(cloudManager, collectionName, 20, TimeUnit.SECONDS, (liveNodes, state) -> {
+ CloudTestUtils.waitForState(cloudManager, collectionName, 30, TimeUnit.SECONDS, (liveNodes, state) -> {
for (String subSlice : subSlices) {
Slice s = state.getSlice(subSlice);
if (s.getLeader() == null) {
@@ -1349,7 +1351,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (deletes != null && !deletes.isEmpty()) {
for (String id : deletes) {
Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
- // NOTE: we don't use getProperty because it uses PROPERTY_PROP_PREFIX
Replica leader = s.getLeader();
if (leader == null) {
log.debug("-- no leader in " + s);
@@ -1428,64 +1429,105 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
List<SolrInputDocument> docs = req.getDocuments();
- Iterator<SolrInputDocument> it;
+ int docCount = 0;
+ Iterator<SolrInputDocument> it = null;
if (docs != null) {
- it = docs.iterator();
+ docCount = docs.size();
} else {
it = req.getDocIterator();
+ if (it != null) {
+ while (it.hasNext()) {
+ docCount++;
+ }
+ }
}
- if (it != null) {
+ if (docCount > 0) {
// this approach to updating counters and metrics drastically increases performance
// of bulk updates, because simSetShardValue is relatively costly
- // also, skip the hash-based selection of slices in favor of a simple random
- // start + round-robin assignment, because we don't keep individual id-s anyway
Map<String, AtomicLong> docUpdates = new HashMap<>();
Map<String, Map<String, AtomicLong>> metricUpdates = new HashMap<>();
+
+ // XXX don't add more than 2bln docs in one request
+ boolean modified = false;
Slice[] slices = coll.getActiveSlicesArr();
if (slices.length == 0) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Update sent to a collection without slices: " + coll);
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection without slices");
}
- // TODO: we don't use DocRouter so we should verify that active slices cover the whole hash range
-
- long docCount = 0;
- long[] perSlice = new long[slices.length];
- while (it.hasNext()) {
- SolrInputDocument doc = it.next();
- String id = (String) doc.getFieldValue("id");
- if (id == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
+ int[] perSlice = new int[slices.length];
+
+ if (it != null) {
+ // BULK UPDATE: simulate random doc assignment without actually calling DocRouter,
+ // which adds significant overhead
+
+ int totalAdded = 0;
+ for (int i = 0; i < slices.length; i++) {
+ Slice s = slices[i];
+ long count = (long) docCount * ((long) s.getRange().max - (long) s.getRange().min) / 0x100000000L;
+ perSlice[i] = (int) count;
+ totalAdded += perSlice[i];
}
- docCount++;
- }
- int initialSlice = cloudManager.getRandom().nextInt(slices.length);
- for (int i = 0; i < slices.length; i++) {
- long addDocs = perSlice;
- if (i == 0) {
- addDocs += remainder;
- }
- int sliceNum = (initialSlice + i) % slices.length;
- Slice s = slices[sliceNum];
- if (s.getState() != Slice.State.ACTIVE) {
- log.debug("-- slice not active: {}", s);
+ // loss of precision due to integer math
+ int diff = docCount - totalAdded;
+ if (diff > 0) {
+ // spread the remainder more or less equally
+ int perRemain = diff / slices.length;
+ int remainder = diff % slices.length;
+ int remainderSlice = slices.length > 1 ? bulkUpdateRandom.nextInt(slices.length) : 0;
+ for (int i = 0; i < slices.length; i++) {
+ perSlice[i] += perRemain;
+ if (i == remainderSlice) {
+ perSlice[i] += remainder;
+ }
+ }
}
- Replica leader = s.getLeader();
- if (leader == null) {
- log.debug("-- no leader in " + s);
- continue;
+ for (int i = 0; i < slices.length; i++) {
+ Slice s = slices[i];
+ Replica leader = s.getLeader();
+ if (leader == null) {
+ log.debug("-- no leader in " + s);
+ continue;
+ }
+ metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
+ .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
+ .addAndGet(perSlice[i]);
+ modified = true;
+ AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
+ if (bufferedUpdates != null) {
+ bufferedUpdates.addAndGet(perSlice[i]);
+ continue;
+ }
+ docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
+ .addAndGet(perSlice[i]);
}
- metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
- .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
- .addAndGet(addDocs);
- AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
- if (bufferedUpdates != null) {
- bufferedUpdates.addAndGet(addDocs);
- continue;
+ } else {
+ // SMALL UPDATE: use exact assignment via DocRouter
+ for (SolrInputDocument doc : docs) {
+ String id = (String) doc.getFieldValue("id");
+ if (id == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
+ }
+ Slice s = coll.getRouter().getTargetSlice(id, doc, null, null, coll);
+ Replica leader = s.getLeader();
+ if (leader == null) {
+ log.debug("-- no leader in " + s);
+ continue;
+ }
+ metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
+ .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
+ .incrementAndGet();
+ modified = true;
+ AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
+ if (bufferedUpdates != null) {
+ bufferedUpdates.incrementAndGet();
+ continue;
+ }
+ docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
+ .incrementAndGet();
}
- docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
- .addAndGet(addDocs);
}
- if (docCount > 0) {
+
+ if (modified) {
lock.lockInterruptibly();
try {
docUpdates.forEach((sh, count) -> {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
index 1e99ff2..2b8940a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
@@ -214,6 +214,8 @@ public class SimDistribStateManager implements DistribStateManager {
private final String id;
private final Node root;
+ private int juteMaxbuffer = 0xfffff;
+
public SimDistribStateManager() {
this(null);
}
@@ -226,6 +228,8 @@ public class SimDistribStateManager implements DistribStateManager {
this.id = IdUtils.timeRandomId();
this.root = root != null ? root : createNewRootNode();
watchersPool = ExecutorUtil.newMDCAwareFixedThreadPool(10, new DefaultSolrThreadFactory("sim-watchers"));
+ String bufferSize = System.getProperty("jute.maxbuffer", Integer.toString(0xffffff));
+ juteMaxbuffer = Integer.parseInt(bufferSize);
}
public SimDistribStateManager(ActionThrottle actionThrottle, ActionError actionError) {
@@ -493,6 +497,9 @@ public class SimDistribStateManager implements DistribStateManager {
@Override
public void setData(String path, byte[] data, int version) throws NoSuchElementException, BadVersionException, IOException {
+ if (data.length > juteMaxbuffer) {
+ throw new IOException("Len error " + data.length);
+ }
multiLock.lock();
Node n = null;
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
index d6bf3ce..369ebc8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
@@ -32,11 +32,11 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int SPEED = 50;
- private static final int NUM_NODES = 10;
+ private static final int NUM_NODES = 50;
- private static final long BATCH_SIZE = 200000;
- private static final long NUM_BATCHES = 1000;
- private static final long ABOVE_SIZE = 300000;
+ private static final long BATCH_SIZE = 8000000;
+ private static final long NUM_BATCHES = 100000;
+ private static final long ABOVE_SIZE = 2000000;
private static TimeSource timeSource;
@@ -47,6 +47,7 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
configureCluster(NUM_NODES, TimeSource.get("simTime:" + SPEED));
timeSource = cluster.getTimeSource();
solrClient = cluster.simGetSolrClient();
+ cluster.simSetUseSystemCollection(false);
}
@Test
@@ -104,8 +105,8 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
this.count = count;
current = start;
max = start + count;
- idField.setValue("foo");
doc.put("id", idField);
+ idField.setValue("foo");
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index 4a2b880..711b4c3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -94,6 +94,7 @@ public class Policy implements MapWriter {
final List<Pair<String, Type>> params;
final List<String> perReplicaAttributes;
final int zkVersion;
+ final boolean empty;
public Policy() {
this(Collections.emptyMap());
@@ -104,6 +105,7 @@ public class Policy implements MapWriter {
}
@SuppressWarnings("unchecked")
public Policy(Map<String, Object> jsonMap, int version) {
+ this.empty = jsonMap.get(CLUSTER_PREFERENCES) == null && jsonMap.get(CLUSTER_POLICY) == null && jsonMap.get(POLICIES) == null;
this.zkVersion = version;
int[] idx = new int[1];
List<Preference> initialClusterPreferences = ((List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_PREFERENCES, emptyList())).stream()
@@ -156,6 +158,7 @@ public class Policy implements MapWriter {
}
private Policy(Map<String, List<Clause>> policies, List<Clause> clusterPolicy, List<Preference> clusterPreferences, int version) {
+ this.empty = policies == null && clusterPolicy == null && clusterPreferences == null;
this.zkVersion = version;
this.policies = policies != null ? Collections.unmodifiableMap(policies) : Collections.emptyMap();
this.clusterPolicy = clusterPolicy != null ? Collections.unmodifiableList(clusterPolicy) : Collections.emptyList();
@@ -281,11 +284,16 @@ public class Policy implements MapWriter {
return p.compare(r1, r2, false);
});
} catch (Exception e) {
+// log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
+// clusterPreferences,
+// lastComparison[0],
+// lastComparison[1],
+// Utils.toJSONString(Utils.getDeepCopy(tmpMatrix, 6, false)));
log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
clusterPreferences,
- lastComparison[0],
- lastComparison[1],
- Utils.toJSONString(Utils.getDeepCopy(tmpMatrix, 6, false)));
+ lastComparison[0].node,
+ lastComparison[1].node,
+ matrix.size());
throw e;
}
p.setApproxVal(tmpMatrix);
@@ -461,6 +469,10 @@ public class Policy implements MapWriter {
return Utils.toJSONString(this);
}
+ public boolean isEmpty() {
+ return empty;
+ }
+
/*This stores the logical state of the system, given a policy and
* a cluster state.
*
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index d052d6f..33fb78d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -312,7 +312,7 @@ public class PolicyHelper {
TimeSource timeSource = sessionWrapper.session != null ? sessionWrapper.session.cloudManager.getTimeSource() : TimeSource.NANO_TIME;
synchronized (lockObj) {
sessionWrapper.status = Status.EXECUTING;
- log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(timeSource, MILLISECONDS),
+ log.debug("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(timeSource, MILLISECONDS),
sessionWrapper.createTime,
this.sessionWrapper.createTime);
if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
@@ -323,7 +323,7 @@ public class PolicyHelper {
//one thread who is waiting for this need to be notified.
lockObj.notify();
} else {
- log.info("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime);
+ log.debug("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime);
//else just ignore it
}
}