You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/09/06 14:35:58 UTC
[1/7] lucene-solr:jira/solr-12709: SOLR-12709: TestSimAutoScaling and improvements to simUpdate performance.
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-12709 [created] 051891036
SOLR-12709: TestSimAutoScaling and improvements to simUpdate performance.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/a7a3eb3e
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/a7a3eb3e
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/a7a3eb3e
Branch: refs/heads/jira/solr-12709
Commit: a7a3eb3e0a93fdb76549407de1b8ba0a231e2e3a
Parents: 1cfc735
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Aug 30 13:16:45 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Aug 30 13:16:45 2018 +0200
----------------------------------------------------------------------
.../apache/solr/metrics/SolrMetricManager.java | 2 +-
.../sim/SimClusterStateProvider.java | 279 +++++++++++++------
.../autoscaling/sim/TestSimAutoScaling.java | 115 ++++++++
.../solrj/cloud/autoscaling/VariableBase.java | 2 +-
4 files changed, 315 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a7a3eb3e/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
index f1b7923..e9cb111 100644
--- a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
+++ b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
@@ -791,7 +791,7 @@ public class SolrMetricManager {
*/
public static String getRegistryName(SolrInfoBean.Group group, String... names) {
String fullName;
- String prefix = REGISTRY_NAME_PREFIX + group.toString() + ".";
+ String prefix = new StringBuilder(REGISTRY_NAME_PREFIX).append(group.name()).append('.').toString();
// check for existing prefix and group
if (names != null && names.length > 0 && names[0] != null && names[0].startsWith(prefix)) {
// assume the first segment already was expanded
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a7a3eb3e/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index 17b56d7..f261745 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -37,9 +37,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.util.concurrent.AtomicDouble;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
@@ -117,11 +119,14 @@ public class SimClusterStateProvider implements ClusterStateProvider {
public static final long DEFAULT_DOC_SIZE_BYTES = 500;
+ private static final String BUFFERED_UPDATES = "__buffered_updates__";
+
private final LiveNodesSet liveNodes;
private final SimDistribStateManager stateManager;
private final SimCloudManager cloudManager;
private final Map<String, List<ReplicaInfo>> nodeReplicaMap = new ConcurrentHashMap<>();
+ private final Map<String, Map<String, List<ReplicaInfo>>> colShardReplicaMap = new ConcurrentHashMap<>();
private final Map<String, Object> clusterProperties = new ConcurrentHashMap<>();
private final Map<String, Map<String, Object>> collProperties = new ConcurrentHashMap<>();
private final Map<String, Map<String, Map<String, Object>>> sliceProperties = new ConcurrentHashMap<>();
@@ -482,12 +487,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
replicaInfo.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
// add a property expected in Policy calculations, if missing
if (replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute) == null) {
- replicaInfo.getVariables().put(Type.CORE_IDX.metricsAttribute, SimCloudManager.DEFAULT_IDX_SIZE_BYTES);
+ replicaInfo.getVariables().put(Type.CORE_IDX.metricsAttribute, new AtomicLong(SimCloudManager.DEFAULT_IDX_SIZE_BYTES));
replicaInfo.getVariables().put(Variable.coreidxsize,
- Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES));
+ new AtomicDouble((Double)Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES)));
}
replicas.add(replicaInfo);
+ colShardReplicaMap.computeIfAbsent(replicaInfo.getCollection(), c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(replicaInfo.getShard(), s -> new ArrayList<>())
+ .add(replicaInfo);
Map<String, Object> values = cloudManager.getSimNodeStateProvider().simGetAllNodeValues()
.computeIfAbsent(nodeId, id -> new ConcurrentHashMap<>(SimCloudManager.createNodeValues(id)));
@@ -509,7 +517,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
cloudManager.getMetricManager().registry(registry).counter("UPDATE./update.requests");
cloudManager.getMetricManager().registry(registry).counter("QUERY./select.requests");
cloudManager.getMetricManager().registerGauge(null, registry,
- () -> replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute),
+ () -> ((Number)replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute)).longValue(),
"", true, "INDEX.sizeInBytes");
// at this point nuke our cached DocCollection state
collectionsStatesRef.set(null);
@@ -535,6 +543,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
for (int i = 0; i < replicas.size(); i++) {
if (coreNodeName.equals(replicas.get(i).getName())) {
ReplicaInfo ri = replicas.remove(i);
+ colShardReplicaMap.computeIfAbsent(ri.getCollection(), c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(ri.getShard(), s -> new ArrayList<>())
+ .remove(ri);
collectionsStatesRef.set(null);
opDelay(ri.getCollection(), CollectionParams.CollectionAction.DELETEREPLICA.name());
@@ -611,12 +622,19 @@ public class SimClusterStateProvider implements ClusterStateProvider {
return;
}
dc.getSlices().forEach(s -> {
+ if (s.getState() == Slice.State.INACTIVE) {
+ log.trace("-- slice state is {}, skip leader election {} / {}", s.getState(), dc.getName(), s.getName());
+ return;
+ }
+ if (s.getState() != Slice.State.ACTIVE) {
+ log.trace("-- slice state is {}, but I will run leader election {} / {}", s.getState(), dc.getName(), s.getName());
+ }
if (s.getLeader() != null) {
- log.debug("-- already has leader {} / {}", dc.getName(), s.getName());
+ log.trace("-- already has leader {} / {}", dc.getName(), s.getName());
return;
}
if (s.getReplicas().isEmpty()) {
- log.debug("-- no replicas in {} / {}", dc.getName(), s.getName());
+ log.trace("-- no replicas in {} / {}", dc.getName(), s.getName());
return;
}
log.debug("-- submit leader election for {} / {}", dc.getName(), s.getName());
@@ -632,7 +650,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
AtomicBoolean stateChanged = new AtomicBoolean(Boolean.FALSE);
Replica leader = s.getLeader();
if (leader == null || !liveNodes.contains(leader.getNodeName())) {
- log.debug("Running leader election for {} / {}", collection, s.getName());
+ log.trace("Running leader election for {} / {}", collection, s.getName());
if (s.getReplicas().isEmpty()) { // no replicas - punt
log.debug("-- no replicas in {} / {}", collection, s.getName());
return;
@@ -698,7 +716,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
log.debug("-- elected new leader for " + collection + " / " + s.getName() + ": " + ri.getName());
}
} else {
- log.debug("-- already has leader for {} / {}", collection, s.getName());
+ log.trace("-- already has leader for {} / {}", collection, s.getName());
}
if (stateChanged.get() || saveState) {
collectionsStatesRef.set(null);
@@ -787,9 +805,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
collection.getReplicas().size() + 1);
try {
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
- replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
- replicaProps.put("SEARCHER.searcher.numDocs", 0);
- replicaProps.put("SEARCHER.searcher.maxDoc", 0);
+ replicaProps.put("SEARCHER.searcher.deletedDocs", new AtomicLong(0));
+ replicaProps.put("SEARCHER.searcher.numDocs", new AtomicLong(0));
+ replicaProps.put("SEARCHER.searcher.maxDoc", new AtomicLong(0));
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, withCollection, 0),
coreName, withCollection, withCollectionShard, pos.type, pos.node, replicaProps);
cloudManager.submit(() -> {
@@ -810,9 +828,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
replicaNum.getAndIncrement());
try {
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
- replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
- replicaProps.put("SEARCHER.searcher.numDocs", 0);
- replicaProps.put("SEARCHER.searcher.maxDoc", 0);
+ replicaProps.put("SEARCHER.searcher.deletedDocs", new AtomicLong(0));
+ replicaProps.put("SEARCHER.searcher.numDocs", new AtomicLong(0));
+ replicaProps.put("SEARCHER.searcher.maxDoc", new AtomicLong(0));
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps);
cloudManager.submit(() -> {
@@ -877,6 +895,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
collProperties.remove(collection);
sliceProperties.remove(collection);
leaderThrottles.remove(collection);
+ colShardReplicaMap.remove(collection);
opDelay(collection, CollectionParams.CollectionAction.DELETE.name());
@@ -919,6 +938,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
lock.lockInterruptibly();
try {
nodeReplicaMap.clear();
+ colShardReplicaMap.clear();
collProperties.clear();
sliceProperties.clear();
leaderThrottles.clear();
@@ -1064,7 +1084,12 @@ public class SimClusterStateProvider implements ClusterStateProvider {
sliceName.set(message.getStr(SHARD_ID_PROP));
String splitKey = message.getStr("split.key");
+ // start counting buffered updates and
// always invalidate cached collection states to get up-to-date metrics
+ Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(sliceName.get(), ss -> new ConcurrentHashMap<>());
+ props.put(BUFFERED_UPDATES, new AtomicLong());
+
collectionsStatesRef.set(null);
ClusterState clusterState = getClusterState();
@@ -1106,7 +1131,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
for (ReplicaPosition replicaPosition : replicaPositions) {
String subSliceName = replicaPosition.shard;
String subShardNodeName = replicaPosition.node;
- String solrCoreName = collectionName + "_" + subSliceName + "_replica" + (replicaPosition.index);
+// String solrCoreName = collectionName + "_" + subSliceName + "_replica_n" + (replicaPosition.index);
+ String solrCoreName = Assign.buildSolrCoreName(collectionName, subSliceName, replicaPosition.type, Assign.incAndGetId(stateManager, collectionName, 0));
Map<String, Object> replicaProps = new HashMap<>();
replicaProps.put(ZkStateReader.SHARD_ID_PROP, replicaPosition.shard);
replicaProps.put(ZkStateReader.NODE_NAME_PROP, replicaPosition.node);
@@ -1122,30 +1148,16 @@ public class SimClusterStateProvider implements ClusterStateProvider {
replicasNumDocs += remainderDocs;
replicasIndexSize += remainderIndexSize;
}
- replicaProps.put("SEARCHER.searcher.numDocs", replicasNumDocs);
- replicaProps.put("SEARCHER.searcher.maxDoc", replicasNumDocs);
- replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
- replicaProps.put(Type.CORE_IDX.metricsAttribute, replicasIndexSize);
- replicaProps.put(Variable.coreidxsize, Type.CORE_IDX.convertVal(replicasIndexSize));
+ replicaProps.put("SEARCHER.searcher.numDocs", new AtomicLong(replicasNumDocs));
+ replicaProps.put("SEARCHER.searcher.maxDoc", new AtomicLong(replicasNumDocs));
+ replicaProps.put("SEARCHER.searcher.deletedDocs", new AtomicLong(0));
+ replicaProps.put(Type.CORE_IDX.metricsAttribute, new AtomicLong(replicasIndexSize));
+ replicaProps.put(Variable.coreidxsize, new AtomicDouble((Double)Type.CORE_IDX.convertVal(replicasIndexSize)));
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps);
simAddReplica(replicaPosition.node, ri, false);
}
- // mark the old slice as inactive
- lock.lockInterruptibly();
- try {
- Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
- .computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>());
- props.put(ZkStateReader.STATE_PROP, Slice.State.INACTIVE.toString());
- props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
- // XXX also mark replicas as down? currently SplitShardCmd doesn't do this
-
- // invalidate cached state
- collectionsStatesRef.set(null);
- } finally {
- lock.unlock();
- }
// add slice props
for (int i = 0; i < subRanges.size(); i++) {
String subSlice = subSlices.get(i);
@@ -1154,11 +1166,59 @@ public class SimClusterStateProvider implements ClusterStateProvider {
.computeIfAbsent(subSlice, ss -> new ConcurrentHashMap<>());
sliceProps.put(Slice.RANGE, range);
sliceProps.put(Slice.PARENT, sliceName.get());
- sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.ACTIVE.toString());
+ sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.CONSTRUCTION.toString());
sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
}
collectionsStatesRef.set(null);
simRunLeaderElection(Collections.singleton(collectionName), true);
+
+ CloudTestUtils.waitForState(cloudManager, collectionName, 20, TimeUnit.SECONDS,
+ CloudTestUtils.clusterShape(collection.getSlices().size() + subShardNames.size(),
+ repFactor, true, false));
+ // mark the new slices as active and the old slice as inactive
+ log.debug("-- switching slice states after split shard: collection={}, parent={}, subSlices={}", collectionName,
+ sliceName.get(), subSlices);
+ lock.lockInterruptibly();
+ try {
+ Map<String, Object> sProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>());
+ sProps.put(ZkStateReader.STATE_PROP, Slice.State.INACTIVE.toString());
+ sProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
+ AtomicLong bufferedUpdates = (AtomicLong)sProps.remove(BUFFERED_UPDATES);
+ if (bufferedUpdates.get() > 0) {
+ // apply buffered updates
+ long perShard = bufferedUpdates.get() / subSlices.size();
+ long remainder = bufferedUpdates.get() % subSlices.size();
+ String subSlice = null;
+ for (String sub : subSlices) {
+ long numUpdates = perShard;
+ if (subSlice == null) {
+ subSlice = sub;
+ }
+ if (subSlice.equals(sub)) {
+ numUpdates += remainder;
+ }
+ simSetShardValue(collectionName, sub, "SEARCHER.searcher.numDocs", numUpdates, true, false);
+ simSetShardValue(collectionName, sub, "SEARCHER.searcher.maxDoc", numUpdates, true, false);
+ }
+ }
+ // XXX also mark replicas as down? currently SplitShardCmd doesn't do this
+
+ for (String s : subSlices) {
+ Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(s, ss -> new ConcurrentHashMap<>());
+ sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.ACTIVE.toString());
+ sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
+ }
+
+ // invalidate cached state
+ collectionsStatesRef.set(null);
+ } finally {
+ lock.unlock();
+ }
+// cloudManager.submit(() -> {
+// return true;
+// });
results.add("success", "");
}
@@ -1189,7 +1249,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
lock.lockInterruptibly();
try {
- sliceProperties.computeIfAbsent(collectionName, coll -> new ConcurrentHashMap<>()).remove(sliceName);
+ sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).remove(sliceName);
+ colShardReplicaMap.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).remove(sliceName);
nodeReplicaMap.forEach((n, replicas) -> {
Iterator<ReplicaInfo> it = replicas.iterator();
while (it.hasNext()) {
@@ -1210,7 +1271,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
public void createSystemCollection() throws IOException {
try {
- if (simListCollections().contains(CollectionAdminParams.SYSTEM_COLL)) {
+ if (colShardReplicaMap.containsKey(CollectionAdminParams.SYSTEM_COLL)) {
return;
}
ZkNodeProps props = new ZkNodeProps(
@@ -1251,7 +1312,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (collection == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not set");
}
- if (!simListCollections().contains(collection)) {
+ if (!colShardReplicaMap.containsKey(collection)) {
if (CollectionAdminParams.SYSTEM_COLL.equals(collection)) {
// auto-create
createSystemCollection();
@@ -1259,16 +1320,16 @@ public class SimClusterStateProvider implements ClusterStateProvider {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collection + "' doesn't exist");
}
}
- // always reset first to get the current metrics - it's easier than to keep matching
- // Replica with ReplicaInfo where the current real counts are stored
- collectionsStatesRef.set(null);
- DocCollection coll = getClusterState().getCollection(collection);
- DocRouter router = coll.getRouter();
boolean modified = false;
lock.lockInterruptibly();
try {
+ // always reset first to get the current metrics - it's easier than to keep matching
+ // Replica with ReplicaInfo where the current real counts are stored
+ collectionsStatesRef.set(null);
+ DocCollection coll = getClusterState().getCollection(collection);
+ DocRouter router = coll.getRouter();
List<String> deletes = req.getDeleteById();
if (deletes != null && !deletes.isEmpty()) {
for (String id : deletes) {
@@ -1286,6 +1347,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
log.debug("-- attempting to delete nonexistent doc " + id + " from " + s.getLeader());
continue;
}
+ AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
+ if (bufferedUpdates != null) {
+ if (bufferedUpdates.get() > 0) {
+ bufferedUpdates.decrementAndGet();
+ } else {
+ log.debug("-- attempting to delete nonexistent buffered doc " + id + " from " + s.getLeader());
+ }
+ continue;
+ }
modified = true;
try {
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", 1, true, false);
@@ -1294,9 +1364,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (indexSize != null && indexSize.longValue() > SimCloudManager.DEFAULT_IDX_SIZE_BYTES) {
indexSize = indexSize.longValue() - DEFAULT_DOC_SIZE_BYTES;
simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
- indexSize.intValue(), false, false);
+ new AtomicLong(indexSize.longValue()), false, false);
simSetShardValue(collection, s.getName(), Variable.coreidxsize,
- Type.CORE_IDX.convertVal(indexSize), false, false);
+ new AtomicDouble((Double)Type.CORE_IDX.convertVal(indexSize)), false, false);
} else {
throw new Exception("unexpected indexSize ri=" + ri);
}
@@ -1326,12 +1396,12 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
modified = true;
try {
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", numDocs, false, false);
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", 0, false, false);
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", new AtomicLong(numDocs.longValue()), false, false);
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", new AtomicLong(0), false, false);
simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
- SimCloudManager.DEFAULT_IDX_SIZE_BYTES, false, false);
+ new AtomicLong(SimCloudManager.DEFAULT_IDX_SIZE_BYTES), false, false);
simSetShardValue(collection, s.getName(), Variable.coreidxsize,
- Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES), false, false);
+ new AtomicDouble((Double)Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES)), false, false);
} catch (Exception e) {
throw new IOException(e);
}
@@ -1339,36 +1409,64 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
List<SolrInputDocument> docs = req.getDocuments();
- if (docs != null && !docs.isEmpty()) {
- for (SolrInputDocument doc : docs) {
+ Iterator<SolrInputDocument> it;
+ if (docs != null) {
+ it = docs.iterator();
+ } else {
+ it = req.getDocIterator();
+ }
+ if (it != null) {
+ // this approach to updating metrics drastically increases performance
+ // of bulk updates, because simSetShardValue is relatively costly
+ Map<String, AtomicLong> docUpdates = new HashMap<>();
+ Map<String, Map<String, AtomicLong>> metricUpdates = new HashMap<>();
+ while (it.hasNext()) {
+ SolrInputDocument doc = it.next();
String id = (String) doc.getFieldValue("id");
if (id == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
}
Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
+ if (s.getState() != Slice.State.ACTIVE) {
+ log.debug("-- slice not active: {}", s);
+ }
Replica leader = s.getLeader();
if (leader == null) {
log.debug("-- no leader in " + s);
continue;
}
- cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
+ metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
+ .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
+ .incrementAndGet();
+ AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
+ if (bufferedUpdates != null) {
+ bufferedUpdates.incrementAndGet();
+ continue;
+ }
modified = true;
+ docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
+ .incrementAndGet();
+ }
+ docUpdates.forEach((sh, count) -> {
try {
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", 1, true, false);
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.maxDoc", 1, true, false);
-
- ReplicaInfo ri = getReplicaInfo(leader);
- Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
+ simSetShardValue(collection, sh, "SEARCHER.searcher.numDocs", count.get(), true, false);
+ simSetShardValue(collection, sh, "SEARCHER.searcher.maxDoc", count.get(), true, false);
// for each new document increase the size by DEFAULT_DOC_SIZE_BYTES
- indexSize = indexSize.longValue() + DEFAULT_DOC_SIZE_BYTES;
- simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
- indexSize.longValue(), false, false);
- simSetShardValue(collection, s.getName(), Variable.coreidxsize,
- Type.CORE_IDX.convertVal(indexSize), false, false);
+ simSetShardValue(collection, sh, Type.CORE_IDX.metricsAttribute,
+ DEFAULT_DOC_SIZE_BYTES * count.get(), true, false);
+ simSetShardValue(collection, sh, Variable.coreidxsize,
+ Type.CORE_IDX.convertVal(DEFAULT_DOC_SIZE_BYTES * count.get()), true, false);
} catch (Exception e) {
- throw new IOException(e);
+ throw new RuntimeException(e);
}
- }
+ });
+ metricUpdates.forEach((sh, cores) -> {
+ cores.forEach((core, count) -> {
+ String registry = SolrMetricManager.getRegistryName(SolrInfoBean.Group.core, collection, sh,
+ Utils.parseMetricsReplicaName(collection, core));
+ cloudManager.getMetricManager().registry(registry).counter("UPDATE./update.requests").inc(count.get());
+ });
+ });
}
if (modified) {
collectionsStatesRef.set(null);
@@ -1545,17 +1643,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* divided by the number of replicas.
*/
public void simSetShardValue(String collection, String shard, String key, Object value, boolean delta, boolean divide) throws Exception {
- List<ReplicaInfo> infos = new ArrayList<>();
- nodeReplicaMap.forEach((n, replicas) -> {
- replicas.forEach(r -> {
- if (r.getCollection().equals(collection)) {
- if (shard != null && !shard.equals(r.getShard())) {
- return;
- }
- infos.add(r);
- }
- });
- });
+ List<ReplicaInfo> infos = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(shard, s -> new ArrayList<>());
if (infos.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection " + collection + " doesn't exist (shard=" + shard + ").");
}
@@ -1575,22 +1664,50 @@ public class SimClusterStateProvider implements ClusterStateProvider {
Object prevValue = r.getVariables().get(key);
if (prevValue != null) {
if ((prevValue instanceof Number) && (value instanceof Number)) {
- if (((prevValue instanceof Long) || (prevValue instanceof Integer)) &&
+ if (((prevValue instanceof Long) || (prevValue instanceof Integer) ||
+ (prevValue instanceof AtomicLong) || (prevValue instanceof AtomicInteger)) &&
((value instanceof Long) || (value instanceof Integer))) {
- Long newValue = ((Number)prevValue).longValue() + ((Number)value).longValue();
- r.getVariables().put(key, newValue);
+ long newValue = ((Number)prevValue).longValue() + ((Number)value).longValue();
+ // minimize object allocations
+ if (prevValue instanceof AtomicLong) {
+ ((AtomicLong)prevValue).set(newValue);
+ } else if (prevValue instanceof AtomicInteger) {
+ ((AtomicInteger)prevValue).set(((Number)prevValue).intValue() + ((Number)value).intValue());
+ } else {
+ r.getVariables().put(key, newValue);
+ }
} else {
- Double newValue = ((Number)prevValue).doubleValue() + ((Number)value).doubleValue();
- r.getVariables().put(key, newValue);
+ double newValue = ((Number)prevValue).doubleValue() + ((Number)value).doubleValue();
+ if (prevValue instanceof AtomicDouble) {
+ ((AtomicDouble)prevValue).set(newValue);
+ } else {
+ r.getVariables().put(key, newValue);
+ }
}
} else {
throw new UnsupportedOperationException("delta cannot be applied to non-numeric values: " + prevValue + " and " + value);
}
} else {
- r.getVariables().put(key, value);
+ if (value instanceof Integer) {
+ r.getVariables().put(key, new AtomicInteger((Integer)value));
+ } else if (value instanceof Long) {
+ r.getVariables().put(key, new AtomicLong((Long)value));
+ } else if (value instanceof Double) {
+ r.getVariables().put(key, new AtomicDouble((Double)value));
+ } else {
+ r.getVariables().put(key, value);
+ }
}
} else {
- r.getVariables().put(key, value);
+ if (value instanceof Integer) {
+ r.getVariables().put(key, new AtomicInteger((Integer)value));
+ } else if (value instanceof Long) {
+ r.getVariables().put(key, new AtomicLong((Long)value));
+ } else if (value instanceof Double) {
+ r.getVariables().put(key, new AtomicDouble((Double)value));
+ } else {
+ r.getVariables().put(key, value);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a7a3eb3e/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
new file mode 100644
index 0000000..dbb9785
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
@@ -0,0 +1,115 @@
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Iterator;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.CloudTestUtils;
+import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+
+/**
+ *
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.NodeLostTrigger=INFO;org.apache.client.solrj.cloud.autoscaling=DEBUG")
+public class TestSimAutoScaling extends SimSolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final int SPEED = 50;
+
+ private static TimeSource timeSource;
+ private static SolrClient solrClient;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ // 20 mln docs / node
+ configureCluster(50, TimeSource.get("simTime:" + SPEED));
+ timeSource = cluster.getTimeSource();
+ solrClient = cluster.simGetSolrClient();
+ }
+
+ @Test
+ public void testScaleUp() throws Exception {
+ String collectionName = "testScaleUp_collection";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+ "conf", 2, 2).setMaxShardsPerNode(10);
+ create.process(solrClient);
+ CloudTestUtils.waitForState(cluster, "failed to create " + collectionName, collectionName,
+ CloudTestUtils.clusterShape(2, 2, false, true));
+
+ long waitForSeconds = 3 + random().nextInt(5);
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'scaleUpTrigger'," +
+ "'event' : 'indexSize'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'aboveDocs' : 5000000," +
+ "'enabled' : true," +
+ "'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
+ "{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ long batchSize = 250000;
+ for (long i = 0; i < 10000; i++) {
+ log.info("#### Total docs so far: " + (i * batchSize));
+ addDocs(collectionName, i * batchSize, batchSize);
+ timeSource.sleep(waitForSeconds);
+ }
+ }
+
+ private void addDocs(String collection, long start, long count) throws Exception {
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.setParam("collection", collection);
+ ureq.setDocIterator(new FakeDocIterator(start, count));
+ solrClient.request(ureq);
+ }
+
+ // lightweight generator of fake documents
+ private static class FakeDocIterator implements Iterator<SolrInputDocument> {
+ final SolrInputDocument doc = new SolrInputDocument();
+ final SolrInputField idField = new SolrInputField("id");
+
+ final long start, count;
+ final StringBuilder sb = new StringBuilder("id-");
+
+ long current, max;
+
+ FakeDocIterator(long start, long count) {
+ this.start = start;
+ this.count = count;
+ current = start;
+ max = start + count;
+ doc.put("id", idField);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return current < max;
+ }
+
+ @Override
+ public SolrInputDocument next() {
+ sb.setLength(3);
+ idField.setValue(sb.append(Long.toString(current)).toString());
+ current++;
+ return doc;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a7a3eb3e/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VariableBase.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VariableBase.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VariableBase.java
index 8b0c1cf..16f52d9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VariableBase.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/VariableBase.java
@@ -199,7 +199,7 @@ public class VariableBase implements Variable {
private static Map<String, Type> validatetypes = null;
/** SOLR-12662: Lazily init validatetypes to avoid Type.values() NPE due to static initializer ordering */
- private static Map<String, Type> getValidatetypes() {
+ private static synchronized Map<String, Type> getValidatetypes() {
if (validatetypes == null) {
validatetypes = new HashMap<>();
for (Type t : Type.values())
[4/7] lucene-solr:jira/solr-12709: SOLR-12723: Performance
improvements in the simulator.
Posted by ab...@apache.org.
SOLR-12723: Performance improvements in the simulator.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/4a1ee8e1
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/4a1ee8e1
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/4a1ee8e1
Branch: refs/heads/jira/solr-12709
Commit: 4a1ee8e13b7c19d3dd80c361a6023dc55aa109fa
Parents: b56095b
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue Sep 4 16:11:39 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue Sep 4 16:11:39 2018 +0200
----------------------------------------------------------------------
.../cloud/autoscaling/ComputePlanAction.java | 6 +-
.../org/apache/solr/cloud/CloudTestUtils.java | 6 +-
.../cloud/autoscaling/sim/SimCloudManager.java | 8 +
.../sim/SimClusterStateProvider.java | 408 +++++++++++--------
.../autoscaling/sim/SimNodeStateProvider.java | 21 +-
.../autoscaling/sim/TestSimAutoScaling.java | 22 +-
.../sim/TestSimExecutePlanAction.java | 4 +-
.../autoscaling/sim/TestSimNodeLostTrigger.java | 2 +-
.../autoscaling/sim/TestSimPolicyCloud.java | 10 +-
.../sim/TestSimTriggerIntegration.java | 8 +-
.../solrj/cloud/autoscaling/ReplicaInfo.java | 4 +-
11 files changed, 280 insertions(+), 219 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4a1ee8e1/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index 923a27a..5d211d2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -171,7 +171,11 @@ public class ComputePlanAction extends TriggerActionBase {
clusterState.forEachCollection(coll -> {
Integer rf = coll.getReplicationFactor();
if (rf == null) {
- rf = coll.getReplicas().size() / coll.getSlices().size();
+ if (coll.getSlices().isEmpty()) {
+ rf = 1; // ???
+ } else {
+ rf = coll.getReplicas().size() / coll.getSlices().size();
+ }
}
totalRF.addAndGet(rf * coll.getSlices().size());
});
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4a1ee8e1/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java b/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
index b67b551..eb50b96 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
@@ -109,7 +109,7 @@ public class CloudTestUtils {
log.trace("-- still not matching predicate: {}", state);
}
}
- throw new TimeoutException("last state: " + coll);
+ throw new TimeoutException("last ClusterState: " + state + ", last coll state: " + coll);
}
/**
@@ -141,13 +141,13 @@ public class CloudTestUtils {
}
Collection<Slice> slices = withInactive ? collectionState.getSlices() : collectionState.getActiveSlices();
if (slices.size() != expectedShards) {
- log.trace("-- wrong number of active slices, expected={}, found={}", expectedShards, collectionState.getSlices().size());
+ log.trace("-- wrong number of slices, expected={}, found={}: {}", expectedShards, collectionState.getSlices().size(), collectionState.getSlices());
return false;
}
Set<String> leaderless = new HashSet<>();
for (Slice slice : slices) {
int activeReplicas = 0;
- if (requireLeaders && slice.getLeader() == null) {
+ if (requireLeaders && slice.getState() != Slice.State.INACTIVE && slice.getLeader() == null) {
leaderless.add(slice.getName());
continue;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4a1ee8e1/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index 1f0b6cf..51e3db4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -37,6 +37,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.carrotsearch.randomizedtesting.RandomizedContext;
import com.codahale.metrics.jvm.ClassLoadingGaugeSet;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
@@ -386,6 +387,13 @@ public class SimCloudManager implements SolrCloudManager {
}
/**
+ * Get the source of randomness (usually initialized by the test suite).
+ */
+ public Random getRandom() {
+ return RandomizedContext.current().getRandom();
+ }
+
+ /**
* Add a new node and initialize its node values (metrics). The
* /live_nodes list is updated with the new node id.
* @return new node id
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4a1ee8e1/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index f261745..3dd26e9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -232,6 +232,14 @@ public class SimClusterStateProvider implements ClusterStateProvider {
/**
* Get random node id.
+ * @return one of the live nodes
+ */
+ public String simGetRandomNode() {
+ return simGetRandomNode(cloudManager.getRandom());
+ }
+
+ /**
+ * Get random node id.
* @param random instance of random.
* @return one of the live nodes
*/
@@ -637,7 +645,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
log.trace("-- no replicas in {} / {}", dc.getName(), s.getName());
return;
}
- log.debug("-- submit leader election for {} / {}", dc.getName(), s.getName());
+ log.trace("-- submit leader election for {} / {}", dc.getName(), s.getName());
cloudManager.submit(() -> {
simRunLeaderElection(dc.getName(), s, saveClusterState);
return true;
@@ -652,7 +660,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (leader == null || !liveNodes.contains(leader.getNodeName())) {
log.trace("Running leader election for {} / {}", collection, s.getName());
if (s.getReplicas().isEmpty()) { // no replicas - punt
- log.debug("-- no replicas in {} / {}", collection, s.getName());
+ log.trace("-- no replicas in {} / {}", collection, s.getName());
return;
}
ActionThrottle lt = getThrottle(collection, s.getName());
@@ -686,7 +694,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
});
if (alreadyHasLeader.get()) {
- log.debug("-- already has leader {} / {}: {}", collection, s.getName(), s);
+ log.trace("-- already has leader {} / {}: {}", collection, s.getName(), s);
return;
}
if (active.isEmpty()) {
@@ -712,8 +720,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
synchronized (ri) {
ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
}
+ log.trace("-- elected new leader for {} / {}: {}", collection, s.getName(), ri);
stateChanged.set(true);
- log.debug("-- elected new leader for " + collection + " / " + s.getName() + ": " + ri.getName());
}
} else {
log.trace("-- already has leader for {} / {}", collection, s.getName());
@@ -1084,17 +1092,24 @@ public class SimClusterStateProvider implements ClusterStateProvider {
sliceName.set(message.getStr(SHARD_ID_PROP));
String splitKey = message.getStr("split.key");
+ ClusterState clusterState = getClusterState();
+ DocCollection collection = clusterState.getCollection(collectionName);
+ Slice parentSlice = SplitShardCmd.getParentSlice(clusterState, collectionName, sliceName, splitKey);
+ Replica leader = parentSlice.getLeader();
+ // XXX leader election may not have happened yet - should we require it?
+ if (leader == null) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Shard " + collectionName +
+ " / " + sliceName.get() + " has no leader and can't be split");
+ }
// start counting buffered updates and
- // always invalidate cached collection states to get up-to-date metrics
Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
.computeIfAbsent(sliceName.get(), ss -> new ConcurrentHashMap<>());
+ if (props.containsKey(BUFFERED_UPDATES)) {
+ log.trace("--- SOLR-12729: Overlapping splitShard commands for {} / {}", collectionName, sliceName.get());
+ return;
+ }
props.put(BUFFERED_UPDATES, new AtomicLong());
- collectionsStatesRef.set(null);
-
- ClusterState clusterState = getClusterState();
- DocCollection collection = clusterState.getCollection(collectionName);
- Slice parentSlice = SplitShardCmd.getParentSlice(clusterState, collectionName, sliceName, splitKey);
List<DocRouter.Range> subRanges = new ArrayList<>();
List<String> subSlices = new ArrayList<>();
List<String> subShardNames = new ArrayList<>();
@@ -1115,12 +1130,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (sessionWrapper != null) sessionWrapper.release();
// adjust numDocs / deletedDocs / maxDoc
- Replica leader = parentSlice.getLeader();
- // XXX leader election may not have happened yet - should we require it?
- if (leader == null) {
- leader = parentSlice.getReplicas().iterator().next();
- }
- String numDocsStr = leader.getStr("SEARCHER.searcher.numDocs", "0");
+ String numDocsStr = String.valueOf(getReplicaInfo(leader).getVariable("SEARCHER.searcher.numDocs", "0"));
long numDocs = Long.parseLong(numDocsStr);
long newNumDocs = numDocs / subSlices.size();
long remainderDocs = numDocs % subSlices.size();
@@ -1128,6 +1138,18 @@ public class SimClusterStateProvider implements ClusterStateProvider {
long remainderIndexSize = SimCloudManager.DEFAULT_IDX_SIZE_BYTES + remainderDocs * DEFAULT_DOC_SIZE_BYTES;
String remainderSlice = null;
+ // add slice props
+ for (int i = 0; i < subRanges.size(); i++) {
+ String subSlice = subSlices.get(i);
+ DocRouter.Range range = subRanges.get(i);
+ Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(subSlice, ss -> new ConcurrentHashMap<>());
+ sliceProps.put(Slice.RANGE, range);
+ sliceProps.put(Slice.PARENT, sliceName.get());
+ sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.CONSTRUCTION.toString());
+ sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
+ }
+ // add replicas
for (ReplicaPosition replicaPosition : replicaPositions) {
String subSliceName = replicaPosition.shard;
String subShardNodeName = replicaPosition.node;
@@ -1158,25 +1180,27 @@ public class SimClusterStateProvider implements ClusterStateProvider {
solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps);
simAddReplica(replicaPosition.node, ri, false);
}
- // add slice props
- for (int i = 0; i < subRanges.size(); i++) {
- String subSlice = subSlices.get(i);
- DocRouter.Range range = subRanges.get(i);
- Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
- .computeIfAbsent(subSlice, ss -> new ConcurrentHashMap<>());
- sliceProps.put(Slice.RANGE, range);
- sliceProps.put(Slice.PARENT, sliceName.get());
- sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.CONSTRUCTION.toString());
- sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
- }
- collectionsStatesRef.set(null);
simRunLeaderElection(Collections.singleton(collectionName), true);
- CloudTestUtils.waitForState(cloudManager, collectionName, 20, TimeUnit.SECONDS,
- CloudTestUtils.clusterShape(collection.getSlices().size() + subShardNames.size(),
- repFactor, true, false));
+ // delay it once again to better simulate replica recoveries
+ //opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());
+
+ CloudTestUtils.waitForState(cloudManager, collectionName, 20, TimeUnit.SECONDS, (liveNodes, state) -> {
+ for (String subSlice : subSlices) {
+ Slice s = state.getSlice(subSlice);
+ if (s.getLeader() == null) {
+ log.debug("** no leader in {} / {}", collectionName, s);
+ return false;
+ }
+ if (s.getReplicas().size() < repFactor) {
+ log.debug("** expected {} repFactor but there are {} replicas", repFactor, s.getReplicas().size());
+ return false;
+ }
+ }
+ return true;
+ });
// mark the new slices as active and the old slice as inactive
- log.debug("-- switching slice states after split shard: collection={}, parent={}, subSlices={}", collectionName,
+ log.trace("-- switching slice states after split shard: collection={}, parent={}, subSlices={}", collectionName,
sliceName.get(), subSlices);
lock.lockInterruptibly();
try {
@@ -1216,9 +1240,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
} finally {
lock.unlock();
}
-// cloudManager.submit(() -> {
-// return true;
-// });
results.add("success", "");
}
@@ -1321,158 +1342,176 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
- boolean modified = false;
-
- lock.lockInterruptibly();
- try {
- // always reset first to get the current metrics - it's easier than to keep matching
- // Replica with ReplicaInfo where the current real counts are stored
- collectionsStatesRef.set(null);
- DocCollection coll = getClusterState().getCollection(collection);
- DocRouter router = coll.getRouter();
- List<String> deletes = req.getDeleteById();
- if (deletes != null && !deletes.isEmpty()) {
- for (String id : deletes) {
- Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
- // NOTE: we don't use getProperty because it uses PROPERTY_PROP_PREFIX
+ DocCollection coll = getClusterState().getCollection(collection);
+ DocRouter router = coll.getRouter();
+ List<String> deletes = req.getDeleteById();
+ if (deletes != null && !deletes.isEmpty()) {
+ for (String id : deletes) {
+ Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
+ // NOTE: we don't use getProperty because it uses PROPERTY_PROP_PREFIX
+ Replica leader = s.getLeader();
+ if (leader == null) {
+ log.debug("-- no leader in " + s);
+ continue;
+ }
+ cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
+ ReplicaInfo ri = getReplicaInfo(leader);
+ Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
+ if (numDocs == null || numDocs.intValue() <= 0) {
+ log.debug("-- attempting to delete nonexistent doc " + id + " from " + s.getLeader());
+ continue;
+ }
+ AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
+ if (bufferedUpdates != null) {
+ if (bufferedUpdates.get() > 0) {
+ bufferedUpdates.decrementAndGet();
+ } else {
+ log.debug("-- attempting to delete nonexistent buffered doc " + id + " from " + s.getLeader());
+ }
+ continue;
+ }
+ lock.lockInterruptibly();
+ try {
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", 1, true, false);
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", -1, true, false);
+ Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
+ if (indexSize != null && indexSize.longValue() > SimCloudManager.DEFAULT_IDX_SIZE_BYTES) {
+ indexSize = indexSize.longValue() - DEFAULT_DOC_SIZE_BYTES;
+ simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
+ new AtomicLong(indexSize.longValue()), false, false);
+ simSetShardValue(collection, s.getName(), Variable.coreidxsize,
+ new AtomicDouble((Double)Type.CORE_IDX.convertVal(indexSize)), false, false);
+ } else {
+ throw new Exception("unexpected indexSize ri=" + ri);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ deletes = req.getDeleteQuery();
+ if (deletes != null && !deletes.isEmpty()) {
+ for (String q : deletes) {
+ if (!"*:*".equals(q)) {
+ throw new UnsupportedOperationException("Only '*:*' query is supported in deleteByQuery");
+ }
+ for (Slice s : coll.getSlices()) {
Replica leader = s.getLeader();
if (leader == null) {
log.debug("-- no leader in " + s);
continue;
}
+
cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
ReplicaInfo ri = getReplicaInfo(leader);
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
- if (numDocs == null || numDocs.intValue() <= 0) {
- log.debug("-- attempting to delete nonexistent doc " + id + " from " + s.getLeader());
- continue;
- }
- AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
- if (bufferedUpdates != null) {
- if (bufferedUpdates.get() > 0) {
- bufferedUpdates.decrementAndGet();
- } else {
- log.debug("-- attempting to delete nonexistent buffered doc " + id + " from " + s.getLeader());
- }
+ if (numDocs == null || numDocs.intValue() == 0) {
continue;
}
- modified = true;
+ lock.lockInterruptibly();
try {
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", 1, true, false);
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", -1, true, false);
- Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
- if (indexSize != null && indexSize.longValue() > SimCloudManager.DEFAULT_IDX_SIZE_BYTES) {
- indexSize = indexSize.longValue() - DEFAULT_DOC_SIZE_BYTES;
- simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
- new AtomicLong(indexSize.longValue()), false, false);
- simSetShardValue(collection, s.getName(), Variable.coreidxsize,
- new AtomicDouble((Double)Type.CORE_IDX.convertVal(indexSize)), false, false);
- } else {
- throw new Exception("unexpected indexSize ri=" + ri);
- }
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", new AtomicLong(numDocs.longValue()), false, false);
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", new AtomicLong(0), false, false);
+ simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
+ new AtomicLong(SimCloudManager.DEFAULT_IDX_SIZE_BYTES), false, false);
+ simSetShardValue(collection, s.getName(), Variable.coreidxsize,
+ new AtomicDouble((Double)Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES)), false, false);
} catch (Exception e) {
throw new IOException(e);
+ } finally {
+ lock.unlock();
}
}
}
- deletes = req.getDeleteQuery();
- if (deletes != null && !deletes.isEmpty()) {
- for (String q : deletes) {
- if (!"*:*".equals(q)) {
- throw new UnsupportedOperationException("Only '*:*' query is supported in deleteByQuery");
- }
- for (Slice s : coll.getSlices()) {
- Replica leader = s.getLeader();
- if (leader == null) {
- log.debug("-- no leader in " + s);
- continue;
- }
-
- cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
- ReplicaInfo ri = getReplicaInfo(leader);
- Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
- if (numDocs == null || numDocs.intValue() == 0) {
- continue;
- }
- modified = true;
- try {
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", new AtomicLong(numDocs.longValue()), false, false);
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", new AtomicLong(0), false, false);
- simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
- new AtomicLong(SimCloudManager.DEFAULT_IDX_SIZE_BYTES), false, false);
- simSetShardValue(collection, s.getName(), Variable.coreidxsize,
- new AtomicDouble((Double)Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES)), false, false);
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- }
+ }
+ List<SolrInputDocument> docs = req.getDocuments();
+ Iterator<SolrInputDocument> it;
+ if (docs != null) {
+ it = docs.iterator();
+ } else {
+ it = req.getDocIterator();
+ }
+ if (it != null) {
+ // this approach to updating counters and metrics drastically increases performance
+ // of bulk updates, because simSetShardValue is relatively costly
+
+ // also, skip the hash-based selection of slices in favor of a simple random
+ // start + round-robin assignment, because we don't keep individual id-s anyway
+ Map<String, AtomicLong> docUpdates = new HashMap<>();
+ Map<String, Map<String, AtomicLong>> metricUpdates = new HashMap<>();
+ Slice[] slices = coll.getActiveSlicesArr();
+ if (slices.length == 0) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Update sent to a collection without slices: " + coll);
}
- List<SolrInputDocument> docs = req.getDocuments();
- Iterator<SolrInputDocument> it;
- if (docs != null) {
- it = docs.iterator();
- } else {
- it = req.getDocIterator();
+ // TODO: we don't use DocRouter so we should verify that active slices cover the whole hash range
+
+ long docCount = 0;
+ while (it.hasNext()) {
+ SolrInputDocument doc = it.next();
+ String id = (String) doc.getFieldValue("id");
+ if (id == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
+ }
+ docCount++;
}
- if (it != null) {
- // this approach to updating metrics drastically increases performance
- // of bulk updates, because simSetShardValue is relatively costly
- Map<String, AtomicLong> docUpdates = new HashMap<>();
- Map<String, Map<String, AtomicLong>> metricUpdates = new HashMap<>();
- while (it.hasNext()) {
- SolrInputDocument doc = it.next();
- String id = (String) doc.getFieldValue("id");
- if (id == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
- }
- Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
- if (s.getState() != Slice.State.ACTIVE) {
- log.debug("-- slice not active: {}", s);
- }
- Replica leader = s.getLeader();
- if (leader == null) {
- log.debug("-- no leader in " + s);
- continue;
- }
- metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
- .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
- .incrementAndGet();
- AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
- if (bufferedUpdates != null) {
- bufferedUpdates.incrementAndGet();
- continue;
- }
- modified = true;
- docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
- .incrementAndGet();
+ long perSlice = docCount / slices.length;
+ long remainder = docCount % slices.length;
+ int initialSlice = cloudManager.getRandom().nextInt(slices.length);
+ for (int i = 0; i < slices.length; i++) {
+ long addDocs = perSlice;
+ if (i == 0) {
+ addDocs += remainder;
}
- docUpdates.forEach((sh, count) -> {
- try {
- simSetShardValue(collection, sh, "SEARCHER.searcher.numDocs", count.get(), true, false);
- simSetShardValue(collection, sh, "SEARCHER.searcher.maxDoc", count.get(), true, false);
- // for each new document increase the size by DEFAULT_DOC_SIZE_BYTES
- simSetShardValue(collection, sh, Type.CORE_IDX.metricsAttribute,
- DEFAULT_DOC_SIZE_BYTES * count.get(), true, false);
- simSetShardValue(collection, sh, Variable.coreidxsize,
- Type.CORE_IDX.convertVal(DEFAULT_DOC_SIZE_BYTES * count.get()), true, false);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- metricUpdates.forEach((sh, cores) -> {
- cores.forEach((core, count) -> {
- String registry = SolrMetricManager.getRegistryName(SolrInfoBean.Group.core, collection, sh,
- Utils.parseMetricsReplicaName(collection, core));
- cloudManager.getMetricManager().registry(registry).counter("UPDATE./update.requests").inc(count.get());
- });
- });
+ int sliceNum = (initialSlice + i) % slices.length;
+ Slice s = slices[sliceNum];
+ if (s.getState() != Slice.State.ACTIVE) {
+ log.debug("-- slice not active: {}", s);
+ }
+ Replica leader = s.getLeader();
+ if (leader == null) {
+ log.debug("-- no leader in " + s);
+ continue;
+ }
+ metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
+ .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
+ .addAndGet(addDocs);
+ AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
+ if (bufferedUpdates != null) {
+ bufferedUpdates.addAndGet(addDocs);
+ continue;
+ }
+ docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
+ .addAndGet(addDocs);
}
- if (modified) {
- collectionsStatesRef.set(null);
+ if (docCount > 0) {
+ lock.lockInterruptibly();
+ try {
+ docUpdates.forEach((sh, count) -> {
+ try {
+ simSetShardValue(collection, sh, "SEARCHER.searcher.numDocs", count.get(), true, false);
+ simSetShardValue(collection, sh, "SEARCHER.searcher.maxDoc", count.get(), true, false);
+ // for each new document increase the size by DEFAULT_DOC_SIZE_BYTES
+ simSetShardValue(collection, sh, Type.CORE_IDX.metricsAttribute,
+ DEFAULT_DOC_SIZE_BYTES * count.get(), true, false);
+ simSetShardValue(collection, sh, Variable.coreidxsize,
+ Type.CORE_IDX.convertVal(DEFAULT_DOC_SIZE_BYTES * count.get()), true, false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ metricUpdates.forEach((sh, cores) -> {
+ cores.forEach((core, count) -> {
+ String registry = SolrMetricManager.getRegistryName(SolrInfoBean.Group.core, collection, sh,
+ Utils.parseMetricsReplicaName(collection, core));
+ cloudManager.getMetricManager().registry(registry).counter("UPDATE./update.requests").inc(count.get());
+ });
+ });
+ } finally {
+ lock.unlock();
+ }
}
- } finally {
- lock.unlock();
}
return new UpdateResponse();
}
@@ -1643,8 +1682,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* divided by the number of replicas.
*/
public void simSetShardValue(String collection, String shard, String key, Object value, boolean delta, boolean divide) throws Exception {
- List<ReplicaInfo> infos = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
- .computeIfAbsent(shard, s -> new ArrayList<>());
+ final List<ReplicaInfo> infos;
+ if (shard == null) {
+ infos = new ArrayList<>();
+ colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
+ .forEach((sh, replicas) -> infos.addAll(replicas));
+ } else {
+ infos = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(shard, s -> new ArrayList<>());
+ }
if (infos.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection " + collection + " doesn't exist (shard=" + shard + ").");
}
@@ -1729,24 +1775,23 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
+ public List<ReplicaInfo> simGetReplicaInfos(String collection, String shard) {
+ List<ReplicaInfo> replicas = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(shard, s -> new ArrayList<>());
+ if (replicas == null) {
+ return Collections.emptyList();
+ } else {
+ // make a defensive copy to avoid ConcurrentModificationException
+ return Arrays.asList(replicas.toArray(new ReplicaInfo[replicas.size()]));
+ }
+ }
+
/**
* List collections.
* @return list of existing collections.
*/
public List<String> simListCollections() throws InterruptedException {
- final Set<String> collections = new HashSet<>();
- lock.lockInterruptibly();
- try {
- nodeReplicaMap.forEach((n, replicas) -> {
- replicas.forEach(ri -> collections.add(ri.getCollection()));
- });
- // check collProps and sliceProps too
- collProperties.forEach((coll, props) -> collections.add(coll));
- sliceProperties.forEach((coll, slices) -> collections.add(coll));
- return new ArrayList<>(collections);
- } finally {
- lock.unlock();
- }
+ return new ArrayList<>(colShardReplicaMap.keySet());
}
// interface methods
@@ -1790,6 +1835,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
lock.lock();
collectionsStatesRef.set(null);
saveClusterState.set(true);
+ log.debug("** creating new collection states");
try {
Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
nodeReplicaMap.forEach((n, replicas) -> {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4a1ee8e1/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
index 9673fa7..5f9293b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
@@ -233,6 +233,7 @@ public class SimNodeStateProvider implements NodeStateProvider {
}
private static final Pattern REGISTRY_PATTERN = Pattern.compile("^solr\\.core\\.([\\w.-_]+?)\\.(shard[\\d_]+?)\\.(replica.*)");
+ private static final Pattern METRIC_KEY_PATTERN = Pattern.compile("^metrics:([^:]+?):([^:]+?)(:([^:]+))?$");
/**
* Simulate getting replica metrics values. This uses per-replica properties set in
* {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean, boolean)} and
@@ -245,33 +246,31 @@ public class SimNodeStateProvider implements NodeStateProvider {
if (!liveNodesSet.contains(node)) {
throw new RuntimeException("non-live node " + node);
}
- List<ReplicaInfo> replicas = clusterStateProvider.simGetReplicaInfos(node);
- if (replicas == null || replicas.isEmpty()) {
- return Collections.emptyMap();
- }
Map<String, Object> values = new HashMap<>();
for (String tag : tags) {
- String[] parts = tag.split(":");
- if (parts.length < 3 || !parts[0].equals("metrics")) {
+ Matcher m = METRIC_KEY_PATTERN.matcher(tag);
+ if (!m.matches() || m.groupCount() < 2) {
log.warn("Invalid metrics: tag: " + tag);
continue;
}
- if (!parts[1].startsWith("solr.core.")) {
+ String registryName = m.group(1);
+ String key = m.group(3) != null ? m.group(2) + m.group(3) : m.group(2);
+ if (!registryName.startsWith("solr.core.")) {
// skip - this is probably solr.node or solr.jvm metric
continue;
}
- Matcher m = REGISTRY_PATTERN.matcher(parts[1]);
+ m = REGISTRY_PATTERN.matcher(registryName);
if (!m.matches()) {
- log.warn("Invalid registry name: " + parts[1]);
+ log.warn("Invalid registry name: " + registryName);
continue;
}
String collection = m.group(1);
String shard = m.group(2);
String replica = m.group(3);
- String key = parts.length > 3 ? parts[2] + ":" + parts[3] : parts[2];
+ List<ReplicaInfo> replicas = clusterStateProvider.simGetReplicaInfos(collection, shard);
replicas.forEach(r -> {
- if (r.getCollection().equals(collection) && r.getShard().equals(shard) && r.getCore().endsWith(replica)) {
+ if (r.getNode().equals(node) && r.getCore().endsWith(replica)) {
Object value = r.getVariables().get(key);
if (value != null) {
values.put(tag, value);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4a1ee8e1/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
index dbb9785..c564fec 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
@@ -3,6 +3,7 @@ package org.apache.solr.cloud.autoscaling.sim;
import java.lang.invoke.MethodHandles;
import java.util.Iterator;
+import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -24,7 +25,9 @@ import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAut
/**
*
*/
-@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.NodeLostTrigger=INFO;org.apache.client.solrj.cloud.autoscaling=DEBUG")
+@TimeoutSuite(millis = 48 * 3600 * 1000)
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.NodeLostTrigger=INFO;org.apache.client.solrj.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.ComputePlanAction=INFO;org.apache.solr.cloud.autoscaling.ExecutePlanAction=INFO;org.apache.solr.cloud.autoscaling.ScheduledTriggers=INFO")
+//@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.NodeLostTrigger=INFO;org.apache.client.solrj.cloud.autoscaling=DEBUG;org.apache.solr.cloud.CloudTestUtils=TRACE")
public class TestSimAutoScaling extends SimSolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -35,8 +38,7 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
@BeforeClass
public static void setupCluster() throws Exception {
- // 20 mln docs / node
- configureCluster(50, TimeSource.get("simTime:" + SPEED));
+ configureCluster(500, TimeSource.get("simTime:" + SPEED));
timeSource = cluster.getTimeSource();
solrClient = cluster.simGetSolrClient();
}
@@ -56,7 +58,7 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
"'name' : 'scaleUpTrigger'," +
"'event' : 'indexSize'," +
"'waitFor' : '" + waitForSeconds + "s'," +
- "'aboveDocs' : 5000000," +
+ "'aboveDocs' : 10000000," +
"'enabled' : true," +
"'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
"{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" +
@@ -65,9 +67,9 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
- long batchSize = 250000;
- for (long i = 0; i < 10000; i++) {
- log.info("#### Total docs so far: " + (i * batchSize));
+ long batchSize = 4000000;
+ for (long i = 0; i < 100000; i++) {
+ log.info(String.format("#### Total docs so far: %,d", (i * batchSize)));
addDocs(collectionName, i * batchSize, batchSize);
timeSource.sleep(waitForSeconds);
}
@@ -81,12 +83,13 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
}
// lightweight generator of fake documents
+ // NOTE: this iterator only ever returns the same document, which works ok
+ // for our "index update" simulation. Obviously don't use this for real indexing.
private static class FakeDocIterator implements Iterator<SolrInputDocument> {
final SolrInputDocument doc = new SolrInputDocument();
final SolrInputField idField = new SolrInputField("id");
final long start, count;
- final StringBuilder sb = new StringBuilder("id-");
long current, max;
@@ -95,6 +98,7 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
this.count = count;
current = start;
max = start + count;
+ idField.setValue("foo");
doc.put("id", idField);
}
@@ -105,8 +109,6 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
@Override
public SolrInputDocument next() {
- sb.setLength(3);
- idField.setValue(sb.append(Long.toString(current)).toString());
current++;
return doc;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4a1ee8e1/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExecutePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExecutePlanAction.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExecutePlanAction.java
index d0d08fd..9a1d63e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExecutePlanAction.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExecutePlanAction.java
@@ -92,7 +92,7 @@ public class TestSimExecutePlanAction extends SimSolrCloudTestCase {
log.info("Collection ready after " + CloudTestUtils.waitForState(cluster, collectionName, 120, TimeUnit.SECONDS,
CloudTestUtils.clusterShape(1, 2, false, true)) + "ms");
- String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode();
ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
DocCollection docCollection = clusterState.getCollection(collectionName);
List<Replica> replicas = docCollection.getReplicas(sourceNodeName);
@@ -181,7 +181,7 @@ public class TestSimExecutePlanAction extends SimSolrCloudTestCase {
CloudTestUtils.waitForState(cluster, "Timed out waiting for replicas of new collection to be active",
collectionName, CloudTestUtils.clusterShape(1, 2, false, true));
- String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode();
ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
DocCollection docCollection = clusterState.getCollection(collectionName);
List<Replica> replicas = docCollection.getReplicas(sourceNodeName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4a1ee8e1/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimNodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimNodeLostTrigger.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimNodeLostTrigger.java
index 4ad0623..c1c5f4c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimNodeLostTrigger.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimNodeLostTrigger.java
@@ -131,7 +131,7 @@ public class TestSimNodeLostTrigger extends SimSolrCloudTestCase {
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
- String lostNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String lostNode = cluster.getSimClusterStateProvider().simGetRandomNode();
cluster.simRemoveNode(lostNode, false);
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setProcessor(event -> {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4a1ee8e1/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimPolicyCloud.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimPolicyCloud.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimPolicyCloud.java
index c964e44..7e9da4e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimPolicyCloud.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimPolicyCloud.java
@@ -109,7 +109,7 @@ public class TestSimPolicyCloud extends SimSolrCloudTestCase {
public void testCreateCollectionAddReplica() throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();
- String nodeId = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String nodeId = cluster.getSimClusterStateProvider().simGetRandomNode();
int port = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.PORT);
@@ -134,13 +134,13 @@ public class TestSimPolicyCloud extends SimSolrCloudTestCase {
public void testCreateCollectionSplitShard() throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();
- String firstNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String firstNode = cluster.getSimClusterStateProvider().simGetRandomNode();
int firstNodePort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(firstNode, ImplicitSnitch.PORT);
String secondNode;
int secondNodePort;
while (true) {
- secondNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ secondNode = cluster.getSimClusterStateProvider().simGetRandomNode();
secondNodePort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(secondNode, ImplicitSnitch.PORT);
if (secondNodePort != firstNodePort) break;
}
@@ -292,7 +292,7 @@ public class TestSimPolicyCloud extends SimSolrCloudTestCase {
public void testCreateCollectionAddShardUsingPolicy() throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();
- String nodeId = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String nodeId = cluster.getSimClusterStateProvider().simGetRandomNode();
int port = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.PORT);
String commands = "{set-policy :{c1 : [{replica:1 , shard:'#EACH', port: '" + port + "'}]}}";
@@ -343,7 +343,7 @@ public class TestSimPolicyCloud extends SimSolrCloudTestCase {
assertTrue("sysLoadAvg value is " + ((Number) val.get("sysLoadAvg")).doubleValue(), Double.compare(((Number) val.get("sysLoadAvg")).doubleValue(), 0.0d) > 0);
}
// simulator doesn't have Overseer, so just pick a random node
- String overseerNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String overseerNode = cluster.getSimClusterStateProvider().simGetRandomNode();
solrClient.request(CollectionAdminRequest.addRole(overseerNode, "overseer"));
for (int i = 0; i < 10; i++) {
Map<String, Object> data = Utils.getJson(cluster.getDistribStateManager(), ZkStateReader.ROLES);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4a1ee8e1/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimTriggerIntegration.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimTriggerIntegration.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimTriggerIntegration.java
index 81952af..53f26b9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimTriggerIntegration.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimTriggerIntegration.java
@@ -479,7 +479,7 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
fail("The TriggerAction should have been created by now");
}
- String lostNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String lostNodeName = cluster.getSimClusterStateProvider().simGetRandomNode();
cluster.simRemoveNode(lostNodeName, false);
boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("The trigger did not fire at all", await);
@@ -647,7 +647,7 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
"'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" +
"}}";
- String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode();
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
@@ -805,7 +805,7 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
SolrClient solrClient = cluster.simGetSolrClient();
// pick overseer node
- String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode();
// add a node
String node = cluster.simAddNode();
@@ -864,7 +864,7 @@ public class TestSimTriggerIntegration extends SimSolrCloudTestCase {
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
- overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode();
// create another node
log.info("====== ADD NODE 1");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4a1ee8e1/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
index 97f2521..ca83ad4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
@@ -49,10 +49,12 @@ public class ReplicaInfo implements MapWriter {
this.collection = coll;
this.shard = shard;
this.type = r.getType();
- this.isLeader = r.getBool(LEADER_PROP, false);
+ boolean maybeLeader = r.getBool(LEADER_PROP, false);
if (vals != null) {
this.variables.putAll(vals);
+ maybeLeader = "true".equals(String.valueOf(vals.getOrDefault(LEADER_PROP, maybeLeader)));
}
+ this.isLeader = maybeLeader;
this.node = r.getNodeName();
}
[3/7] lucene-solr:jira/solr-12709: Merge branch 'jira/solr-12709'
into jira/solr-12723
Posted by ab...@apache.org.
Merge branch 'jira/solr-12709' into jira/solr-12723
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b56095ba
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b56095ba
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b56095ba
Branch: refs/heads/jira/solr-12709
Commit: b56095bad4cc42e3405efe7c77b118e33945ea3c
Parents: e5d12f3 a7a3eb3
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Fri Aug 31 13:11:17 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Fri Aug 31 13:11:17 2018 +0200
----------------------------------------------------------------------
.../apache/solr/metrics/SolrMetricManager.java | 2 +-
.../sim/SimClusterStateProvider.java | 279 +++++++++++++------
.../autoscaling/sim/TestSimAutoScaling.java | 115 ++++++++
3 files changed, 314 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
[5/7] lucene-solr:jira/solr-12709: Merge branch 'master' into
jira/solr-12723
Posted by ab...@apache.org.
Merge branch 'master' into jira/solr-12723
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/8db16c1a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/8db16c1a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/8db16c1a
Branch: refs/heads/jira/solr-12709
Commit: 8db16c1ab90626c441569c5cf89e518543a28eb5
Parents: 4a1ee8e e0eb7ba
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue Sep 4 18:21:23 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue Sep 4 18:21:23 2018 +0200
----------------------------------------------------------------------
dev-tools/scripts/buildAndPushRelease.py | 54 ++-
lucene/CHANGES.txt | 28 ++
lucene/MIGRATE.txt | 13 +-
.../codecs/blocktree/BlockTreeTermsReader.java | 10 -
.../lucene/codecs/blocktree/FieldReader.java | 3 +-
.../codecs/blocktree/IntersectTermsEnum.java | 220 +---------
.../blocktree/IntersectTermsEnumFrame.java | 15 -
.../codecs/blocktree/SegmentTermsEnum.java | 5 +-
.../apache/lucene/codecs/blocktree/Stats.java | 2 -
.../java/org/apache/lucene/document/Field.java | 30 +-
.../document/LatLonPointDistanceComparator.java | 7 +-
.../document/LongDistanceFeatureQuery.java | 438 +++++++++++++++++++
.../org/apache/lucene/document/LongPoint.java | 26 ++
.../org/apache/lucene/document/StoredField.java | 23 +-
.../org/apache/lucene/index/CheckIndex.java | 75 ----
.../index/SoftDeletesRetentionMergePolicy.java | 2 +-
.../src/java/org/apache/lucene/index/Terms.java | 8 +-
.../org/apache/lucene/index/package-info.java | 278 ++++++++----
.../search/BlockMaxConjunctionScorer.java | 8 +-
.../org/apache/lucene/search/BooleanScorer.java | 4 +-
.../org/apache/lucene/search/BooleanWeight.java | 4 +-
.../apache/lucene/search/CachingCollector.java | 4 +-
.../apache/lucene/search/ConjunctionScorer.java | 8 +-
.../lucene/search/ConstantScoreQuery.java | 14 +-
.../lucene/search/DisjunctionMaxQuery.java | 2 +-
.../apache/lucene/search/DisjunctionScorer.java | 6 +-
.../lucene/search/DocValuesRewriteMethod.java | 2 +-
.../lucene/search/DoubleValuesSource.java | 6 +-
.../org/apache/lucene/search/FakeScorer.java | 2 +-
.../apache/lucene/search/FieldComparator.java | 10 +-
.../lucene/search/FilterLeafCollector.java | 2 +-
.../lucene/search/FilterMatchesIterator.java | 74 ++++
.../apache/lucene/search/FilterScorable.java | 58 +++
.../org/apache/lucene/search/LRUQueryCache.java | 4 +-
.../org/apache/lucene/search/LeafCollector.java | 2 +-
.../lucene/search/LeafFieldComparator.java | 2 +-
.../apache/lucene/search/LongValuesSource.java | 2 +-
.../java/org/apache/lucene/search/Matches.java | 96 ----
.../org/apache/lucene/search/MatchesUtils.java | 132 ++++++
.../lucene/search/MaxScoreSumPropagator.java | 2 +-
.../lucene/search/MinShouldMatchSumScorer.java | 6 +-
.../apache/lucene/search/MultiCollector.java | 9 +-
.../lucene/search/MultiCollectorManager.java | 6 +-
.../lucene/search/MultiLeafFieldComparator.java | 2 +-
.../MultiTermQueryConstantScoreWrapper.java | 2 +-
.../org/apache/lucene/search/PhraseWeight.java | 2 +-
.../search/PositiveScoresOnlyCollector.java | 4 +-
.../org/apache/lucene/search/ReqExclScorer.java | 6 +-
.../apache/lucene/search/ReqOptSumScorer.java | 10 +-
.../java/org/apache/lucene/search/Scorable.java | 86 ++++
.../search/ScoreCachingWrappingScorer.java | 24 +-
.../java/org/apache/lucene/search/Scorer.java | 70 +--
.../apache/lucene/search/SimpleCollector.java | 2 +-
.../lucene/search/SimpleFieldComparator.java | 2 +-
.../org/apache/lucene/search/SynonymQuery.java | 2 +-
.../apache/lucene/search/TermInSetQuery.java | 2 +-
.../org/apache/lucene/search/TermQuery.java | 2 +-
.../apache/lucene/search/TermRangeQuery.java | 4 +
.../apache/lucene/search/TopFieldCollector.java | 45 +-
.../lucene/search/TopScoreDocCollector.java | 8 +-
.../org/apache/lucene/search/WANDScorer.java | 8 +-
.../java/org/apache/lucene/search/Weight.java | 4 +-
.../org/apache/lucene/search/package-info.java | 127 +++---
.../search/similarities/package-info.java | 52 +--
.../apache/lucene/search/spans/SpanWeight.java | 3 +-
.../apache/lucene/util/RamUsageEstimator.java | 51 +--
.../org/apache/lucene/util/packed/Direct16.java | 2 +-
.../org/apache/lucene/util/packed/Direct32.java | 2 +-
.../org/apache/lucene/util/packed/Direct64.java | 2 +-
.../org/apache/lucene/util/packed/Direct8.java | 2 +-
.../lucene/util/packed/Packed16ThreeBlocks.java | 2 +-
.../lucene/util/packed/Packed64SingleBlock.java | 2 +-
.../lucene/util/packed/Packed8ThreeBlocks.java | 2 +-
lucene/core/src/java/overview.html | 4 +-
.../document/TestLongDistanceFeatureQuery.java | 350 +++++++++++++++
.../org/apache/lucene/index/TestOmitTf.java | 18 +-
.../TestSoftDeletesRetentionMergePolicy.java | 27 ++
.../apache/lucene/search/JustCompileSearch.java | 2 +-
.../lucene/search/MultiCollectorTest.java | 8 +-
.../org/apache/lucene/search/TestBooleanOr.java | 2 +-
.../apache/lucene/search/TestBooleanQuery.java | 4 +-
.../search/TestBooleanQueryVisitSubscorers.java | 19 +-
.../apache/lucene/search/TestConjunctions.java | 8 +-
.../lucene/search/TestConstantScoreQuery.java | 26 +-
.../lucene/search/TestDoubleValuesSource.java | 6 +-
.../lucene/search/TestElevationComparator.java | 2 +-
.../lucene/search/TestMultiCollector.java | 6 +-
.../lucene/search/TestReqExclBulkScorer.java | 4 +-
.../search/TestScoreCachingWrappingScorer.java | 4 +-
.../apache/lucene/search/TestSimilarity.java | 16 +-
.../lucene/search/TestSloppyPhraseQuery.java | 14 +-
.../lucene/search/TestSubScorerFreqs.java | 22 +-
.../apache/lucene/search/TestTermScorer.java | 4 +-
.../search/TestTimeLimitingCollector.java | 2 +-
.../lucene/search/TestTopDocsCollector.java | 2 +-
.../lucene/search/TestTopFieldCollector.java | 19 +-
.../lucene/facet/DrillSidewaysScorer.java | 4 +-
.../apache/lucene/facet/FacetsCollector.java | 6 +-
.../directory/DirectoryTaxonomyReader.java | 2 +-
.../taxonomy/directory/TaxonomyIndexArrays.java | 2 +-
.../facet/AssertingSubDocsAtOnceCollector.java | 11 +-
.../search/grouping/AllGroupHeadsCollector.java | 24 +-
.../search/grouping/AllGroupsCollector.java | 4 +-
.../search/grouping/BlockGroupingCollector.java | 5 +-
.../grouping/FirstPassGroupingCollector.java | 4 +-
.../search/grouping/GroupFacetCollector.java | 4 +-
.../lucene/search/grouping/GroupReducer.java | 4 +-
.../grouping/SecondPassGroupingCollector.java | 4 +-
.../search/grouping/TopGroupsCollector.java | 6 +-
.../search/highlight/HighlighterPhraseTest.java | 5 +-
.../search/join/GlobalOrdinalsCollector.java | 6 +-
.../join/GlobalOrdinalsWithScoreCollector.java | 14 +-
.../org/apache/lucene/search/join/JoinUtil.java | 10 +-
.../search/join/TermsWithScoreCollector.java | 6 +-
.../search/join/ToChildBlockJoinQuery.java | 4 +-
.../search/join/ToParentBlockJoinQuery.java | 7 +-
.../apache/lucene/search/join/TestJoinUtil.java | 10 +-
.../apache/lucene/index/memory/MemoryIndex.java | 6 +-
.../search/DiversifiedTopDocsCollector.java | 4 +-
.../lucene/search/DocValuesStatsCollector.java | 4 +-
.../surround/query/BooleanQueryTst.java | 10 +-
.../org/apache/lucene/search/CoveringQuery.java | 2 +-
.../apache/lucene/search/CoveringScorer.java | 6 +-
.../intervals/ConjunctionIntervalsSource.java | 113 ++++-
.../intervals/DifferenceIntervalsSource.java | 23 +-
.../intervals/DisjunctionIntervalsSource.java | 14 +
.../lucene/search/intervals/IntervalFilter.java | 2 +-
.../search/intervals/IntervalFunction.java | 4 +-
.../search/intervals/IntervalIterator.java | 5 +
.../search/intervals/IntervalMatches.java | 156 +++++++
.../lucene/search/intervals/IntervalQuery.java | 20 +
.../lucene/search/intervals/Intervals.java | 4 +-
.../search/intervals/IntervalsSource.java | 13 +
.../intervals/LowpassIntervalsSource.java | 16 +
.../MinimizingConjunctionIntervalsSource.java | 236 ++++++++++
.../search/intervals/TermIntervalsSource.java | 66 +++
.../lucene/search/intervals/TestIntervals.java | 232 +++++++++-
.../spatial3d/Geo3DPointDistanceComparator.java | 4 +-
.../Geo3DPointOutsideDistanceComparator.java | 4 +-
.../document/TopSuggestDocsCollector.java | 2 +-
.../lucene/search/AssertingBulkScorer.java | 4 +-
.../lucene/search/AssertingCollector.java | 11 +-
.../lucene/search/AssertingIndexSearcher.java | 2 +-
.../lucene/search/AssertingLeafCollector.java | 13 +-
.../apache/lucene/search/AssertingScorable.java | 64 +++
.../apache/lucene/search/AssertingScorer.java | 6 +-
.../lucene/search/BulkScorerWrapperScorer.java | 4 +-
.../org/apache/lucene/search/CheckHits.java | 6 +-
.../org/apache/lucene/search/QueryUtils.java | 14 +-
solr/CHANGES.txt | 26 +-
.../org/apache/solr/ltr/LTRScoringQuery.java | 14 +-
.../solr/ltr/TestLTRReRankingPipeline.java | 6 +-
.../solr/cloud/api/collections/Assign.java | 30 +-
.../api/collections/CreateCollectionCmd.java | 14 +-
.../autoscaling/AutoAddReplicasPlanAction.java | 2 +-
.../cloud/autoscaling/ComputePlanAction.java | 44 +-
.../cloud/autoscaling/NodeAddedTrigger.java | 30 +-
.../org/apache/solr/core/SolrXmlConfig.java | 9 +-
.../solr/handler/component/ExpandComponent.java | 6 +-
.../component/HttpShardHandlerFactory.java | 8 +-
.../solr/handler/component/QueryComponent.java | 2 +-
.../solr/search/CollapsingQParserPlugin.java | 11 +-
.../apache/solr/search/DelegatingCollector.java | 8 +-
.../org/apache/solr/search/DocSetCollector.java | 4 +-
.../apache/solr/search/ExportQParserPlugin.java | 4 +-
.../apache/solr/search/HashQParserPlugin.java | 7 +-
.../apache/solr/search/MaxScoreCollector.java | 6 +-
.../apache/solr/search/SolrIndexSearcher.java | 8 +-
.../apache/solr/update/UpdateShardHandler.java | 4 +-
.../solr/update/UpdateShardHandlerConfig.java | 10 +-
.../AddSchemaFieldsUpdateProcessorFactory.java | 36 +-
.../ParseDateFieldUpdateProcessorFactory.java | 16 +-
solr/core/src/test-files/log4j2.xml | 12 +-
...lrconfig-parsing-update-processor-chains.xml | 3 -
.../apache/solr/cloud/TestWithCollection.java | 5 +-
.../autoscaling/ComputePlanActionTest.java | 124 +++++-
.../MetricTriggerIntegrationTest.java | 8 +-
.../solr/cloud/autoscaling/TestPolicyCloud.java | 20 +-
.../apache/solr/handler/RequestLoggingTest.java | 23 +-
.../org/apache/solr/logging/TestLogWatcher.java | 79 ++--
.../apache/solr/search/TestRankQueryPlugin.java | 9 +-
...dSchemaFieldsUpdateProcessorFactoryTest.java | 15 +
.../ParsingFieldUpdateProcessorsTest.java | 9 +-
solr/server/resources/log4j2-console.xml | 8 +-
solr/server/resources/log4j2.xml | 30 +-
.../src/solrcloud-autoscaling-overview.adoc | 2 +-
...olrcloud-autoscaling-policy-preferences.adoc | 33 ++
.../src/solrcloud-autoscaling-triggers.adoc | 36 +-
...store-data-with-the-data-import-handler.adoc | 28 +-
.../solr/client/solrj/impl/HttpClientUtil.java | 2 +
.../solrj/request/CollectionAdminRequest.java | 4 +
.../java/org/apache/solr/util/TestHarness.java | 5 +-
192 files changed, 3435 insertions(+), 1340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8db16c1a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
[7/7] lucene-solr:jira/solr-12709: SOLR-12709: Modify bulk updates
algorithm.
Posted by ab...@apache.org.
SOLR-12709: Modify bulk updates algorithm.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/05189103
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/05189103
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/05189103
Branch: refs/heads/jira/solr-12709
Commit: 051891036755aa853f3dcb2146c8a73429ace27b
Parents: 59cce0f
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Sep 6 16:34:56 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Sep 6 16:34:56 2018 +0200
----------------------------------------------------------------------
.../java/org/apache/solr/cloud/CloudUtil.java | 4 +-
.../cloud/api/collections/AddReplicaCmd.java | 2 +-
.../solr/cloud/api/collections/Assign.java | 3 +-
.../cloud/api/collections/CreateShardCmd.java | 2 +-
.../cloud/autoscaling/ComputePlanAction.java | 2 +-
.../cloud/autoscaling/IndexSizeTrigger.java | 11 +-
.../cloud/autoscaling/IndexSizeTriggerTest.java | 4 +-
.../cloud/autoscaling/sim/SimCloudManager.java | 38 ++++--
.../sim/SimClusterStateProvider.java | 130 ++++++++++++-------
.../autoscaling/sim/SimDistribStateManager.java | 7 +
.../autoscaling/sim/TestSimAutoScaling.java | 11 +-
.../client/solrj/cloud/autoscaling/Policy.java | 18 ++-
.../solrj/cloud/autoscaling/PolicyHelper.java | 4 +-
13 files changed, 158 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
index 13734f6..55231f8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
@@ -144,9 +144,9 @@ public class CloudUtil {
}
- public static boolean usePolicyFramework(DocCollection collection, SolrCloudManager cloudManager)
+ public static boolean usePolicyFramework(String collection, SolrCloudManager cloudManager)
throws IOException, InterruptedException {
AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
- return !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || collection.getPolicyName() != null;
+ return !autoScalingConfig.getPolicy().isEmpty();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index c9dbaec..1a46da3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -258,7 +258,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
// Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
if (!skipCreateReplicaInClusterState) {
- if (CloudUtil.usePolicyFramework(coll, cloudManager)) {
+ if (CloudUtil.usePolicyFramework(collection, cloudManager)) {
if (node == null) {
if(coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName());
node = Assign.identifyNodes(cloudManager,
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index d323510..1998ace 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -38,6 +38,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.rule.ReplicaAssigner;
import org.apache.solr.cloud.rule.Rule;
import org.apache.solr.common.SolrException;
@@ -255,7 +256,7 @@ public class Assign {
String policyName = message.getStr(POLICY);
AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
- if (rulesMap == null && policyName == null && autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
+ if (rulesMap == null && !CloudUtil.usePolicyFramework(collectionName, cloudManager)) {
log.debug("Identify nodes using default");
int i = 0;
List<ReplicaPosition> result = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 802583c..59c0c9a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -155,7 +155,7 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
- boolean usePolicyFramework = CloudUtil.usePolicyFramework(collection, cloudManager);
+ boolean usePolicyFramework = CloudUtil.usePolicyFramework(collectionName, cloudManager);
List<ReplicaPosition> positions;
if (usePolicyFramework) {
if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index 36ef524..16a0d97 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -134,7 +134,7 @@ public class ComputePlanAction extends TriggerActionBase {
continue;
}
}
- log.info("Computed Plan: {}", operation.getParams());
+ log.debug("Computed Plan: {}", operation.getParams());
if (!collections.isEmpty()) {
String coll = operation.getParams().get(CoreAdminParams.COLLECTION);
if (coll != null && !collections.contains(coll)) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
index 3f2ea8a..967582c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
@@ -431,8 +431,15 @@ public class IndexSizeTrigger extends TriggerBase {
Map<String, List<ReplicaInfo>> belowSize) {
super(TriggerEventType.INDEXSIZE, source, eventTime, null);
properties.put(TriggerEvent.REQUESTED_OPS, ops);
- properties.put(ABOVE_SIZE_PROP, aboveSize);
- properties.put(BELOW_SIZE_PROP, belowSize);
+ // avoid passing very large amounts of data here - just use replica names
+ Set<String> above = new HashSet<>();
+ aboveSize.forEach((coll, replicas) ->
+ replicas.forEach(r -> above.add(r.getCore())));
+ properties.put(ABOVE_SIZE_PROP, above);
+ Set<String> below = new HashSet<>();
+ belowSize.forEach((coll, replicas) ->
+ replicas.forEach(r -> below.add(r.getCore())));
+ properties.put(BELOW_SIZE_PROP, below);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
index 0b4cffc..3226cb0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
@@ -361,7 +361,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
CloudTestUtils.clusterShape(2, 2, false, true));
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 20; i++) {
SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100));
solrClient.add(collectionName, doc);
}
@@ -412,7 +412,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
assertEquals(response.get("result").toString(), "success");
// delete some docs to trigger a merge
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 15; i++) {
solrClient.deleteById(collectionName, "id-" + (i * 100));
}
solrClient.commit(collectionName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index 51e3db4..365c488 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -31,6 +31,7 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -51,6 +52,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -119,6 +121,7 @@ public class SimCloudManager implements SolrCloudManager {
private final String metricTag;
private final List<SolrInputDocument> systemColl = Collections.synchronizedList(new ArrayList<>());
+ private final Map<String, Map<String, AtomicInteger>> eventCounts = new ConcurrentHashMap<>();
private final MockSearchableSolrClient solrClient;
private final Map<String, AtomicLong> opCounts = new ConcurrentSkipListMap<>();
@@ -130,9 +133,11 @@ public class SimCloudManager implements SolrCloudManager {
private MetricsHandler metricsHandler;
private MetricsHistoryHandler metricsHistoryHandler;
private TimeSource timeSource;
+ private boolean useSystemCollection = true;
private static int nodeIdPort = 10000;
- public static int DEFAULT_DISK = 1024; // 1000 GiB
+ public static int DEFAULT_FREE_DISK = 1024; // 1024 GiB = 1 TiB
+ public static int DEFAULT_TOTAL_DISK = 10240; // 10 TiB
public static long DEFAULT_IDX_SIZE_BYTES = 10240; // 10 kiB
/**
@@ -307,7 +312,8 @@ public class SimCloudManager implements SolrCloudManager {
values.put(ImplicitSnitch.PORT, port);
values.put(ImplicitSnitch.NODE, nodeId);
values.put(ImplicitSnitch.CORES, 0);
- values.put(ImplicitSnitch.DISK, DEFAULT_DISK);
+ values.put(ImplicitSnitch.DISK, DEFAULT_FREE_DISK);
+ values.put(Variable.Type.TOTALDISK.tagName, DEFAULT_TOTAL_DISK);
values.put(ImplicitSnitch.SYSLOADAVG, 1.0);
values.put(ImplicitSnitch.HEAPUSAGE, 123450000);
values.put("sysprop.java.version", System.getProperty("java.version"));
@@ -456,6 +462,10 @@ public class SimCloudManager implements SolrCloudManager {
}
}
+ public void simSetUseSystemCollection(boolean useSystemCollection) {
+ this.useSystemCollection = useSystemCollection;
+ }
+
/**
* Clear the (simulated) .system collection.
*/
@@ -472,17 +482,7 @@ public class SimCloudManager implements SolrCloudManager {
}
public Map<String, Map<String, AtomicInteger>> simGetEventCounts() {
- TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>();
- synchronized (systemColl) {
- for (SolrInputDocument d : systemColl) {
- if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
- continue;
- }
- counts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new TreeMap<>())
- .computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
- .incrementAndGet();
- }
- }
+ TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>(eventCounts);
return counts;
}
@@ -723,7 +723,17 @@ public class SimCloudManager implements SolrCloudManager {
if (collection == null || collection.equals(CollectionAdminParams.SYSTEM_COLL)) {
List<SolrInputDocument> docs = ureq.getDocuments();
if (docs != null) {
- systemColl.addAll(docs);
+ if (useSystemCollection) {
+ systemColl.addAll(docs);
+ }
+ for (SolrInputDocument d : docs) {
+ if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
+ continue;
+ }
+ eventCounts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new TreeMap<>())
+ .computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
+ .incrementAndGet();
+ }
}
return new UpdateResponse();
} else {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index e6bebec..3f31031 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -149,6 +149,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
private AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
private AtomicBoolean saveClusterState = new AtomicBoolean();
+ private Random bulkUpdateRandom = new Random(0);
+
private transient boolean closed;
/**
@@ -515,7 +517,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores + 1);
Integer disk = (Integer)values.get(ImplicitSnitch.DISK);
if (disk == null) {
- disk = SimCloudManager.DEFAULT_DISK;
+ disk = SimCloudManager.DEFAULT_FREE_DISK;
}
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk - 1);
// fake metrics
@@ -1186,7 +1188,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
// delay it once again to better simulate replica recoveries
//opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());
- CloudTestUtils.waitForState(cloudManager, collectionName, 20, TimeUnit.SECONDS, (liveNodes, state) -> {
+ CloudTestUtils.waitForState(cloudManager, collectionName, 30, TimeUnit.SECONDS, (liveNodes, state) -> {
for (String subSlice : subSlices) {
Slice s = state.getSlice(subSlice);
if (s.getLeader() == null) {
@@ -1349,7 +1351,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (deletes != null && !deletes.isEmpty()) {
for (String id : deletes) {
Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
- // NOTE: we don't use getProperty because it uses PROPERTY_PROP_PREFIX
Replica leader = s.getLeader();
if (leader == null) {
log.debug("-- no leader in " + s);
@@ -1428,64 +1429,105 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
List<SolrInputDocument> docs = req.getDocuments();
- Iterator<SolrInputDocument> it;
+ int docCount = 0;
+ Iterator<SolrInputDocument> it = null;
if (docs != null) {
- it = docs.iterator();
+ docCount = docs.size();
} else {
it = req.getDocIterator();
+ if (it != null) {
+ while (it.hasNext()) {
+ docCount++;
+ }
+ }
}
- if (it != null) {
+ if (docCount > 0) {
// this approach to updating counters and metrics drastically increases performance
// of bulk updates, because simSetShardValue is relatively costly
- // also, skip the hash-based selection of slices in favor of a simple random
- // start + round-robin assignment, because we don't keep individual id-s anyway
Map<String, AtomicLong> docUpdates = new HashMap<>();
Map<String, Map<String, AtomicLong>> metricUpdates = new HashMap<>();
+
+ // XXX: docCount and perSlice are ints, so don't add more than ~2 billion docs in one request
+ boolean modified = false;
Slice[] slices = coll.getActiveSlicesArr();
if (slices.length == 0) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Update sent to a collection without slices: " + coll);
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection without slices");
}
- // TODO: we don't use DocRouter so we should verify that active slices cover the whole hash range
-
- long docCount = 0;
- long[] perSlice = new long[slices.length];
- while (it.hasNext()) {
- SolrInputDocument doc = it.next();
- String id = (String) doc.getFieldValue("id");
- if (id == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
+ int[] perSlice = new int[slices.length];
+
+ if (it != null) {
+ // BULK UPDATE: simulate random doc assignment without actually calling DocRouter,
+ // which adds significant overhead
+
+ int totalAdded = 0;
+ for (int i = 0; i < slices.length; i++) {
+ Slice s = slices[i];
+ long count = (long) docCount * ((long) s.getRange().max - (long) s.getRange().min) / 0x100000000L;
+ perSlice[i] = (int) count;
+ totalAdded += perSlice[i];
}
- docCount++;
- }
- int initialSlice = cloudManager.getRandom().nextInt(slices.length);
- for (int i = 0; i < slices.length; i++) {
- long addDocs = perSlice;
- if (i == 0) {
- addDocs += remainder;
- }
- int sliceNum = (initialSlice + i) % slices.length;
- Slice s = slices[sliceNum];
- if (s.getState() != Slice.State.ACTIVE) {
- log.debug("-- slice not active: {}", s);
+ // loss of precision due to integer math
+ int diff = docCount - totalAdded;
+ if (diff > 0) {
+ // spread the remainder more or less equally
+ int perRemain = diff / slices.length;
+ int remainder = diff % slices.length;
+ int remainderSlice = slices.length > 1 ? bulkUpdateRandom.nextInt(slices.length) : 0;
+ for (int i = 0; i < slices.length; i++) {
+ perSlice[i] += perRemain;
+ if (i == remainderSlice) {
+ perSlice[i] += remainder;
+ }
+ }
}
- Replica leader = s.getLeader();
- if (leader == null) {
- log.debug("-- no leader in " + s);
- continue;
+ for (int i = 0; i < slices.length; i++) {
+ Slice s = slices[i];
+ Replica leader = s.getLeader();
+ if (leader == null) {
+ log.debug("-- no leader in " + s);
+ continue;
+ }
+ metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
+ .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
+ .addAndGet(perSlice[i]);
+ modified = true;
+ AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
+ if (bufferedUpdates != null) {
+ bufferedUpdates.addAndGet(perSlice[i]);
+ continue;
+ }
+ docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
+ .addAndGet(perSlice[i]);
}
- metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
- .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
- .addAndGet(addDocs);
- AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
- if (bufferedUpdates != null) {
- bufferedUpdates.addAndGet(addDocs);
- continue;
+ } else {
+ // SMALL UPDATE: use exact assignment via DocRouter
+ for (SolrInputDocument doc : docs) {
+ String id = (String) doc.getFieldValue("id");
+ if (id == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
+ }
+ Slice s = coll.getRouter().getTargetSlice(id, doc, null, null, coll);
+ Replica leader = s.getLeader();
+ if (leader == null) {
+ log.debug("-- no leader in " + s);
+ continue;
+ }
+ metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
+ .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
+ .incrementAndGet();
+ modified = true;
+ AtomicLong bufferedUpdates = (AtomicLong)s.getProperties().get(BUFFERED_UPDATES);
+ if (bufferedUpdates != null) {
+ bufferedUpdates.incrementAndGet();
+ continue;
+ }
+ docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
+ .incrementAndGet();
}
- docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
- .addAndGet(addDocs);
}
- if (docCount > 0) {
+
+ if (modified) {
lock.lockInterruptibly();
try {
docUpdates.forEach((sh, count) -> {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
index 1e99ff2..2b8940a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
@@ -214,6 +214,8 @@ public class SimDistribStateManager implements DistribStateManager {
private final String id;
private final Node root;
+ private int juteMaxbuffer = 0xfffff;
+
public SimDistribStateManager() {
this(null);
}
@@ -226,6 +228,8 @@ public class SimDistribStateManager implements DistribStateManager {
this.id = IdUtils.timeRandomId();
this.root = root != null ? root : createNewRootNode();
watchersPool = ExecutorUtil.newMDCAwareFixedThreadPool(10, new DefaultSolrThreadFactory("sim-watchers"));
+ String bufferSize = System.getProperty("jute.maxbuffer", Integer.toString(0xffffff));
+ juteMaxbuffer = Integer.parseInt(bufferSize);
}
public SimDistribStateManager(ActionThrottle actionThrottle, ActionError actionError) {
@@ -493,6 +497,9 @@ public class SimDistribStateManager implements DistribStateManager {
@Override
public void setData(String path, byte[] data, int version) throws NoSuchElementException, BadVersionException, IOException {
+ if (data.length > juteMaxbuffer) {
+ throw new IOException("Len error " + data.length);
+ }
multiLock.lock();
Node n = null;
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
index d6bf3ce..369ebc8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
@@ -32,11 +32,11 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int SPEED = 50;
- private static final int NUM_NODES = 10;
+ private static final int NUM_NODES = 50;
- private static final long BATCH_SIZE = 200000;
- private static final long NUM_BATCHES = 1000;
- private static final long ABOVE_SIZE = 300000;
+ private static final long BATCH_SIZE = 8000000;
+ private static final long NUM_BATCHES = 100000;
+ private static final long ABOVE_SIZE = 2000000;
private static TimeSource timeSource;
@@ -47,6 +47,7 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
configureCluster(NUM_NODES, TimeSource.get("simTime:" + SPEED));
timeSource = cluster.getTimeSource();
solrClient = cluster.simGetSolrClient();
+ cluster.simSetUseSystemCollection(false);
}
@Test
@@ -104,8 +105,8 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
this.count = count;
current = start;
max = start + count;
- idField.setValue("foo");
doc.put("id", idField);
+ idField.setValue("foo");
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index 4a2b880..711b4c3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -94,6 +94,7 @@ public class Policy implements MapWriter {
final List<Pair<String, Type>> params;
final List<String> perReplicaAttributes;
final int zkVersion;
+ final boolean empty;
public Policy() {
this(Collections.emptyMap());
@@ -104,6 +105,7 @@ public class Policy implements MapWriter {
}
@SuppressWarnings("unchecked")
public Policy(Map<String, Object> jsonMap, int version) {
+ this.empty = jsonMap.get(CLUSTER_PREFERENCES) == null && jsonMap.get(CLUSTER_POLICY) == null && jsonMap.get(POLICIES) == null;
this.zkVersion = version;
int[] idx = new int[1];
List<Preference> initialClusterPreferences = ((List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_PREFERENCES, emptyList())).stream()
@@ -156,6 +158,7 @@ public class Policy implements MapWriter {
}
private Policy(Map<String, List<Clause>> policies, List<Clause> clusterPolicy, List<Preference> clusterPreferences, int version) {
+ this.empty = policies == null && clusterPolicy == null && clusterPreferences == null;
this.zkVersion = version;
this.policies = policies != null ? Collections.unmodifiableMap(policies) : Collections.emptyMap();
this.clusterPolicy = clusterPolicy != null ? Collections.unmodifiableList(clusterPolicy) : Collections.emptyList();
@@ -281,11 +284,16 @@ public class Policy implements MapWriter {
return p.compare(r1, r2, false);
});
} catch (Exception e) {
+// log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
+// clusterPreferences,
+// lastComparison[0],
+// lastComparison[1],
+// Utils.toJSONString(Utils.getDeepCopy(tmpMatrix, 6, false)));
log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
clusterPreferences,
- lastComparison[0],
- lastComparison[1],
- Utils.toJSONString(Utils.getDeepCopy(tmpMatrix, 6, false)));
+ lastComparison[0].node,
+ lastComparison[1].node,
+ matrix.size());
throw e;
}
p.setApproxVal(tmpMatrix);
@@ -461,6 +469,10 @@ public class Policy implements MapWriter {
return Utils.toJSONString(this);
}
+ public boolean isEmpty() {
+ return empty;
+ }
+
/*This stores the logical state of the system, given a policy and
* a cluster state.
*
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/05189103/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index d052d6f..33fb78d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -312,7 +312,7 @@ public class PolicyHelper {
TimeSource timeSource = sessionWrapper.session != null ? sessionWrapper.session.cloudManager.getTimeSource() : TimeSource.NANO_TIME;
synchronized (lockObj) {
sessionWrapper.status = Status.EXECUTING;
- log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(timeSource, MILLISECONDS),
+ log.debug("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(timeSource, MILLISECONDS),
sessionWrapper.createTime,
this.sessionWrapper.createTime);
if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
@@ -323,7 +323,7 @@ public class PolicyHelper {
//one thread who is waiting for this need to be notified.
lockObj.notify();
} else {
- log.info("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime);
+ log.debug("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime);
//else just ignore it
}
}
[6/7] lucene-solr:jira/solr-12709: SOLR-12709: Don't keep a reference
to parent Session.
Posted by ab...@apache.org.
SOLR-12709: Don't keep a reference to parent Session.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/59cce0fb
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/59cce0fb
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/59cce0fb
Branch: refs/heads/jira/solr-12709
Commit: 59cce0fb90e0b55128836dd3cada3b6e2d01ca0d
Parents: 8db16c1
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Wed Sep 5 15:35:41 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Wed Sep 5 15:35:41 2018 +0200
----------------------------------------------------------------------
.../cloud/autoscaling/IndexSizeTriggerTest.java | 6 +++---
.../autoscaling/sim/SimClusterStateProvider.java | 8 ++++----
.../cloud/autoscaling/sim/TestSimAutoScaling.java | 14 ++++++++++----
.../client/solrj/cloud/autoscaling/Policy.java | 18 +++++++++---------
4 files changed, 26 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59cce0fb/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
index faabda1..0b4cffc 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
@@ -93,7 +93,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
configureCluster(2)
.addConfig("conf", configset("cloud-minimal"))
.configure();
- if (random().nextBoolean() || true) {
+ if (random().nextBoolean() && false) {
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
solrClient = cluster.getSolrClient();
loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
@@ -190,7 +190,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
assertNotNull("should have fired an event", ev);
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) ev.getProperty(TriggerEvent.REQUESTED_OPS);
assertNotNull("should contain requestedOps", ops);
- assertEquals("number of ops", 2, ops.size());
+ assertEquals("number of ops: " + ops, 2, ops.size());
boolean shard1 = false;
boolean shard2 = false;
for (TriggerEvent.Op op : ops) {
@@ -425,7 +425,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
+ assertEquals("success", response.get("result").toString());
timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59cce0fb/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index 3dd26e9..e6bebec 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -593,6 +593,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
int version = oldData != null ? oldData.getVersion() : -1;
Assert.assertEquals(clusterStateVersion, version + 1);
stateManager.setData(ZkStateReader.CLUSTER_STATE, data, version);
+ log.debug("** saved cluster state version " + version);
clusterStateVersion++;
} catch (Exception e) {
throw new IOException(e);
@@ -720,7 +721,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
synchronized (ri) {
ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
}
- log.trace("-- elected new leader for {} / {}: {}", collection, s.getName(), ri);
+ log.debug("-- elected new leader for {} / {}: {}", collection, s.getName(), ri);
stateChanged.set(true);
}
} else {
@@ -1448,6 +1449,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
// TODO: we don't use DocRouter so we should verify that active slices cover the whole hash range
long docCount = 0;
+ long[] perSlice = new long[slices.length];
while (it.hasNext()) {
SolrInputDocument doc = it.next();
String id = (String) doc.getFieldValue("id");
@@ -1456,8 +1458,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
docCount++;
}
- long perSlice = docCount / slices.length;
- long remainder = docCount % slices.length;
int initialSlice = cloudManager.getRandom().nextInt(slices.length);
for (int i = 0; i < slices.length; i++) {
long addDocs = perSlice;
@@ -1877,7 +1877,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
Map<String, Object> collProps = collProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>());
Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
- DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion, ZkStateReader.CLUSTER_STATE);
+ DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion + 1, ZkStateReader.CLUSTER_STATE);
res.put(coll, dc);
});
collectionsStatesRef.set(res);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59cce0fb/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
index c564fec..d6bf3ce 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimAutoScaling.java
@@ -32,13 +32,19 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int SPEED = 50;
+ private static final int NUM_NODES = 10;
+
+ private static final long BATCH_SIZE = 200000;
+ private static final long NUM_BATCHES = 1000;
+ private static final long ABOVE_SIZE = 300000;
+
private static TimeSource timeSource;
private static SolrClient solrClient;
@BeforeClass
public static void setupCluster() throws Exception {
- configureCluster(500, TimeSource.get("simTime:" + SPEED));
+ configureCluster(NUM_NODES, TimeSource.get("simTime:" + SPEED));
timeSource = cluster.getTimeSource();
solrClient = cluster.simGetSolrClient();
}
@@ -58,7 +64,7 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
"'name' : 'scaleUpTrigger'," +
"'event' : 'indexSize'," +
"'waitFor' : '" + waitForSeconds + "s'," +
- "'aboveDocs' : 10000000," +
+ "'aboveDocs' : " + ABOVE_SIZE + "," +
"'enabled' : true," +
"'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
"{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" +
@@ -67,8 +73,8 @@ public class TestSimAutoScaling extends SimSolrCloudTestCase {
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
- long batchSize = 4000000;
- for (long i = 0; i < 100000; i++) {
+ long batchSize = BATCH_SIZE;
+ for (long i = 0; i < NUM_BATCHES; i++) {
log.info(String.format("#### Total docs so far: %,d", (i * batchSize)));
addDocs(collectionName, i * batchSize, batchSize);
timeSource.sleep(waitForSeconds);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59cce0fb/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index 210e324..4a2b880 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -405,13 +405,13 @@ public class Policy implements MapWriter {
return currentSession.getViolations();
}
- public boolean undo() {
- if (currentSession.parent != null) {
- currentSession = currentSession.parent;
- return true;
- }
- return false;
- }
+// public boolean undo() {
+// if (currentSession.parent != null) {
+// currentSession = currentSession.parent;
+// return true;
+// }
+// return false;
+// }
public Session getCurrentSession() {
@@ -475,12 +475,12 @@ public class Policy implements MapWriter {
List<Clause> expandedClauses;
List<Violation> violations = new ArrayList<>();
Transaction transaction;
- private Session parent = null;
+ //private Session parent = null;
private Session(List<String> nodes, SolrCloudManager cloudManager,
List<Row> matrix, List<Clause> expandedClauses, int znodeVersion,
NodeStateProvider nodeStateProvider, Transaction transaction, Session parent) {
- this.parent = parent;
+ //this.parent = parent;
this.transaction = transaction;
this.nodes = nodes;
this.cloudManager = cloudManager;
[2/7] lucene-solr:jira/solr-12709: Initial patch.
Posted by ab...@apache.org.
Initial patch.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e5d12f30
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e5d12f30
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e5d12f30
Branch: refs/heads/jira/solr-12709
Commit: e5d12f30943a5701ce4145dfa730ca284c5440ba
Parents: 86ba65c
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Fri Aug 31 13:10:01 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Fri Aug 31 13:10:01 2018 +0200
----------------------------------------------------------------------
.../stream/AnalyticsShardRequestManager.java | 2 +-
.../handler/admin/MetricsHistoryHandler.java | 8 ++---
.../apache/solr/schema/ManagedIndexSchema.java | 4 +--
.../search/join/ScoreJoinQParserPlugin.java | 2 +-
.../org/apache/solr/servlet/HttpSolrCall.java | 35 +++++++++++---------
.../processor/DistributedUpdateProcessor.java | 6 ++--
.../DocExpirationUpdateProcessorFactory.java | 9 +++--
.../TimeRoutedAliasUpdateProcessor.java | 7 ++--
.../solr/client/solrj/impl/CloudSolrClient.java | 8 ++---
.../solr/client/solrj/io/sql/StatementImpl.java | 2 +-
.../client/solrj/io/stream/CloudSolrStream.java | 10 +++---
.../io/stream/FeaturesSelectionStream.java | 2 +-
.../client/solrj/io/stream/TextLogitStream.java | 2 +-
.../client/solrj/io/stream/TopicStream.java | 6 ++--
.../client/solrj/io/stream/TupleStream.java | 2 +-
.../solr/common/cloud/ClusterStateUtil.java | 4 +--
.../solr/common/cloud/CompositeIdRouter.java | 2 +-
.../apache/solr/common/cloud/DocCollection.java | 9 +++++
.../solr/common/cloud/HashBasedRouter.java | 3 +-
19 files changed, 68 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
----------------------------------------------------------------------
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
index 7a53500..2ad6003 100644
--- a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
@@ -107,7 +107,7 @@ public class AnalyticsShardRequestManager {
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
- Collection<Slice> slices = clusterState.getCollection(collection).getActiveSlices();
+ Slice[] slices = clusterState.getCollection(collection).getActiveSlicesArr();
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/core/src/java/org/apache/solr/handler/admin/MetricsHistoryHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/MetricsHistoryHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/MetricsHistoryHandler.java
index 9a46d04..1c74dba 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/MetricsHistoryHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/MetricsHistoryHandler.java
@@ -528,16 +528,16 @@ public class MetricsHistoryHandler extends RequestHandlerBase implements Permiss
Map<String, Number> perReg = totals
.computeIfAbsent(Group.collection, g -> new HashMap<>())
.computeIfAbsent(registry, r -> new HashMap<>());
- Collection<Slice> slices = coll.getActiveSlices();
- perReg.put(NUM_SHARDS_KEY, slices.size());
+ Slice[] slices = coll.getActiveSlicesArr();
+ perReg.put(NUM_SHARDS_KEY, slices.length);
DoubleAdder numActiveReplicas = new DoubleAdder();
- slices.forEach(s -> {
+ for (Slice s : slices) {
s.forEach(r -> {
if (r.isActive(state.getLiveNodes())) {
numActiveReplicas.add(1.0);
}
});
- });
+ }
perReg.put(NUM_REPLICAS_KEY, numActiveReplicas);
});
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
index 04b2606..0314ad9 100644
--- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
+++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
@@ -289,8 +289,8 @@ public final class ManagedIndexSchema extends IndexSchema {
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
- if (docCollection != null && docCollection.getActiveSlices() != null && docCollection.getActiveSlices().size() > 0) {
- final Collection<Slice> activeSlices = docCollection.getActiveSlices();
+ if (docCollection != null && docCollection.getActiveSlicesArr().length > 0) {
+ final Slice[] activeSlices = docCollection.getActiveSlicesArr();
for (Slice next : activeSlices) {
Map<String, Replica> replicasMap = next.getReplicasMap();
if (replicasMap != null) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java b/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java
index 946125f..55d58fc 100644
--- a/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java
+++ b/solr/core/src/java/org/apache/solr/search/join/ScoreJoinQParserPlugin.java
@@ -302,7 +302,7 @@ public class ScoreJoinQParserPlugin extends QParserPlugin {
String fromReplica = null;
String nodeName = zkController.getNodeName();
- for (Slice slice : zkController.getClusterState().getCollection(fromIndex).getActiveSlices()) {
+ for (Slice slice : zkController.getClusterState().getCollection(fromIndex).getActiveSlicesArr()) {
if (fromReplica != null)
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"SolrCloud join: multiple shards not yet supported " + fromIndex);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index b297a44..4a3c34f 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -862,9 +862,9 @@ public class HttpSolrCall {
Collection<Slice> slices, boolean activeSlices) {
if (activeSlices) {
for (Map.Entry<String, DocCollection> entry : clusterState.getCollectionsMap().entrySet()) {
- final Collection<Slice> activeCollectionSlices = entry.getValue().getActiveSlices();
- if (activeCollectionSlices != null) {
- slices.addAll(activeCollectionSlices);
+ final Slice[] activeCollectionSlices = entry.getValue().getActiveSlicesArr();
+ for (Slice s : activeCollectionSlices) {
+ slices.add(s);
}
}
} else {
@@ -880,45 +880,48 @@ public class HttpSolrCall {
private String getRemotCoreUrl(String collectionName, String origCorename) {
ClusterState clusterState = cores.getZkController().getClusterState();
final DocCollection docCollection = clusterState.getCollectionOrNull(collectionName);
- Collection<Slice> slices = (docCollection != null) ? docCollection.getActiveSlices() : null;
+ Slice[] slices = (docCollection != null) ? docCollection.getActiveSlicesArr() : null;
+ List<Slice> activeSlices = new ArrayList<>();
boolean byCoreName = false;
if (slices == null) {
- slices = new ArrayList<>();
+ activeSlices = new ArrayList<>();
// look by core name
byCoreName = true;
- getSlicesForCollections(clusterState, slices, true);
- if (slices.isEmpty()) {
- getSlicesForCollections(clusterState, slices, false);
+ getSlicesForCollections(clusterState, activeSlices, true);
+ if (activeSlices.isEmpty()) {
+ getSlicesForCollections(clusterState, activeSlices, false);
+ }
+ } else {
+ for (Slice s : slices) {
+ activeSlices.add(s);
}
}
- if (slices.isEmpty()) {
+ if (activeSlices.isEmpty()) {
return null;
}
collectionsList.add(collectionName);
String coreUrl = getCoreUrl(collectionName, origCorename, clusterState,
- slices, byCoreName, true);
+ activeSlices, byCoreName, true);
if (coreUrl == null) {
coreUrl = getCoreUrl(collectionName, origCorename, clusterState,
- slices, byCoreName, false);
+ activeSlices, byCoreName, false);
}
return coreUrl;
}
private String getCoreUrl(String collectionName,
- String origCorename, ClusterState clusterState, Collection<Slice> slices,
+ String origCorename, ClusterState clusterState, List<Slice> slices,
boolean byCoreName, boolean activeReplicas) {
String coreUrl;
Set<String> liveNodes = clusterState.getLiveNodes();
- List<Slice> randomizedSlices = new ArrayList<>(slices.size());
- randomizedSlices.addAll(slices);
- Collections.shuffle(randomizedSlices, random);
+ Collections.shuffle(slices, random);
- for (Slice slice : randomizedSlices) {
+ for (Slice slice : slices) {
List<Replica> randomizedReplicas = new ArrayList<>();
randomizedReplicas.addAll(slice.getReplicas());
Collections.shuffle(randomizedReplicas, random);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index e1e7968..247f593 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -481,9 +481,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
for (Entry<String, RoutingRule> entry : routingRules.entrySet()) {
String targetCollectionName = entry.getValue().getTargetCollectionName();
final DocCollection docCollection = cstate.getCollectionOrNull(targetCollectionName);
- if (docCollection != null && docCollection.getActiveSlices() != null && !docCollection.getActiveSlices().isEmpty()) {
- final Collection<Slice> activeSlices = docCollection.getActiveSlices();
- Slice any = activeSlices.iterator().next();
+ if (docCollection != null && docCollection.getActiveSlicesArr().length > 0) {
+ final Slice[] activeSlices = docCollection.getActiveSlicesArr();
+ Slice any = activeSlices[0];
if (nodes == null) nodes = new ArrayList<>();
nodes.add(new StdNode(new ZkCoreNodeProps(any.getLeader())));
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/core/src/java/org/apache/solr/update/processor/DocExpirationUpdateProcessorFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DocExpirationUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DocExpirationUpdateProcessorFactory.java
index 1f2fe1e..6eeb083 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DocExpirationUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DocExpirationUpdateProcessorFactory.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.text.ParseException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -32,6 +33,7 @@ import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.util.ExecutorUtil;
@@ -470,12 +472,13 @@ public final class DocExpirationUpdateProcessorFactory
CloudDescriptor desc = core.getCoreDescriptor().getCloudDescriptor();
String col = desc.getCollectionName();
- List<Slice> slices = new ArrayList<Slice>(zk.getClusterState().getCollection(col).getActiveSlices());
- Collections.sort(slices, COMPARE_SLICES_BY_NAME);
- if (slices.isEmpty()) {
+ DocCollection docCollection = zk.getClusterState().getCollection(col);
+ if (docCollection.getActiveSlicesArr().length == 0) {
log.error("Collection {} has no active Slices?", col);
return false;
}
+ List<Slice> slices = new ArrayList<>(Arrays.asList(docCollection.getActiveSlicesArr()));
+ Collections.sort(slices, COMPARE_SLICES_BY_NAME);
Replica firstSliceLeader = slices.get(0).getLeader();
if (null == firstSliceLeader) {
log.warn("Slice in charge of periodic deletes for {} does not currently have a leader",
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
index cd4ed00..d9d1da1 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
@@ -20,7 +20,6 @@ package org.apache.solr.update.processor;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
-import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -386,11 +385,11 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
}
private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
- final Collection<Slice> activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlices();
- if (activeSlices.isEmpty()) {
+ final Slice[] activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlicesArr();
+ if (activeSlices.length == 0) {
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
}
- final Slice slice = activeSlices.iterator().next();
+ final Slice slice = activeSlices[0];
return getLeaderNode(collection, slice);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 6fc216e..006d6bd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -611,10 +611,8 @@ public class CloudSolrClient extends SolrClient {
private Map<String,List<String>> buildUrlMap(DocCollection col) {
Map<String, List<String>> urlMap = new HashMap<>();
- Collection<Slice> slices = col.getActiveSlices();
- Iterator<Slice> sliceIterator = slices.iterator();
- while (sliceIterator.hasNext()) {
- Slice slice = sliceIterator.next();
+ Slice[] slices = col.getActiveSlicesArr();
+ for (Slice slice : slices) {
String name = slice.getName();
List<String> urls = new ArrayList<>();
Replica leader = slice.getLeader();
@@ -1262,7 +1260,7 @@ public class CloudSolrClient extends SolrClient {
NamedList routes = ((CloudSolrClient.RouteResponse)resp).getRouteResponses();
DocCollection coll = getDocCollection(collection, null);
Map<String,String> leaders = new HashMap<String,String>();
- for (Slice slice : coll.getActiveSlices()) {
+ for (Slice slice : coll.getActiveSlicesArr()) {
Replica leader = slice.getLeader();
if (leader != null) {
ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/StatementImpl.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/StatementImpl.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/StatementImpl.java
index a2c06d4..5a94237 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/StatementImpl.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/sql/StatementImpl.java
@@ -78,7 +78,7 @@ class StatementImpl implements Statement {
protected SolrStream constructStream(String sql) throws IOException {
try {
ZkStateReader zkStateReader = this.connection.getClient().getZkStateReader();
- Collection<Slice> slices = CloudSolrStream.getSlices(this.connection.getCollection(), zkStateReader, true);
+ Slice[] slices = CloudSolrStream.getSlices(this.connection.getCollection(), zkStateReader, true);
List<Replica> shuffler = new ArrayList<>();
for(Slice slice : slices) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 32cf15e..ddd9774 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -18,7 +18,7 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -325,7 +325,7 @@ public class CloudSolrStream extends TupleStream implements Expressible {
}
}
- public static Collection<Slice> getSlices(String collectionName, ZkStateReader zkStateReader, boolean checkAlias) throws IOException {
+ public static Slice[] getSlices(String collectionName, ZkStateReader zkStateReader, boolean checkAlias) throws IOException {
ClusterState clusterState = zkStateReader.getClusterState();
Map<String, DocCollection> collectionsMap = clusterState.getCollectionsMap();
@@ -341,16 +341,16 @@ public class CloudSolrStream extends TupleStream implements Expressible {
List<Slice> slices = collections.stream()
.map(collectionsMap::get)
.filter(Objects::nonNull)
- .flatMap(docCol -> docCol.getActiveSlices().stream())
+ .flatMap(docCol -> Arrays.stream(docCol.getActiveSlicesArr()))
.collect(Collectors.toList());
if (!slices.isEmpty()) {
- return slices;
+ return slices.toArray(new Slice[slices.size()]);
}
// Check collection case insensitive
for(String collectionMapKey : collectionsMap.keySet()) {
if(collectionMapKey.equalsIgnoreCase(collectionName)) {
- return collectionsMap.get(collectionMapKey).getActiveSlices();
+ return collectionsMap.get(collectionMapKey).getActiveSlicesArr();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
index b6ad276..3212dc6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FeaturesSelectionStream.java
@@ -259,7 +259,7 @@ public class FeaturesSelectionStream extends TupleStream implements Expressible{
try {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
+ Slice[] slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
index f56431c..dd9be6a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TextLogitStream.java
@@ -341,7 +341,7 @@ public class TextLogitStream extends TupleStream implements Expressible {
try {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
+ Slice[] slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
index e076186..9af4cbf 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TopicStream.java
@@ -395,7 +395,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
this.checkpoints = new HashMap<>();
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
+ Slice[] slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
@@ -474,7 +474,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
private void getPersistedCheckpoints() throws IOException {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- Collection<Slice> slices = CloudSolrStream.getSlices(checkpointCollection, zkStateReader, false);
+ Slice[] slices = CloudSolrStream.getSlices(checkpointCollection, zkStateReader, false);
ClusterState clusterState = zkStateReader.getClusterState();
Set<String> liveNodes = clusterState.getLiveNodes();
@@ -506,7 +506,7 @@ public class TopicStream extends CloudSolrStream implements Expressible {
protected void constructStreams() throws IOException {
try {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- Collection<Slice> slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
+ Slice[] slices = CloudSolrStream.getSlices(this.collection, zkStateReader, false);
ModifiableSolrParams mParams = new ModifiableSolrParams(params);
mParams.set(DISTRIB, "false"); // We are the aggregator.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
index 288608f..94dd920 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
@@ -133,7 +133,7 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
CloudSolrClient cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
- Collection<Slice> slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
+ Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
Set<String> liveNodes = clusterState.getLiveNodes();
for(Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
index 0910868..5e61bc1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
@@ -217,8 +217,8 @@ public class ClusterStateUtil {
}
public static int getLiveAndActiveReplicaCount(ZkStateReader zkStateReader, String collection) {
- Collection<Slice> slices;
- slices = zkStateReader.getClusterState().getCollection(collection).getActiveSlices();
+ Slice[] slices;
+ slices = zkStateReader.getClusterState().getCollection(collection).getActiveSlicesArr();
int liveAndActive = 0;
for (Slice slice : slices) {
for (Replica replica : slice.getReplicas()) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
index a1cd02c..30778b8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CompositeIdRouter.java
@@ -90,7 +90,7 @@ public class CompositeIdRouter extends HashBasedRouter {
Range completeRange = new KeyParser(id).getRange();
List<Slice> targetSlices = new ArrayList<>(1);
- for (Slice slice : collection.getActiveSlices()) {
+ for (Slice slice : collection.getActiveSlicesArr()) {
Range range = slice.getRange();
if (range != null && range.overlaps(completeRange)) {
targetSlices.add(slice);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 411fe56..ab250a6 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -58,6 +58,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
private final String name;
private final Map<String, Slice> slices;
private final Map<String, Slice> activeSlices;
+ private final Slice[] activeSlicesArr;
private final Map<String, List<Replica>> nodeNameReplicas;
private final Map<String, List<Replica>> nodeNameLeaderReplicas;
private final DocRouter router;
@@ -112,6 +113,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
addNodeNameReplica(replica);
}
}
+ this.activeSlicesArr = activeSlices.values().toArray(new Slice[activeSlices.size()]);
this.router = router;
this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode;
assert name != null && slices != null;
@@ -198,6 +200,13 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
}
/**
+ * Return array of active slices for this collection (performance optimization).
+ */
+ public Slice[] getActiveSlicesArr() {
+ return activeSlicesArr;
+ }
+
+ /**
* Get the map of all slices (sliceName->Slice) for this collection.
*/
public Map<String, Slice> getSlicesMap() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e5d12f30/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java b/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java
index 5e19d38..07f2d35 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/HashBasedRouter.java
@@ -59,7 +59,8 @@ public abstract class HashBasedRouter extends DocRouter {
}
protected Slice hashToSlice(int hash, DocCollection collection) {
- for (Slice slice : collection.getActiveSlices()) {
+ final Slice[] slices = collection.getActiveSlicesArr();
+ for (Slice slice : slices) {
Range range = slice.getRange();
if (range != null && range.includes(hash)) return slice;
}