You are viewing a plain text version of this content. The canonical link for it is not included in this plain text copy.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/09/27 16:40:51 UTC
[2/2] lucene-solr:master: SOLR-12709: Add TestSimExtremeIndexing for
testing simulated large indexing jobs. Several important improvements to the
simulator.
SOLR-12709: Add TestSimExtremeIndexing for testing simulated large indexing jobs.
Several important improvements to the simulator.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/2369c896
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/2369c896
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/2369c896
Branch: refs/heads/master
Commit: 2369c8963412773592098475bdd8af1da81e3ac5
Parents: c587410
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Sep 27 12:12:54 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Sep 27 14:49:43 2018 +0200
----------------------------------------------------------------------
.../cloud/autoscaling/ComputePlanAction.java | 8 +-
.../cloud/autoscaling/IndexSizeTrigger.java | 12 +-
.../apache/solr/metrics/SolrMetricManager.java | 2 +-
.../org/apache/solr/cloud/CloudTestUtils.java | 6 +-
.../cloud/autoscaling/IndexSizeTriggerTest.java | 10 +-
.../cloud/autoscaling/sim/SimCloudManager.java | 66 +-
.../sim/SimClusterStateProvider.java | 680 ++++++++++++++-----
.../autoscaling/sim/SimDistribStateManager.java | 7 +
.../autoscaling/sim/SimNodeStateProvider.java | 21 +-
.../sim/TestSimExecutePlanAction.java | 4 +-
.../autoscaling/sim/TestSimExtremeIndexing.java | 163 +++++
.../autoscaling/sim/TestSimNodeLostTrigger.java | 2 +-
.../autoscaling/sim/TestSimPolicyCloud.java | 10 +-
.../sim/TestSimTriggerIntegration.java | 8 +-
.../client/solrj/cloud/autoscaling/Policy.java | 35 +-
.../solrj/cloud/autoscaling/PolicyHelper.java | 4 +-
.../solrj/cloud/autoscaling/ReplicaInfo.java | 4 +-
17 files changed, 808 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2369c896/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index 6bad63d..7103bf5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -136,7 +136,7 @@ public class ComputePlanAction extends TriggerActionBase {
continue;
}
}
- log.info("Computed Plan: {}", operation.getParams());
+ log.debug("Computed Plan: {}", operation.getParams());
if (!collections.isEmpty()) {
String coll = operation.getParams().get(CoreAdminParams.COLLECTION);
if (coll != null && !collections.contains(coll)) {
@@ -175,7 +175,11 @@ public class ComputePlanAction extends TriggerActionBase {
clusterState.forEachCollection(coll -> {
Integer rf = coll.getReplicationFactor();
if (rf == null) {
- rf = coll.getReplicas().size() / coll.getSlices().size();
+ if (coll.getSlices().isEmpty()) {
+ rf = 1; // ???
+ } else {
+ rf = coll.getReplicas().size() / coll.getSlices().size();
+ }
}
totalRF.addAndGet(rf * coll.getSlices().size());
});
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2369c896/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
index 3f2ea8a..6129cc7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -431,8 +432,15 @@ public class IndexSizeTrigger extends TriggerBase {
Map<String, List<ReplicaInfo>> belowSize) {
super(TriggerEventType.INDEXSIZE, source, eventTime, null);
properties.put(TriggerEvent.REQUESTED_OPS, ops);
- properties.put(ABOVE_SIZE_PROP, aboveSize);
- properties.put(BELOW_SIZE_PROP, belowSize);
+ // avoid passing very large amounts of data here - just use replica names
+ TreeMap<String, String> above = new TreeMap<>();
+ aboveSize.forEach((coll, replicas) ->
+ replicas.forEach(r -> above.put(r.getCore(), "docs=" + r.getVariable(DOCS_SIZE_PROP) + ", bytes=" + r.getVariable(BYTES_SIZE_PROP))));
+ properties.put(ABOVE_SIZE_PROP, above);
+ TreeMap<String, String> below = new TreeMap<>();
+ belowSize.forEach((coll, replicas) ->
+ replicas.forEach(r -> below.put(r.getCore(), "docs=" + r.getVariable(DOCS_SIZE_PROP) + ", bytes=" + r.getVariable(BYTES_SIZE_PROP))));
+ properties.put(BELOW_SIZE_PROP, below);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2369c896/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
index f1b7923..e9cb111 100644
--- a/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
+++ b/solr/core/src/java/org/apache/solr/metrics/SolrMetricManager.java
@@ -791,7 +791,7 @@ public class SolrMetricManager {
*/
public static String getRegistryName(SolrInfoBean.Group group, String... names) {
String fullName;
- String prefix = REGISTRY_NAME_PREFIX + group.toString() + ".";
+ String prefix = new StringBuilder(REGISTRY_NAME_PREFIX).append(group.name()).append('.').toString();
// check for existing prefix and group
if (names != null && names.length > 0 && names[0] != null && names[0].startsWith(prefix)) {
// assume the first segment already was expanded
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2369c896/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java b/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
index b67b551..eb50b96 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
@@ -109,7 +109,7 @@ public class CloudTestUtils {
log.trace("-- still not matching predicate: {}", state);
}
}
- throw new TimeoutException("last state: " + coll);
+ throw new TimeoutException("last ClusterState: " + state + ", last coll state: " + coll);
}
/**
@@ -141,13 +141,13 @@ public class CloudTestUtils {
}
Collection<Slice> slices = withInactive ? collectionState.getSlices() : collectionState.getActiveSlices();
if (slices.size() != expectedShards) {
- log.trace("-- wrong number of active slices, expected={}, found={}", expectedShards, collectionState.getSlices().size());
+ log.trace("-- wrong number of slices, expected={}, found={}: {}", expectedShards, collectionState.getSlices().size(), collectionState.getSlices());
return false;
}
Set<String> leaderless = new HashSet<>();
for (Slice slice : slices) {
int activeReplicas = 0;
- if (requireLeaders && slice.getLeader() == null) {
+ if (requireLeaders && slice.getState() != Slice.State.INACTIVE && slice.getLeader() == null) {
leaderless.add(slice.getName());
continue;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2369c896/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
index faabda1..fd93d03 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
@@ -93,7 +93,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
configureCluster(2)
.addConfig("conf", configset("cloud-minimal"))
.configure();
- if (random().nextBoolean() || true) {
+ if (random().nextBoolean()) {
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
solrClient = cluster.getSolrClient();
loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
@@ -190,7 +190,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
assertNotNull("should have fired an event", ev);
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) ev.getProperty(TriggerEvent.REQUESTED_OPS);
assertNotNull("should contain requestedOps", ops);
- assertEquals("number of ops", 2, ops.size());
+ assertEquals("number of ops: " + ops, 2, ops.size());
boolean shard1 = false;
boolean shard2 = false;
for (TriggerEvent.Op op : ops) {
@@ -361,7 +361,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
CloudTestUtils.clusterShape(2, 2, false, true));
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 20; i++) {
SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100));
solrClient.add(collectionName, doc);
}
@@ -412,7 +412,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
assertEquals(response.get("result").toString(), "success");
// delete some docs to trigger a merge
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 15; i++) {
solrClient.deleteById(collectionName, "id-" + (i * 100));
}
solrClient.commit(collectionName);
@@ -425,7 +425,7 @@ public class IndexSizeTriggerTest extends SolrCloudTestCase {
"}";
req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
+ assertEquals("success", response.get("result").toString());
timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2369c896/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index 1f0b6cf..53e2c7e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -31,12 +31,14 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.carrotsearch.randomizedtesting.RandomizedContext;
import com.codahale.metrics.jvm.ClassLoadingGaugeSet;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
@@ -50,6 +52,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -118,6 +121,7 @@ public class SimCloudManager implements SolrCloudManager {
private final String metricTag;
private final List<SolrInputDocument> systemColl = Collections.synchronizedList(new ArrayList<>());
+ private final Map<String, Map<String, AtomicInteger>> eventCounts = new ConcurrentHashMap<>();
private final MockSearchableSolrClient solrClient;
private final Map<String, AtomicLong> opCounts = new ConcurrentSkipListMap<>();
@@ -129,9 +133,11 @@ public class SimCloudManager implements SolrCloudManager {
private MetricsHandler metricsHandler;
private MetricsHistoryHandler metricsHistoryHandler;
private TimeSource timeSource;
+ private boolean useSystemCollection = true;
private static int nodeIdPort = 10000;
- public static int DEFAULT_DISK = 1024; // 1000 GiB
+ public static int DEFAULT_FREE_DISK = 1024; // 1000 GiB
+ public static int DEFAULT_TOTAL_DISK = 10240; // 10 TiB
public static long DEFAULT_IDX_SIZE_BYTES = 10240; // 10 kiB
/**
@@ -201,7 +207,14 @@ public class SimCloudManager implements SolrCloudManager {
request = new QueryRequest(params);
} else {
// search request
- return super.request(request, collection);
+ if (collection.equals(CollectionAdminParams.SYSTEM_COLL)) {
+ return super.request(request, collection);
+ } else {
+ // forward it
+ ModifiableSolrParams params = new ModifiableSolrParams(request.getParams());
+ params.set("collection", collection);
+ request = new QueryRequest(params);
+ }
}
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "when collection != null only UpdateRequest and QueryRequest are supported: request=" + request + ", collection=" + collection);
@@ -306,7 +319,8 @@ public class SimCloudManager implements SolrCloudManager {
values.put(ImplicitSnitch.PORT, port);
values.put(ImplicitSnitch.NODE, nodeId);
values.put(ImplicitSnitch.CORES, 0);
- values.put(ImplicitSnitch.DISK, DEFAULT_DISK);
+ values.put(ImplicitSnitch.DISK, DEFAULT_FREE_DISK);
+ values.put(Variable.Type.TOTALDISK.tagName, DEFAULT_TOTAL_DISK);
values.put(ImplicitSnitch.SYSLOADAVG, 1.0);
values.put(ImplicitSnitch.HEAPUSAGE, 123450000);
values.put("sysprop.java.version", System.getProperty("java.version"));
@@ -353,7 +367,13 @@ public class SimCloudManager implements SolrCloudManager {
Set<String> deadNodes = getSimNodeStateProvider().simGetDeadNodes();
sb.append("## Dead nodes:\t\t" + deadNodes.size() + "\n");
deadNodes.forEach(n -> sb.append("##\t\t" + n + "\n"));
- sb.append("## Collections:\t" + getSimClusterStateProvider().simListCollections() + "\n");
+ sb.append("## Collections:\n");
+ clusterStateProvider.simGetCollectionStats().forEach((coll, stats) -> {
+ sb.append("## * ").append(coll).append('\n');
+ stats.forEach((k, v) -> {
+ sb.append("## " + k + "\t" + v + "\n");
+ });
+ });
if (withCollections) {
ClusterState state = clusterStateProvider.getClusterState();
state.forEachCollection(coll -> sb.append(coll.toString() + "\n"));
@@ -386,6 +406,13 @@ public class SimCloudManager implements SolrCloudManager {
}
/**
+ * Get the source of randomness (usually initialized by the test suite).
+ */
+ public Random getRandom() {
+ return RandomizedContext.current().getRandom();
+ }
+
+ /**
* Add a new node and initialize its node values (metrics). The
* /live_nodes list is updated with the new node id.
* @return new node id
@@ -448,6 +475,10 @@ public class SimCloudManager implements SolrCloudManager {
}
}
+ public void simSetUseSystemCollection(boolean useSystemCollection) {
+ this.useSystemCollection = useSystemCollection;
+ }
+
/**
* Clear the (simulated) .system collection.
*/
@@ -464,17 +495,7 @@ public class SimCloudManager implements SolrCloudManager {
}
public Map<String, Map<String, AtomicInteger>> simGetEventCounts() {
- TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>();
- synchronized (systemColl) {
- for (SolrInputDocument d : systemColl) {
- if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
- continue;
- }
- counts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new TreeMap<>())
- .computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
- .incrementAndGet();
- }
- }
+ TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>(eventCounts);
return counts;
}
@@ -705,6 +726,9 @@ public class SimCloudManager implements SolrCloudManager {
rsp.setResponse(queryResponse.getValues());
log.trace("-- response: {}", rsp);
return rsp;
+ } else if (req instanceof QueryRequest) {
+ incrementCount("query");
+ return clusterStateProvider.simQuery((QueryRequest)req);
}
}
if (req instanceof UpdateRequest) {
@@ -715,7 +739,17 @@ public class SimCloudManager implements SolrCloudManager {
if (collection == null || collection.equals(CollectionAdminParams.SYSTEM_COLL)) {
List<SolrInputDocument> docs = ureq.getDocuments();
if (docs != null) {
- systemColl.addAll(docs);
+ if (useSystemCollection) {
+ systemColl.addAll(docs);
+ }
+ for (SolrInputDocument d : docs) {
+ if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
+ continue;
+ }
+ eventCounts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new TreeMap<>())
+ .computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
+ .incrementAndGet();
+ }
}
return new UpdateResponse();
} else {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2369c896/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index 08ce6bf..18e20e0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -27,20 +27,25 @@ import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
@@ -51,7 +56,9 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
import org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.cloud.CloudTestUtils;
@@ -65,6 +72,7 @@ import org.apache.solr.cloud.api.collections.SplitShardCmd;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.CollectionMutator;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
+import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
@@ -79,6 +87,7 @@ import org.apache.solr.common.cloud.rule.ImplicitSnitch;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
@@ -118,11 +127,14 @@ public class SimClusterStateProvider implements ClusterStateProvider {
public static final long DEFAULT_DOC_SIZE_BYTES = 500;
+ private static final String BUFFERED_UPDATES = "__buffered_updates__";
+
private final LiveNodesSet liveNodes;
private final SimDistribStateManager stateManager;
private final SimCloudManager cloudManager;
private final Map<String, List<ReplicaInfo>> nodeReplicaMap = new ConcurrentHashMap<>();
+ private final Map<String, Map<String, List<ReplicaInfo>>> colShardReplicaMap = new ConcurrentHashMap<>();
private final Map<String, Object> clusterProperties = new ConcurrentHashMap<>();
private final Map<String, Map<String, Object>> collProperties = new ConcurrentHashMap<>();
private final Map<String, Map<String, Map<String, Object>>> sliceProperties = new ConcurrentHashMap<>();
@@ -145,6 +157,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
private AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
private AtomicBoolean saveClusterState = new AtomicBoolean();
+ private Random bulkUpdateRandom = new Random(0);
+
private transient boolean closed;
/**
@@ -228,6 +242,14 @@ public class SimClusterStateProvider implements ClusterStateProvider {
/**
* Get random node id.
+ * @return one of the live nodes
+ */
+ public String simGetRandomNode() {
+ return simGetRandomNode(cloudManager.getRandom());
+ }
+
+ /**
+ * Get random node id.
* @param random instance of random.
* @return one of the live nodes
*/
@@ -506,12 +528,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
replicaInfo.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
// add a property expected in Policy calculations, if missing
if (replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute) == null) {
- replicaInfo.getVariables().put(Type.CORE_IDX.metricsAttribute, SimCloudManager.DEFAULT_IDX_SIZE_BYTES);
+ replicaInfo.getVariables().put(Type.CORE_IDX.metricsAttribute, new AtomicLong(SimCloudManager.DEFAULT_IDX_SIZE_BYTES));
replicaInfo.getVariables().put(Variable.coreidxsize,
- Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES));
+ new AtomicDouble((Double)Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES)));
}
replicas.add(replicaInfo);
+ colShardReplicaMap.computeIfAbsent(replicaInfo.getCollection(), c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(replicaInfo.getShard(), s -> new ArrayList<>())
+ .add(replicaInfo);
Map<String, Object> values = cloudManager.getSimNodeStateProvider().simGetAllNodeValues()
.computeIfAbsent(nodeId, id -> new ConcurrentHashMap<>(SimCloudManager.createNodeValues(id)));
@@ -523,7 +548,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores + 1);
Integer disk = (Integer)values.get(ImplicitSnitch.DISK);
if (disk == null) {
- disk = SimCloudManager.DEFAULT_DISK;
+ disk = SimCloudManager.DEFAULT_FREE_DISK;
}
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk - 1);
// fake metrics
@@ -533,7 +558,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
cloudManager.getMetricManager().registry(registry).counter("UPDATE./update.requests");
cloudManager.getMetricManager().registry(registry).counter("QUERY./select.requests");
cloudManager.getMetricManager().registerGauge(null, registry,
- () -> replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute),
+ () -> ((Number)replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute)).longValue(),
"", true, "INDEX.sizeInBytes");
// at this point nuke our cached DocCollection state
collectionsStatesRef.set(null);
@@ -559,6 +584,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
for (int i = 0; i < replicas.size(); i++) {
if (coreNodeName.equals(replicas.get(i).getName())) {
ReplicaInfo ri = replicas.remove(i);
+ colShardReplicaMap.computeIfAbsent(ri.getCollection(), c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(ri.getShard(), s -> new ArrayList<>())
+ .remove(ri);
collectionsStatesRef.set(null);
opDelay(ri.getCollection(), CollectionParams.CollectionAction.DELETEREPLICA.name());
@@ -598,6 +626,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
int version = oldData != null ? oldData.getVersion() : -1;
Assert.assertEquals(clusterStateVersion, version + 1);
stateManager.setData(ZkStateReader.CLUSTER_STATE, data, version);
+ log.debug("** saved cluster state version " + version);
clusterStateVersion++;
} catch (Exception e) {
throw new IOException(e);
@@ -635,15 +664,22 @@ public class SimClusterStateProvider implements ClusterStateProvider {
return;
}
dc.getSlices().forEach(s -> {
+ if (s.getState() == Slice.State.INACTIVE) {
+ log.trace("-- slice state is {}, skip leader election {} / {}", s.getState(), dc.getName(), s.getName());
+ return;
+ }
+ if (s.getState() != Slice.State.ACTIVE) {
+ log.trace("-- slice state is {}, but I will run leader election {} / {}", s.getState(), dc.getName(), s.getName());
+ }
if (s.getLeader() != null) {
- log.debug("-- already has leader {} / {}", dc.getName(), s.getName());
+ log.trace("-- already has leader {} / {}", dc.getName(), s.getName());
return;
}
if (s.getReplicas().isEmpty()) {
- log.debug("-- no replicas in {} / {}", dc.getName(), s.getName());
+ log.trace("-- no replicas in {} / {}", dc.getName(), s.getName());
return;
}
- log.debug("-- submit leader election for {} / {}", dc.getName(), s.getName());
+ log.trace("-- submit leader election for {} / {}", dc.getName(), s.getName());
cloudManager.submit(() -> {
simRunLeaderElection(dc.getName(), s, saveClusterState);
return true;
@@ -656,9 +692,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
AtomicBoolean stateChanged = new AtomicBoolean(Boolean.FALSE);
Replica leader = s.getLeader();
if (leader == null || !liveNodes.contains(leader.getNodeName())) {
- log.debug("Running leader election for {} / {}", collection, s.getName());
+ log.trace("Running leader election for {} / {}", collection, s.getName());
if (s.getReplicas().isEmpty()) { // no replicas - punt
- log.debug("-- no replicas in {} / {}", collection, s.getName());
+ log.trace("-- no replicas in {} / {}", collection, s.getName());
return;
}
ActionThrottle lt = getThrottle(collection, s.getName());
@@ -692,7 +728,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
});
if (alreadyHasLeader.get()) {
- log.debug("-- already has leader {} / {}: {}", collection, s.getName(), s);
+ log.trace("-- already has leader {} / {}: {}", collection, s.getName(), s);
return;
}
if (active.isEmpty()) {
@@ -718,11 +754,11 @@ public class SimClusterStateProvider implements ClusterStateProvider {
synchronized (ri) {
ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
}
+ log.debug("-- elected new leader for {} / {}: {}", collection, s.getName(), ri);
stateChanged.set(true);
- log.debug("-- elected new leader for " + collection + " / " + s.getName() + ": " + ri.getName());
}
} else {
- log.debug("-- already has leader for {} / {}", collection, s.getName());
+ log.trace("-- already has leader for {} / {}", collection, s.getName());
}
if (stateChanged.get() || saveState) {
collectionsStatesRef.set(null);
@@ -810,9 +846,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
collection.getReplicas().size() + 1);
try {
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
- replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
- replicaProps.put("SEARCHER.searcher.numDocs", 0);
- replicaProps.put("SEARCHER.searcher.maxDoc", 0);
+ replicaProps.put("SEARCHER.searcher.deletedDocs", new AtomicLong(0));
+ replicaProps.put("SEARCHER.searcher.numDocs", new AtomicLong(0));
+ replicaProps.put("SEARCHER.searcher.maxDoc", new AtomicLong(0));
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, withCollection, 0),
coreName, withCollection, withCollectionShard, pos.type, pos.node, replicaProps);
cloudManager.submit(() -> {
@@ -833,9 +869,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
replicaNum.getAndIncrement());
try {
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
- replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
- replicaProps.put("SEARCHER.searcher.numDocs", 0);
- replicaProps.put("SEARCHER.searcher.maxDoc", 0);
+ replicaProps.put("SEARCHER.searcher.deletedDocs", new AtomicLong(0));
+ replicaProps.put("SEARCHER.searcher.numDocs", new AtomicLong(0));
+ replicaProps.put("SEARCHER.searcher.maxDoc", new AtomicLong(0));
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps);
cloudManager.submit(() -> {
@@ -900,6 +936,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
collProperties.remove(collection);
sliceProperties.remove(collection);
leaderThrottles.remove(collection);
+ colShardReplicaMap.remove(collection);
opDelay(collection, CollectionParams.CollectionAction.DELETE.name());
@@ -942,6 +979,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
lock.lockInterruptibly();
try {
nodeReplicaMap.clear();
+ colShardReplicaMap.clear();
collProperties.clear();
sliceProperties.clear();
leaderThrottles.clear();
@@ -1086,12 +1124,24 @@ public class SimClusterStateProvider implements ClusterStateProvider {
sliceName.set(message.getStr(SHARD_ID_PROP));
String splitKey = message.getStr("split.key");
- // always invalidate cached collection states to get up-to-date metrics
- collectionsStatesRef.set(null);
-
ClusterState clusterState = getClusterState();
DocCollection collection = clusterState.getCollection(collectionName);
Slice parentSlice = SplitShardCmd.getParentSlice(clusterState, collectionName, sliceName, splitKey);
+ Replica leader = parentSlice.getLeader();
+ // XXX leader election may not have happened yet - should we require it?
+ if (leader == null) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Shard " + collectionName +
+ " / " + sliceName.get() + " has no leader and can't be split");
+ }
+ // start counting buffered updates
+ Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(sliceName.get(), ss -> new ConcurrentHashMap<>());
+ if (props.containsKey(BUFFERED_UPDATES)) {
+ log.debug("--- SOLR-12729: Overlapping splitShard commands for {} / {}", collectionName, sliceName.get());
+ return;
+ }
+ props.put(BUFFERED_UPDATES, new AtomicLong());
+
List<DocRouter.Range> subRanges = new ArrayList<>();
List<String> subSlices = new ArrayList<>();
List<String> subShardNames = new ArrayList<>();
@@ -1117,12 +1167,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (sessionWrapper != null) sessionWrapper.release();
// adjust numDocs / deletedDocs / maxDoc
- Replica leader = parentSlice.getLeader();
- // XXX leader election may not have happened yet - should we require it?
- if (leader == null) {
- leader = parentSlice.getReplicas().iterator().next();
- }
- String numDocsStr = leader.getStr("SEARCHER.searcher.numDocs", "0");
+ String numDocsStr = String.valueOf(getReplicaInfo(leader).getVariable("SEARCHER.searcher.numDocs", "0"));
long numDocs = Long.parseLong(numDocsStr);
long newNumDocs = numDocs / subSlices.size();
long remainderDocs = numDocs % subSlices.size();
@@ -1130,10 +1175,23 @@ public class SimClusterStateProvider implements ClusterStateProvider {
long remainderIndexSize = SimCloudManager.DEFAULT_IDX_SIZE_BYTES + remainderDocs * DEFAULT_DOC_SIZE_BYTES;
String remainderSlice = null;
+ // add slice props
+ for (int i = 0; i < subRanges.size(); i++) {
+ String subSlice = subSlices.get(i);
+ DocRouter.Range range = subRanges.get(i);
+ Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(subSlice, ss -> new ConcurrentHashMap<>());
+ sliceProps.put(Slice.RANGE, range);
+ sliceProps.put(Slice.PARENT, sliceName.get());
+ sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.CONSTRUCTION.toString());
+ sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
+ }
+ // add replicas
for (ReplicaPosition replicaPosition : replicaPositions) {
String subSliceName = replicaPosition.shard;
String subShardNodeName = replicaPosition.node;
- String solrCoreName = collectionName + "_" + subSliceName + "_replica" + (replicaPosition.index);
+// String solrCoreName = collectionName + "_" + subSliceName + "_replica_n" + (replicaPosition.index);
+ String solrCoreName = Assign.buildSolrCoreName(collectionName, subSliceName, replicaPosition.type, Assign.incAndGetId(stateManager, collectionName, 0));
Map<String, Object> replicaProps = new HashMap<>();
replicaProps.put(ZkStateReader.SHARD_ID_PROP, replicaPosition.shard);
replicaProps.put(ZkStateReader.NODE_NAME_PROP, replicaPosition.node);
@@ -1149,43 +1207,75 @@ public class SimClusterStateProvider implements ClusterStateProvider {
replicasNumDocs += remainderDocs;
replicasIndexSize += remainderIndexSize;
}
- replicaProps.put("SEARCHER.searcher.numDocs", replicasNumDocs);
- replicaProps.put("SEARCHER.searcher.maxDoc", replicasNumDocs);
- replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
- replicaProps.put(Type.CORE_IDX.metricsAttribute, replicasIndexSize);
- replicaProps.put(Variable.coreidxsize, Type.CORE_IDX.convertVal(replicasIndexSize));
+ replicaProps.put("SEARCHER.searcher.numDocs", new AtomicLong(replicasNumDocs));
+ replicaProps.put("SEARCHER.searcher.maxDoc", new AtomicLong(replicasNumDocs));
+ replicaProps.put("SEARCHER.searcher.deletedDocs", new AtomicLong(0));
+ replicaProps.put(Type.CORE_IDX.metricsAttribute, new AtomicLong(replicasIndexSize));
+ replicaProps.put(Variable.coreidxsize, new AtomicDouble((Double)Type.CORE_IDX.convertVal(replicasIndexSize)));
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps);
simAddReplica(replicaPosition.node, ri, false);
}
- // mark the old slice as inactive
+ simRunLeaderElection(Collections.singleton(collectionName), true);
+
+ // delay it once again to better simulate replica recoveries
+ //opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());
+
+ CloudTestUtils.waitForState(cloudManager, collectionName, 30, TimeUnit.SECONDS, (liveNodes, state) -> {
+ for (String subSlice : subSlices) {
+ Slice s = state.getSlice(subSlice);
+ if (s.getLeader() == null) {
+ log.debug("** no leader in {} / {}", collectionName, s);
+ return false;
+ }
+ if (s.getReplicas().size() < repFactor) {
+ log.debug("** expected {} repFactor but there are {} replicas", repFactor, s.getReplicas().size());
+ return false;
+ }
+ }
+ return true;
+ });
+ // mark the new slices as active and the old slice as inactive
+ log.trace("-- switching slice states after split shard: collection={}, parent={}, subSlices={}", collectionName,
+ sliceName.get(), subSlices);
lock.lockInterruptibly();
try {
- Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
+ Map<String, Object> sProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
.computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>());
- props.put(ZkStateReader.STATE_PROP, Slice.State.INACTIVE.toString());
- props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
+ sProps.put(ZkStateReader.STATE_PROP, Slice.State.INACTIVE.toString());
+ sProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
+ AtomicLong bufferedUpdates = (AtomicLong)sProps.remove(BUFFERED_UPDATES);
+ if (bufferedUpdates.get() > 0) {
+ // apply buffered updates
+ long perShard = bufferedUpdates.get() / subSlices.size();
+ long remainder = bufferedUpdates.get() % subSlices.size();
+ log.debug("-- applying {} buffered docs from {} / {}, perShard={}, remainder={}", bufferedUpdates.get(),
+ collectionName, parentSlice.getName(), perShard, remainder);
+ for (int i = 0; i < subSlices.size(); i++) {
+ String sub = subSlices.get(i);
+ long numUpdates = perShard;
+ if (i == 0) {
+ numUpdates += remainder;
+ }
+ simSetShardValue(collectionName, sub, "SEARCHER.searcher.numDocs", numUpdates, true, false);
+ simSetShardValue(collectionName, sub, "SEARCHER.searcher.maxDoc", numUpdates, true, false);
+ }
+ }
// XXX also mark replicas as down? currently SplitShardCmd doesn't do this
+ for (String s : subSlices) {
+ Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(s, ss -> new ConcurrentHashMap<>());
+ sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.ACTIVE.toString());
+ sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
+ }
+
// invalidate cached state
collectionsStatesRef.set(null);
} finally {
lock.unlock();
}
- // add slice props
- for (int i = 0; i < subRanges.size(); i++) {
- String subSlice = subSlices.get(i);
- DocRouter.Range range = subRanges.get(i);
- Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
- .computeIfAbsent(subSlice, ss -> new ConcurrentHashMap<>());
- sliceProps.put(Slice.RANGE, range);
- sliceProps.put(Slice.PARENT, sliceName.get());
- sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.ACTIVE.toString());
- sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
- }
- collectionsStatesRef.set(null);
- simRunLeaderElection(Collections.singleton(collectionName), true);
results.add("success", "");
}
@@ -1216,7 +1306,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
lock.lockInterruptibly();
try {
- sliceProperties.computeIfAbsent(collectionName, coll -> new ConcurrentHashMap<>()).remove(sliceName);
+ sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).remove(sliceName);
+ colShardReplicaMap.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).remove(sliceName);
nodeReplicaMap.forEach((n, replicas) -> {
Iterator<ReplicaInfo> it = replicas.iterator();
while (it.hasNext()) {
@@ -1237,7 +1328,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
public void createSystemCollection() throws IOException {
try {
- if (simListCollections().contains(CollectionAdminParams.SYSTEM_COLL)) {
+ if (colShardReplicaMap.containsKey(CollectionAdminParams.SYSTEM_COLL)) {
return;
}
ZkNodeProps props = new ZkNodeProps(
@@ -1278,7 +1369,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (collection == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not set");
}
- if (!simListCollections().contains(collection)) {
+ if (!colShardReplicaMap.containsKey(collection)) {
if (CollectionAdminParams.SYSTEM_COLL.equals(collection)) {
// auto-create
createSystemCollection();
@@ -1286,126 +1377,257 @@ public class SimClusterStateProvider implements ClusterStateProvider {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collection + "' doesn't exist");
}
}
- // always reset first to get the current metrics - it's easier than to keep matching
- // Replica with ReplicaInfo where the current real counts are stored
- collectionsStatesRef.set(null);
+
DocCollection coll = getClusterState().getCollection(collection);
DocRouter router = coll.getRouter();
-
- boolean modified = false;
-
- lock.lockInterruptibly();
- try {
- List<String> deletes = req.getDeleteById();
- if (deletes != null && !deletes.isEmpty()) {
- for (String id : deletes) {
- Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
- // NOTE: we don't use getProperty because it uses PROPERTY_PROP_PREFIX
+ List<String> deletes = req.getDeleteById();
+ if (deletes != null && !deletes.isEmpty()) {
+ for (String id : deletes) {
+ Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
+ Replica leader = s.getLeader();
+ if (leader == null) {
+ log.debug("-- no leader in " + s);
+ continue;
+ }
+ cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
+ ReplicaInfo ri = getReplicaInfo(leader);
+ Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
+ if (numDocs == null || numDocs.intValue() <= 0) {
+ log.debug("-- attempting to delete nonexistent doc " + id + " from " + s.getLeader());
+ continue;
+ }
+ AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
+ if (bufferedUpdates != null) {
+ if (bufferedUpdates.get() > 0) {
+ bufferedUpdates.decrementAndGet();
+ } else {
+ log.debug("-- attempting to delete nonexistent buffered doc " + id + " from " + s.getLeader());
+ }
+ continue;
+ }
+ lock.lockInterruptibly();
+ try {
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", 1, true, false);
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", -1, true, false);
+ Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
+ if (indexSize != null && indexSize.longValue() > SimCloudManager.DEFAULT_IDX_SIZE_BYTES) {
+ indexSize = indexSize.longValue() - DEFAULT_DOC_SIZE_BYTES;
+ simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
+ new AtomicLong(indexSize.longValue()), false, false);
+ simSetShardValue(collection, s.getName(), Variable.coreidxsize,
+ new AtomicDouble((Double)Type.CORE_IDX.convertVal(indexSize)), false, false);
+ } else {
+ throw new Exception("unexpected indexSize ri=" + ri);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ deletes = req.getDeleteQuery();
+ if (deletes != null && !deletes.isEmpty()) {
+ for (String q : deletes) {
+ if (!"*:*".equals(q)) {
+ throw new UnsupportedOperationException("Only '*:*' query is supported in deleteByQuery");
+ }
+ for (Slice s : coll.getSlices()) {
Replica leader = s.getLeader();
if (leader == null) {
log.debug("-- no leader in " + s);
continue;
}
+
cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
ReplicaInfo ri = getReplicaInfo(leader);
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
- if (numDocs == null || numDocs.intValue() <= 0) {
- log.debug("-- attempting to delete nonexistent doc " + id + " from " + s.getLeader());
+ if (numDocs == null || numDocs.intValue() == 0) {
continue;
}
- modified = true;
+ lock.lockInterruptibly();
try {
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", 1, true, false);
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", -1, true, false);
- Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
- if (indexSize != null && indexSize.longValue() > SimCloudManager.DEFAULT_IDX_SIZE_BYTES) {
- indexSize = indexSize.longValue() - DEFAULT_DOC_SIZE_BYTES;
- simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
- indexSize.intValue(), false, false);
- simSetShardValue(collection, s.getName(), Variable.coreidxsize,
- Type.CORE_IDX.convertVal(indexSize), false, false);
- } else {
- throw new Exception("unexpected indexSize ri=" + ri);
- }
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", new AtomicLong(numDocs.longValue()), false, false);
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", new AtomicLong(0), false, false);
+ simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
+ new AtomicLong(SimCloudManager.DEFAULT_IDX_SIZE_BYTES), false, false);
+ simSetShardValue(collection, s.getName(), Variable.coreidxsize,
+ new AtomicDouble((Double)Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES)), false, false);
} catch (Exception e) {
throw new IOException(e);
+ } finally {
+ lock.unlock();
}
}
}
- deletes = req.getDeleteQuery();
- if (deletes != null && !deletes.isEmpty()) {
- for (String q : deletes) {
- if (!"*:*".equals(q)) {
- throw new UnsupportedOperationException("Only '*:*' query is supported in deleteByQuery");
+ }
+ List<SolrInputDocument> docs = req.getDocuments();
+ int docCount = 0;
+ Iterator<SolrInputDocument> it = null;
+ if (docs != null) {
+ docCount = docs.size();
+ } else {
+ it = req.getDocIterator();
+ if (it != null) {
+ while (it.hasNext()) {
+ it.next();
+ docCount++;
+ }
+ }
+ }
+ if (docCount > 0) {
+ // this approach to updating counters and metrics drastically increases performance
+ // of bulk updates, because simSetShardValue is relatively costly
+
+ Map<String, AtomicLong> docUpdates = new HashMap<>();
+ Map<String, Map<String, AtomicLong>> metricUpdates = new HashMap<>();
+
+ // XXX don't add more than 2bln docs in one request
+ boolean modified = false;
+ lock.lockInterruptibly();
+ try {
+ coll = getClusterState().getCollection(collection);
+ Slice[] slices = coll.getActiveSlicesArr();
+ if (slices.length == 0) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection without slices");
+ }
+ int[] perSlice = new int[slices.length];
+
+ if (it != null) {
+ // BULK UPDATE: simulate random doc assignment without actually calling DocRouter,
+ // which adds significant overhead
+
+ int totalAdded = 0;
+ for (int i = 0; i < slices.length; i++) {
+ Slice s = slices[i];
+ long count = (long) docCount * ((long) s.getRange().max - (long) s.getRange().min) / 0x100000000L;
+ perSlice[i] = (int) count;
+ totalAdded += perSlice[i];
+ }
+ // loss of precision due to integer math
+ int diff = docCount - totalAdded;
+ if (diff > 0) {
+ // spread the remainder more or less equally
+ int perRemain = diff / slices.length;
+ int remainder = diff % slices.length;
+ int remainderSlice = slices.length > 1 ? bulkUpdateRandom.nextInt(slices.length) : 0;
+ for (int i = 0; i < slices.length; i++) {
+ perSlice[i] += perRemain;
+ if (i == remainderSlice) {
+ perSlice[i] += remainder;
+ }
+ }
}
- for (Slice s : coll.getSlices()) {
+ for (int i = 0; i < slices.length; i++) {
+ Slice s = slices[i];
Replica leader = s.getLeader();
if (leader == null) {
log.debug("-- no leader in " + s);
continue;
}
-
- cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
- ReplicaInfo ri = getReplicaInfo(leader);
- Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
- if (numDocs == null || numDocs.intValue() == 0) {
+ metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
+ .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
+ .addAndGet(perSlice[i]);
+ modified = true;
+ AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
+ if (bufferedUpdates != null) {
+ bufferedUpdates.addAndGet(perSlice[i]);
+ continue;
+ }
+ docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
+ .addAndGet(perSlice[i]);
+ }
+ } else {
+ // SMALL UPDATE: use exact assignment via DocRouter
+ for (SolrInputDocument doc : docs) {
+ String id = (String) doc.getFieldValue("id");
+ if (id == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
+ }
+ Slice s = coll.getRouter().getTargetSlice(id, doc, null, null, coll);
+ Replica leader = s.getLeader();
+ if (leader == null) {
+ log.debug("-- no leader in " + s);
continue;
}
+ metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
+ .computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
+ .incrementAndGet();
modified = true;
- try {
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", numDocs, false, false);
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", 0, false, false);
- simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
- SimCloudManager.DEFAULT_IDX_SIZE_BYTES, false, false);
- simSetShardValue(collection, s.getName(), Variable.coreidxsize,
- Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES), false, false);
- } catch (Exception e) {
- throw new IOException(e);
+ AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
+ if (bufferedUpdates != null) {
+ bufferedUpdates.incrementAndGet();
+ continue;
}
+ docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
+ .incrementAndGet();
}
}
- }
- List<SolrInputDocument> docs = req.getDocuments();
- if (docs != null && !docs.isEmpty()) {
- for (SolrInputDocument doc : docs) {
- String id = (String) doc.getFieldValue("id");
- if (id == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
- }
- Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
- Replica leader = s.getLeader();
- if (leader == null) {
- log.debug("-- no leader in " + s);
- continue;
- }
- cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
- modified = true;
- try {
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", 1, true, false);
- simSetShardValue(collection, s.getName(), "SEARCHER.searcher.maxDoc", 1, true, false);
- ReplicaInfo ri = getReplicaInfo(leader);
- Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
- // for each new document increase the size by DEFAULT_DOC_SIZE_BYTES
- indexSize = indexSize.longValue() + DEFAULT_DOC_SIZE_BYTES;
- simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
- indexSize.longValue(), false, false);
- simSetShardValue(collection, s.getName(), Variable.coreidxsize,
- Type.CORE_IDX.convertVal(indexSize), false, false);
- } catch (Exception e) {
- throw new IOException(e);
- }
+ if (modified) {
+ docUpdates.forEach((sh, count) -> {
+ try {
+ simSetShardValue(collection, sh, "SEARCHER.searcher.numDocs", count.get(), true, false);
+ simSetShardValue(collection, sh, "SEARCHER.searcher.maxDoc", count.get(), true, false);
+ // for each new document increase the size by DEFAULT_DOC_SIZE_BYTES
+ simSetShardValue(collection, sh, Type.CORE_IDX.metricsAttribute,
+ DEFAULT_DOC_SIZE_BYTES * count.get(), true, false);
+ simSetShardValue(collection, sh, Variable.coreidxsize,
+ Type.CORE_IDX.convertVal(DEFAULT_DOC_SIZE_BYTES * count.get()), true, false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ metricUpdates.forEach((sh, cores) -> {
+ cores.forEach((core, count) -> {
+ String registry = SolrMetricManager.getRegistryName(SolrInfoBean.Group.core, collection, sh,
+ Utils.parseMetricsReplicaName(collection, core));
+ cloudManager.getMetricManager().registry(registry).counter("UPDATE./update.requests").inc(count.get());
+ });
+ });
}
+ } finally {
+ lock.unlock();
}
- if (modified) {
- collectionsStatesRef.set(null);
- }
- } finally {
- lock.unlock();
}
return new UpdateResponse();
}
+ public QueryResponse simQuery(QueryRequest req) throws SolrException, InterruptedException, IOException {
+ ensureNotClosed();
+ String collection = req.getCollection();
+ if (collection == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not set");
+ }
+ if (!colShardReplicaMap.containsKey(collection)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection does not exist");
+ }
+ String query = req.getParams().get(CommonParams.Q);
+ if (query == null || !query.equals("*:*")) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Only '*:*' query is supported");
+ }
+ ClusterState clusterState = getClusterState();
+ DocCollection coll = clusterState.getCollection(collection);
+ AtomicLong count = new AtomicLong();
+ for (Slice s : coll.getActiveSlicesArr()) {
+ Replica r = s.getLeader();
+ if (r == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, collection + "/" + s.getName() + " has no leader");
+ }
+ ReplicaInfo ri = getReplicaInfo(r);
+ Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs", 0L);
+ count.addAndGet(numDocs.longValue());
+ }
+ QueryResponse rsp = new QueryResponse();
+ NamedList<Object> values = new NamedList<>();
+ values.add("responseHeader", new NamedList<>());
+ SolrDocumentList docs = new SolrDocumentList();
+ docs.setNumFound(count.get());
+ values.add("response", docs);
+ rsp.setResponse(values);
+ return rsp;
+ }
+
private static String createRegistryName(String collection, String shard, Replica r) {
return SolrMetricManager.getRegistryName(SolrInfoBean.Group.core, collection, shard,
Utils.parseMetricsReplicaName(collection, r.getCoreName()));
@@ -1572,17 +1794,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* divided by the number of replicas.
*/
public void simSetShardValue(String collection, String shard, String key, Object value, boolean delta, boolean divide) throws Exception {
- List<ReplicaInfo> infos = new ArrayList<>();
- nodeReplicaMap.forEach((n, replicas) -> {
- replicas.forEach(r -> {
- if (r.getCollection().equals(collection)) {
- if (shard != null && !shard.equals(r.getShard())) {
- return;
- }
- infos.add(r);
- }
- });
- });
+ final List<ReplicaInfo> infos;
+ if (shard == null) {
+ infos = new ArrayList<>();
+ colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
+ .forEach((sh, replicas) -> infos.addAll(replicas));
+ } else {
+ infos = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(shard, s -> new ArrayList<>());
+ }
if (infos.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection " + collection + " doesn't exist (shard=" + shard + ").");
}
@@ -1602,22 +1822,50 @@ public class SimClusterStateProvider implements ClusterStateProvider {
Object prevValue = r.getVariables().get(key);
if (prevValue != null) {
if ((prevValue instanceof Number) && (value instanceof Number)) {
- if (((prevValue instanceof Long) || (prevValue instanceof Integer)) &&
+ if (((prevValue instanceof Long) || (prevValue instanceof Integer) ||
+ (prevValue instanceof AtomicLong) || (prevValue instanceof AtomicInteger)) &&
((value instanceof Long) || (value instanceof Integer))) {
- Long newValue = ((Number)prevValue).longValue() + ((Number)value).longValue();
- r.getVariables().put(key, newValue);
+ long newValue = ((Number)prevValue).longValue() + ((Number)value).longValue();
+ // minimize object allocations
+ if (prevValue instanceof AtomicLong) {
+ ((AtomicLong)prevValue).set(newValue);
+ } else if (prevValue instanceof AtomicInteger) {
+ ((AtomicInteger)prevValue).set(((Number)prevValue).intValue() + ((Number)value).intValue());
+ } else {
+ r.getVariables().put(key, newValue);
+ }
} else {
- Double newValue = ((Number)prevValue).doubleValue() + ((Number)value).doubleValue();
- r.getVariables().put(key, newValue);
+ double newValue = ((Number)prevValue).doubleValue() + ((Number)value).doubleValue();
+ if (prevValue instanceof AtomicDouble) {
+ ((AtomicDouble)prevValue).set(newValue);
+ } else {
+ r.getVariables().put(key, newValue);
+ }
}
} else {
throw new UnsupportedOperationException("delta cannot be applied to non-numeric values: " + prevValue + " and " + value);
}
} else {
- r.getVariables().put(key, value);
+ if (value instanceof Integer) {
+ r.getVariables().put(key, new AtomicInteger((Integer)value));
+ } else if (value instanceof Long) {
+ r.getVariables().put(key, new AtomicLong((Long)value));
+ } else if (value instanceof Double) {
+ r.getVariables().put(key, new AtomicDouble((Double)value));
+ } else {
+ r.getVariables().put(key, value);
+ }
}
} else {
- r.getVariables().put(key, value);
+ if (value instanceof Integer) {
+ r.getVariables().put(key, new AtomicInteger((Integer)value));
+ } else if (value instanceof Long) {
+ r.getVariables().put(key, new AtomicLong((Long)value));
+ } else if (value instanceof Double) {
+ r.getVariables().put(key, new AtomicDouble((Double)value));
+ } else {
+ r.getVariables().put(key, value);
+ }
}
}
}
@@ -1639,21 +1887,128 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
+ public List<ReplicaInfo> simGetReplicaInfos(String collection, String shard) {
+ List<ReplicaInfo> replicas = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(shard, s -> new ArrayList<>());
+ if (replicas == null) {
+ return Collections.emptyList();
+ } else {
+ // make a defensive copy to avoid ConcurrentModificationException
+ return Arrays.asList(replicas.toArray(new ReplicaInfo[replicas.size()]));
+ }
+ }
+
/**
* List collections.
* @return list of existing collections.
*/
public List<String> simListCollections() throws InterruptedException {
- final Set<String> collections = new HashSet<>();
+ return new ArrayList<>(colShardReplicaMap.keySet());
+ }
+
+ public Map<String, Map<String, Object>> simGetCollectionStats() throws IOException, InterruptedException {
+ Map<String, Map<String, Object>> stats = new TreeMap<>();
lock.lockInterruptibly();
try {
- nodeReplicaMap.forEach((n, replicas) -> {
- replicas.forEach(ri -> collections.add(ri.getCollection()));
+ collectionsStatesRef.set(null);
+ ClusterState state = getClusterState();
+ state.forEachCollection(coll -> {
+ Map<String, Object> perColl = new LinkedHashMap<>();
+ stats.put(coll.getName(), perColl);
+ perColl.put("shardsTotal", coll.getSlices().size());
+ Map<String, AtomicInteger> shardState = new TreeMap<>();
+ int noLeader = 0;
+
+ SummaryStatistics docs = new SummaryStatistics();
+ SummaryStatistics bytes = new SummaryStatistics();
+ SummaryStatistics inactiveDocs = new SummaryStatistics();
+ SummaryStatistics inactiveBytes = new SummaryStatistics();
+
+ long deletedDocs = 0;
+ long bufferedDocs = 0;
+ int totalReplicas = 0;
+ int activeReplicas = 0;
+
+ for (Slice s : coll.getSlices()) {
+ shardState.computeIfAbsent(s.getState().toString(), st -> new AtomicInteger())
+ .incrementAndGet();
+ totalReplicas += s.getReplicas().size();
+ if (s.getState() != Slice.State.ACTIVE) {
+ if (!s.getReplicas().isEmpty()) {
+ ReplicaInfo ri = getReplicaInfo(s.getReplicas().iterator().next());
+ if (ri != null) {
+ Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
+ Number numBytes = (Number)ri.getVariable("INDEX.sizeInBytes");
+ if (numDocs != null) {
+ inactiveDocs.addValue(numDocs.doubleValue());
+ }
+ if (numBytes != null) {
+ inactiveBytes.addValue(numBytes.doubleValue());
+ }
+ }
+ }
+ continue;
+ }
+ AtomicLong buffered = (AtomicLong)sliceProperties.get(coll.getName()).get(s.getName()).get(BUFFERED_UPDATES);
+ if (buffered != null) {
+ bufferedDocs += buffered.get();
+ }
+ activeReplicas += s.getReplicas().size();
+ Replica leader = s.getLeader();
+ if (leader == null) {
+ noLeader++;
+ if (!s.getReplicas().isEmpty()) {
+ leader = s.getReplicas().iterator().next();
+ }
+ }
+ ReplicaInfo ri = null;
+ if (leader != null) {
+ ri = getReplicaInfo(leader);
+ if (ri == null) {
+ log.warn("Unknown ReplicaInfo for {}", leader);
+ }
+ }
+ if (ri != null) {
+ Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
+ Number delDocs = (Number)ri.getVariable("SEARCHER.searcher.deleteDocs");
+ Number numBytes = (Number)ri.getVariable("INDEX.sizeInBytes");
+ if (numDocs != null) {
+ docs.addValue(numDocs.doubleValue());
+ }
+ if (delDocs != null) {
+ deletedDocs += delDocs.longValue();
+ }
+ if (numBytes != null) {
+ bytes.addValue(numBytes.doubleValue());
+ }
+ }
+ }
+ perColl.put("shardsState", shardState);
+ perColl.put(" shardsWithoutLeader", noLeader);
+ perColl.put("totalReplicas", totalReplicas);
+ perColl.put(" activeReplicas", activeReplicas);
+ perColl.put(" inactiveReplicas", totalReplicas - activeReplicas);
+ long totalDocs = (long)docs.getSum() + bufferedDocs;
+ perColl.put("totalActiveDocs", String.format(Locale.ROOT, "%,d", totalDocs));
+ perColl.put(" bufferedDocs", String.format(Locale.ROOT, "%,d", bufferedDocs));
+ perColl.put(" maxActiveSliceDocs", String.format(Locale.ROOT, "%,d", (long)docs.getMax()));
+ perColl.put(" minActiveSliceDocs", String.format(Locale.ROOT, "%,d", (long)docs.getMin()));
+ perColl.put(" avgActiveSliceDocs", String.format(Locale.ROOT, "%,.0f", docs.getMean()));
+ perColl.put("totalInactiveDocs", String.format(Locale.ROOT, "%,d", (long)inactiveDocs.getSum()));
+ perColl.put(" maxInactiveSliceDocs", String.format(Locale.ROOT, "%,d", (long)inactiveDocs.getMax()));
+ perColl.put(" minInactiveSliceDocs", String.format(Locale.ROOT, "%,d", (long)inactiveDocs.getMin()));
+ perColl.put(" avgInactiveSliceDocs", String.format(Locale.ROOT, "%,.0f", inactiveDocs.getMean()));
+ perColl.put("totalActiveBytes", String.format(Locale.ROOT, "%,d", (long)bytes.getSum()));
+ perColl.put(" maxActiveSliceBytes", String.format(Locale.ROOT, "%,d", (long)bytes.getMax()));
+ perColl.put(" minActiveSliceBytes", String.format(Locale.ROOT, "%,d", (long)bytes.getMin()));
+ perColl.put(" avgActiveSliceBytes", String.format(Locale.ROOT, "%,.0f", bytes.getMean()));
+ perColl.put("totalInactiveBytes", String.format(Locale.ROOT, "%,d", (long)inactiveBytes.getSum()));
+ perColl.put(" maxInactiveSliceBytes", String.format(Locale.ROOT, "%,d", (long)inactiveBytes.getMax()));
+ perColl.put(" minInactiveSliceBytes", String.format(Locale.ROOT, "%,d", (long)inactiveBytes.getMin()));
+ perColl.put(" avgInactiveSliceBytes", String.format(Locale.ROOT, "%,.0f", inactiveBytes.getMean()));
+ perColl.put("totalActiveDeletedDocs", String.format(Locale.ROOT, "%,d", deletedDocs));
});
- // check collProps and sliceProps too
- collProperties.forEach((coll, props) -> collections.add(coll));
- sliceProperties.forEach((coll, slices) -> collections.add(coll));
- return new ArrayList<>(collections);
+ return stats;
} finally {
lock.unlock();
}
@@ -1700,6 +2055,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
lock.lock();
collectionsStatesRef.set(null);
saveClusterState.set(true);
+ log.debug("** creating new collection states");
try {
Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
nodeReplicaMap.forEach((n, replicas) -> {
@@ -1741,7 +2097,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
Map<String, Object> collProps = collProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>());
Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
- DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion, ZkStateReader.CLUSTER_STATE);
+ DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion + 1, ZkStateReader.CLUSTER_STATE);
res.put(coll, dc);
});
collectionsStatesRef.set(res);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2369c896/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
index 1e99ff2..2b8940a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
@@ -214,6 +214,8 @@ public class SimDistribStateManager implements DistribStateManager {
private final String id;
private final Node root;
+ private int juteMaxbuffer = 0xfffff;
+
public SimDistribStateManager() {
this(null);
}
@@ -226,6 +228,8 @@ public class SimDistribStateManager implements DistribStateManager {
this.id = IdUtils.timeRandomId();
this.root = root != null ? root : createNewRootNode();
watchersPool = ExecutorUtil.newMDCAwareFixedThreadPool(10, new DefaultSolrThreadFactory("sim-watchers"));
+ String bufferSize = System.getProperty("jute.maxbuffer", Integer.toString(0xffffff));
+ juteMaxbuffer = Integer.parseInt(bufferSize);
}
public SimDistribStateManager(ActionThrottle actionThrottle, ActionError actionError) {
@@ -493,6 +497,9 @@ public class SimDistribStateManager implements DistribStateManager {
@Override
public void setData(String path, byte[] data, int version) throws NoSuchElementException, BadVersionException, IOException {
+ if (data.length > juteMaxbuffer) {
+ throw new IOException("Len error " + data.length);
+ }
multiLock.lock();
Node n = null;
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2369c896/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
index 9673fa7..5f9293b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
@@ -233,6 +233,7 @@ public class SimNodeStateProvider implements NodeStateProvider {
}
private static final Pattern REGISTRY_PATTERN = Pattern.compile("^solr\\.core\\.([\\w.-_]+?)\\.(shard[\\d_]+?)\\.(replica.*)");
+ private static final Pattern METRIC_KEY_PATTERN = Pattern.compile("^metrics:([^:]+?):([^:]+?)(:([^:]+))?$");
/**
* Simulate getting replica metrics values. This uses per-replica properties set in
* {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean, boolean)} and
@@ -245,33 +246,31 @@ public class SimNodeStateProvider implements NodeStateProvider {
if (!liveNodesSet.contains(node)) {
throw new RuntimeException("non-live node " + node);
}
- List<ReplicaInfo> replicas = clusterStateProvider.simGetReplicaInfos(node);
- if (replicas == null || replicas.isEmpty()) {
- return Collections.emptyMap();
- }
Map<String, Object> values = new HashMap<>();
for (String tag : tags) {
- String[] parts = tag.split(":");
- if (parts.length < 3 || !parts[0].equals("metrics")) {
+ Matcher m = METRIC_KEY_PATTERN.matcher(tag);
+ if (!m.matches() || m.groupCount() < 2) {
log.warn("Invalid metrics: tag: " + tag);
continue;
}
- if (!parts[1].startsWith("solr.core.")) {
+ String registryName = m.group(1);
+ String key = m.group(3) != null ? m.group(2) + m.group(3) : m.group(2);
+ if (!registryName.startsWith("solr.core.")) {
// skip - this is probably solr.node or solr.jvm metric
continue;
}
- Matcher m = REGISTRY_PATTERN.matcher(parts[1]);
+ m = REGISTRY_PATTERN.matcher(registryName);
if (!m.matches()) {
- log.warn("Invalid registry name: " + parts[1]);
+ log.warn("Invalid registry name: " + registryName);
continue;
}
String collection = m.group(1);
String shard = m.group(2);
String replica = m.group(3);
- String key = parts.length > 3 ? parts[2] + ":" + parts[3] : parts[2];
+ List<ReplicaInfo> replicas = clusterStateProvider.simGetReplicaInfos(collection, shard);
replicas.forEach(r -> {
- if (r.getCollection().equals(collection) && r.getShard().equals(shard) && r.getCore().endsWith(replica)) {
+ if (r.getNode().equals(node) && r.getCore().endsWith(replica)) {
Object value = r.getVariables().get(key);
if (value != null) {
values.put(tag, value);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2369c896/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExecutePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExecutePlanAction.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExecutePlanAction.java
index e593695..ea753bc 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExecutePlanAction.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExecutePlanAction.java
@@ -92,7 +92,7 @@ public class TestSimExecutePlanAction extends SimSolrCloudTestCase {
log.info("Collection ready after " + CloudTestUtils.waitForState(cluster, collectionName, 120, TimeUnit.SECONDS,
CloudTestUtils.clusterShape(1, 2, false, true)) + "ms");
- String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode();
ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
DocCollection docCollection = clusterState.getCollection(collectionName);
List<Replica> replicas = docCollection.getReplicas(sourceNodeName);
@@ -181,7 +181,7 @@ public class TestSimExecutePlanAction extends SimSolrCloudTestCase {
CloudTestUtils.waitForState(cluster, "Timed out waiting for replicas of new collection to be active",
collectionName, CloudTestUtils.clusterShape(1, 2, false, true));
- String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode();
ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
DocCollection docCollection = clusterState.getCollection(collectionName);
List<Replica> replicas = docCollection.getReplicas(sourceNodeName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2369c896/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExtremeIndexing.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExtremeIndexing.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExtremeIndexing.java
new file mode 100644
index 0000000..ab5295e
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimExtremeIndexing.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Iterator;
+import java.util.Locale;
+
+import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.CloudTestUtils;
+import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.LogLevel;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+
+/**
+ * Feeds {@code NUM_BATCHES} batches of {@code BATCH_SIZE} fake docs into a simulated cluster that has an {@code indexSize} trigger set, then verifies the total doc count.
+ */
+@TimeoutSuite(millis = 48 * 3600 * 1000)
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.NodeLostTrigger=INFO;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.ComputePlanAction=INFO;org.apache.solr.cloud.autoscaling.ExecutePlanAction=DEBUG;org.apache.solr.cloud.autoscaling.ScheduledTriggers=DEBUG")
+//@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.autoscaling.NodeLostTrigger=INFO;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG;org.apache.solr.cloud.CloudTestUtils=TRACE")
+public class TestSimExtremeIndexing extends SimSolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int SPEED = 100;
+  // use higher speed for larger scale tests
+  // private static final int SPEED = 500;
+  private static final int NUM_NODES = 200;
+
+  private static final long BATCH_SIZE = 200000;
+
+  private static final long NUM_BATCHES = 5000;
+  // ... or use this for a 1 trillion docs test
+  // private static final long NUM_BATCHES = 5000000;
+
+  // tweak this threshold to test the number of splits
+  private static final long ABOVE_SIZE = 20000000;
+
+
+  private static TimeSource timeSource;
+  private static SolrClient solrClient;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(NUM_NODES, TimeSource.get("simTime:" + SPEED));
+    timeSource = cluster.getTimeSource();
+    solrClient = cluster.simGetSolrClient();
+    cluster.simSetUseSystemCollection(false);
+  }
+
+  @AfterClass
+  public static void tearDownCluster() throws Exception {
+    solrClient = null;
+  }
+
+  @Test
+  public void testScaleUp() throws Exception {
+    String collectionName = "testScaleUp_collection";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+        "conf", 2, 2).setMaxShardsPerNode(10);
+    create.process(solrClient);
+    CloudTestUtils.waitForState(cluster, "failed to create " + collectionName, collectionName,
+        CloudTestUtils.clusterShape(2, 2, false, true));
+
+    //long waitForSeconds = 3 + random().nextInt(5);
+    long waitForSeconds = 1;
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'scaleUpTrigger'," +
+        "'event' : 'indexSize'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'aboveDocs' : " + ABOVE_SIZE + "," +
+        "'enabled' : true," +
+        "'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
+        "{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    long batchSize = BATCH_SIZE;
+    for (long i = 0; i < NUM_BATCHES; i++) {
+      addDocs(collectionName, i * batchSize, batchSize);
+      log.info(String.format(Locale.ROOT, "#### Total docs so far: %,d", ((i + 1) * batchSize)));
+      // NOTE(review): sleep() appears to take milliseconds (cf. sleep(60000) below), so this waits waitForSeconds ms - confirm intent
+      timeSource.sleep(waitForSeconds);
+    }
+    timeSource.sleep(60000);
+    QueryResponse rsp = solrClient.query(collectionName, params(CommonParams.Q, "*:*"));
+    SolrDocumentList docs = rsp.getResults();
+    assertNotNull(docs);
+    assertEquals(docs.toString(), batchSize * NUM_BATCHES, docs.getNumFound());
+  }
+
+  private void addDocs(String collection, long start, long count) throws Exception {
+    UpdateRequest ureq = new UpdateRequest();
+    ureq.setParam("collection", collection);
+    ureq.setDocIterator(new FakeDocIterator(start, count));
+    solrClient.request(ureq);
+  }
+
+  // lightweight generator of fake documents
+  // NOTE: this iterator only ever returns the same document, which works ok
+  // for our "index update" simulation. Obviously don't use this for real indexing.
+  private static class FakeDocIterator implements Iterator<SolrInputDocument> {
+    final SolrInputDocument doc = new SolrInputDocument();
+    final SolrInputField idField = new SolrInputField("id");
+
+    long current, max;
+
+    FakeDocIterator(long start, long count) {
+      current = start;
+      max = start + count;
+      doc.put("id", idField);
+      idField.setValue("foo");
+    }
+
+    @Override
+    public boolean hasNext() {
+      return current < max;
+    }
+
+    @Override
+    public SolrInputDocument next() {
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException();
+      }
+      current++;
+      return doc;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2369c896/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimNodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimNodeLostTrigger.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimNodeLostTrigger.java
index 4ad0623..c1c5f4c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimNodeLostTrigger.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimNodeLostTrigger.java
@@ -131,7 +131,7 @@ public class TestSimNodeLostTrigger extends SimSolrCloudTestCase {
trigger.setProcessor(noFirstRunProcessor);
trigger.run();
- String lostNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ String lostNode = cluster.getSimClusterStateProvider().simGetRandomNode();
cluster.simRemoveNode(lostNode, false);
AtomicBoolean fired = new AtomicBoolean(false);
trigger.setProcessor(event -> {