You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/04/11 13:39:02 UTC
[1/2] lucene-solr:master: SOLR-12181: Add trigger based on document
count / index size.
Repository: lucene-solr
Updated Branches:
refs/heads/master e99a19755 -> 376f6c494
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index f6762fc..9b3782a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -47,6 +47,8 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.cloud.api.collections.AddReplicaCmd;
import org.apache.solr.cloud.api.collections.Assign;
@@ -57,6 +59,7 @@ import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.CollectionMutator;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
@@ -241,7 +244,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @return true if a node existed and was removed
*/
public boolean simRemoveNode(String nodeId) throws Exception {
- lock.lock();
+ lock.lockInterruptibly();
try {
Set<String> collections = new HashSet<>();
// mark every replica on that node as down
@@ -296,14 +299,14 @@ public class SimClusterStateProvider implements ClusterStateProvider {
liveNodes.add(nodeId);
createEphemeralLiveNode(nodeId);
Set<String> collections = new HashSet<>();
- lock.lock();
+ lock.lockInterruptibly();
try {
setReplicaStates(nodeId, Replica.State.RECOVERING, collections);
} finally {
lock.unlock();
}
cloudManager.getTimeSource().sleep(1000);
- lock.lock();
+ lock.lockInterruptibly();
try {
setReplicaStates(nodeId, Replica.State.ACTIVE, collections);
} finally {
@@ -389,7 +392,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
throw new Exception("Wrong node (not " + nodeId + "): " + replicaInfo);
}
- lock.lock();
+ lock.lockInterruptibly();
try {
opDelay(replicaInfo.getCollection(), CollectionParams.CollectionAction.ADDREPLICA.name());
@@ -435,7 +438,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
*/
public void simRemoveReplica(String nodeId, String coreNodeName) throws Exception {
List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent(nodeId, n -> new ArrayList<>());
- lock.lock();
+ lock.lockInterruptibly();
try {
for (int i = 0; i < replicas.size(); i++) {
if (coreNodeName.equals(replicas.get(i).getName())) {
@@ -638,6 +641,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
replicaNum.getAndIncrement());
try {
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
+ replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
+ replicaProps.put("SEARCHER.searcher.numDocs", 0);
+ replicaProps.put("SEARCHER.searcher.maxDoc", 0);
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps);
cloudManager.submit(() -> {
@@ -662,6 +668,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
});
});
+ // force recreation of collection states
+ collectionsStatesRef.set(null);
simRunLeaderElection(Collections.singleton(collectionName), true);
if (waitForFinalState) {
boolean finished = finalStateLatch.await(cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, 60, TimeUnit.MILLISECONDS),
@@ -680,11 +688,11 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param async async id
* @param results results of the operation
*/
- public void simDeleteCollection(String collection, String async, NamedList results) throws IOException {
+ public void simDeleteCollection(String collection, String async, NamedList results) throws Exception {
if (async != null) {
results.add(CoreAdminParams.REQUESTID, async);
}
- lock.lock();
+ lock.lockInterruptibly();
try {
collProperties.remove(collection);
sliceProperties.remove(collection);
@@ -722,7 +730,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* Remove all collections.
*/
public void simDeleteAllCollections() throws Exception {
- lock.lock();
+ lock.lockInterruptibly();
try {
nodeReplicaMap.clear();
collProperties.clear();
@@ -797,7 +805,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
String collectionName = message.getStr(COLLECTION_PROP);
String sliceName = message.getStr(SHARD_ID_PROP);
ClusterState clusterState = getClusterState();
- lock.lock();
+ lock.lockInterruptibly();
try {
ZkWriteCommand cmd = new CollectionMutator(cloudManager).createShard(clusterState, message);
if (cmd.noop) {
@@ -865,6 +873,10 @@ public class SimClusterStateProvider implements ClusterStateProvider {
AtomicReference<String> sliceName = new AtomicReference<>();
sliceName.set(message.getStr(SHARD_ID_PROP));
String splitKey = message.getStr("split.key");
+
+ // always invalidate cached collection states to get up-to-date metrics
+ collectionsStatesRef.set(null);
+
ClusterState clusterState = getClusterState();
DocCollection collection = clusterState.getCollection(collectionName);
Slice parentSlice = SplitShardCmd.getParentSlice(clusterState, collectionName, sliceName, splitKey);
@@ -887,6 +899,18 @@ public class SimClusterStateProvider implements ClusterStateProvider {
PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
if (sessionWrapper != null) sessionWrapper.release();
+ // adjust numDocs / deletedDocs / maxDoc
+ Replica leader = parentSlice.getLeader();
+ // XXX leader election may not have happened yet - should we require it?
+ if (leader == null) {
+ leader = parentSlice.getReplicas().iterator().next();
+ }
+ String numDocsStr = leader.getStr("SEARCHER.searcher.numDocs", "0");
+ long numDocs = Long.parseLong(numDocsStr);
+ long newNumDocs = numDocs / subSlices.size();
+ long remainder = numDocs % subSlices.size();
+ String remainderSlice = null;
+
for (ReplicaPosition replicaPosition : replicaPositions) {
String subSliceName = replicaPosition.shard;
String subShardNodeName = replicaPosition.node;
@@ -897,15 +921,32 @@ public class SimClusterStateProvider implements ClusterStateProvider {
replicaProps.put(ZkStateReader.REPLICA_TYPE, replicaPosition.type.toString());
replicaProps.put(ZkStateReader.BASE_URL_PROP, Utils.getBaseUrlForNodeName(subShardNodeName, "http"));
+ long replicasNumDocs = newNumDocs;
+ if (remainderSlice == null) {
+ remainderSlice = subSliceName;
+ }
+ if (remainderSlice.equals(subSliceName)) { // only add to one sub slice
+ replicasNumDocs += remainder;
+ }
+ replicaProps.put("SEARCHER.searcher.numDocs", replicasNumDocs);
+ replicaProps.put("SEARCHER.searcher.maxDoc", replicasNumDocs);
+ replicaProps.put("SEARCHER.searcher.deletedDocs", 0);
+
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps);
simAddReplica(replicaPosition.node, ri, false);
}
// mark the old slice as inactive
- Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
- .computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>());
- props.put(ZkStateReader.STATE_PROP, Slice.State.INACTIVE.toString());
- props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
+ lock.lockInterruptibly();
+ try {
+ Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>());
+ props.put(ZkStateReader.STATE_PROP, Slice.State.INACTIVE.toString());
+ props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
+ // XXX also mark replicas as down? currently SplitShardCmd doesn't do this
+ } finally {
+ lock.unlock();
+ }
// add slice props
for (int i = 0; i < subRanges.size(); i++) {
String subSlice = subSlices.get(i);
@@ -915,8 +956,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
sliceProps.put(Slice.RANGE, range);
sliceProps.put(Slice.PARENT, sliceName.get());
sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.ACTIVE.toString());
- props.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
+ sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
}
+ collectionsStatesRef.set(null);
simRunLeaderElection(Collections.singleton(collectionName), true);
results.add("success", "");
@@ -945,7 +987,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
opDelay(collectionName, CollectionParams.CollectionAction.DELETESHARD.name());
- lock.lock();
+ lock.lockInterruptibly();
try {
sliceProperties.computeIfAbsent(collectionName, coll -> new ConcurrentHashMap<>()).remove(sliceName);
nodeReplicaMap.forEach((n, replicas) -> {
@@ -967,6 +1009,122 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
/**
+ * Simulate an update by modifying replica metrics.
+ * The following core metrics are updated:
+ * <ul>
+ * <li><code>SEARCHER.searcher.numDocs</code> - increased by added docs, decreased by deleteById and deleteByQuery</li>
+ * <li><code>SEARCHER.searcher.deletedDocs</code> - increased by deleteById and deleteByQuery by up to <code>numDocs</code></li>
+ * <li><code>SEARCHER.searcher.maxDoc</code> - always increased by the number of added docs.</li>
+ * </ul>
+ * <p>IMPORTANT limitations:</p>
+ * <ul>
+ * <li>document replacements are always counted as new docs</li>
+ * <li>delete by ID always succeeds (unless numDocs == 0)</li>
+ * <li>deleteByQuery is not supported unless the query is <code>*:*</code></li>
+ * </ul>
+ * @param req update request. This request MUST have the <code>collection</code> param set.
+ * @return {@link UpdateResponse}
+ * @throws SolrException on errors, such as nonexistent collection or unsupported deleteByQuery
+ */
+ public UpdateResponse simUpdate(UpdateRequest req) throws SolrException, InterruptedException, IOException {
+ String collection = req.getCollection();
+ if (collection == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not set");
+ }
+ if (!simListCollections().contains(collection)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collection + "' doesn't exist");
+ }
+ // always reset first to get the current metrics - it's easier than to keep matching
+ // Replica with ReplicaInfo where the current real counts are stored
+ collectionsStatesRef.set(null);
+ DocCollection coll = getClusterState().getCollection(collection);
+ DocRouter router = coll.getRouter();
+
+ boolean modified = false;
+
+ lock.lockInterruptibly();
+ try {
+ List<String> deletes = req.getDeleteById();
+ if (deletes != null && !deletes.isEmpty()) {
+ for (String id : deletes) {
+ Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
+ // NOTE: we don't use getProperty because it uses PROPERTY_PROP_PREFIX
+ String numDocsStr = s.getLeader().getStr("SEARCHER.searcher.numDocs");
+ if (numDocsStr == null) {
+ LOG.debug("-- no docs in " + s.getLeader());
+ continue;
+ }
+ long numDocs = Long.parseLong(numDocsStr);
+ if (numDocs == 0) {
+ LOG.debug("-- attempting to delete nonexistent doc " + id + " from " + s.getLeader());
+ continue;
+ }
+ if (numDocsStr != null) {
+ modified = true;
+ try {
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", 1, true, false);
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", -1, true, false);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ }
+ deletes = req.getDeleteQuery();
+ if (deletes != null && !deletes.isEmpty()) {
+ for (String q : deletes) {
+ if (!"*:*".equals(q)) {
+ throw new UnsupportedOperationException("Only '*:*' query is supported in deleteByQuery");
+ }
+ for (Slice s : coll.getSlices()) {
+ String numDocsStr = s.getLeader().getStr("SEARCHER.searcher.numDocs");
+ if (numDocsStr == null) {
+ continue;
+ }
+ long numDocs = Long.parseLong(numDocsStr);
+ if (numDocs == 0) {
+ continue;
+ }
+ modified = true;
+ try {
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", numDocs, false, false);
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", 0, false, false);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ }
+ List<SolrInputDocument> docs = req.getDocuments();
+ if (docs != null && !docs.isEmpty()) {
+ for (SolrInputDocument doc : docs) {
+ String id = (String) doc.getFieldValue("id");
+ if (id == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
+ }
+ Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
+ modified = true;
+ try {
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", 1, true, false);
+ simSetShardValue(collection, s.getName(), "SEARCHER.searcher.maxDoc", 1, true, false);
+ // Policy reuses this value and expects it to be in GB units!!!
+ // the idea here is to increase the index size by 500 bytes with each doc
+ // simSetShardValue(collection, s.getName(), "INDEX.sizeInBytes", 500, true, false);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ if (modified) {
+ collectionsStatesRef.set(null);
+ }
+ } finally {
+ lock.unlock();
+ }
+ return new UpdateResponse();
+ }
+
+ /**
* Saves cluster properties to clusterprops.json.
* @return current properties
*/
@@ -988,7 +1146,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param properties properties to set
*/
public void simSetClusterProperties(Map<String, Object> properties) throws Exception {
- lock.lock();
+ lock.lockInterruptibly();
try {
clusterProperties.clear();
if (properties != null) {
@@ -1007,7 +1165,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param value property value
*/
public void simSetClusterProperty(String key, Object value) throws Exception {
- lock.lock();
+ lock.lockInterruptibly();
try {
if (value != null) {
clusterProperties.put(key, value);
@@ -1026,7 +1184,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param properties properties
*/
public void simSetCollectionProperties(String coll, Map<String, Object> properties) throws Exception {
- lock.lock();
+ lock.lockInterruptibly();
try {
if (properties == null) {
collProperties.remove(coll);
@@ -1049,7 +1207,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
*/
public void simSetCollectionProperty(String coll, String key, String value) throws Exception {
Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
- lock.lock();
+ lock.lockInterruptibly();
try {
if (value == null) {
props.remove(key);
@@ -1070,7 +1228,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
*/
public void simSetSliceProperties(String coll, String slice, Map<String, Object> properties) throws Exception {
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll, c -> new HashMap<>()).computeIfAbsent(slice, s -> new HashMap<>());
- lock.lock();
+ lock.lockInterruptibly();
try {
sliceProps.clear();
if (properties != null) {
@@ -1089,7 +1247,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param value property value
*/
public void simSetCollectionValue(String collection, String key, Object value) throws Exception {
- simSetCollectionValue(collection, key, value, false);
+ simSetCollectionValue(collection, key, value, false, false);
}
/**
@@ -1100,8 +1258,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param divide if the value is a {@link Number} and this param is true, then the value will be evenly
* divided by the number of replicas.
*/
- public void simSetCollectionValue(String collection, String key, Object value, boolean divide) throws Exception {
- simSetShardValue(collection, null, key, value, divide);
+ public void simSetCollectionValue(String collection, String key, Object value, boolean delta, boolean divide) throws Exception {
+ simSetShardValue(collection, null, key, value, delta, divide);
}
/**
@@ -1112,7 +1270,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param value property value
*/
public void simSetShardValue(String collection, String shard, String key, Object value) throws Exception {
- simSetShardValue(collection, shard, key, value, false);
+ simSetShardValue(collection, shard, key, value, false, false);
}
/**
@@ -1121,10 +1279,12 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param shard shard name. If null then all shards will be affected.
* @param key property name
* @param value property value
+ * @param delta if true then treat the numeric value as delta to add to the existing value
+ * (or set the value to delta if missing)
* @param divide if the value is a {@link Number} and this is true, then the value will be evenly
* divided by the number of replicas.
*/
- public void simSetShardValue(String collection, String shard, String key, Object value, boolean divide) throws Exception {
+ public void simSetShardValue(String collection, String shard, String key, Object value, boolean delta, boolean divide) throws Exception {
List<ReplicaInfo> infos = new ArrayList<>();
nodeReplicaMap.forEach((n, replicas) -> {
replicas.forEach(r -> {
@@ -1140,14 +1300,38 @@ public class SimClusterStateProvider implements ClusterStateProvider {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection " + collection + " doesn't exist.");
}
if (divide && value != null && (value instanceof Number)) {
- value = ((Number)value).doubleValue() / infos.size();
+ if ((value instanceof Long) || (value instanceof Integer)) {
+ value = ((Number) value).longValue() / infos.size();
+ } else {
+ value = ((Number) value).doubleValue() / infos.size();
+ }
}
for (ReplicaInfo r : infos) {
synchronized (r) {
if (value == null) {
r.getVariables().remove(key);
} else {
- r.getVariables().put(key, value);
+ if (delta) {
+ Object prevValue = r.getVariables().get(key);
+ if (prevValue != null) {
+ if ((prevValue instanceof Number) && (value instanceof Number)) {
+ if (((prevValue instanceof Long) || (prevValue instanceof Integer)) &&
+ ((value instanceof Long) || (value instanceof Integer))) {
+ Long newValue = ((Number)prevValue).longValue() + ((Number)value).longValue();
+ r.getVariables().put(key, newValue);
+ } else {
+ Double newValue = ((Number)prevValue).doubleValue() + ((Number)value).doubleValue();
+ r.getVariables().put(key, newValue);
+ }
+ } else {
+ throw new UnsupportedOperationException("delta cannot be applied to non-numeric values: " + prevValue + " and " + value);
+ }
+ } else {
+ r.getVariables().put(key, value);
+ }
+ } else {
+ r.getVariables().put(key, value);
+ }
}
}
}
@@ -1172,9 +1356,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* List collections.
* @return list of existing collections.
*/
- public List<String> simListCollections() {
+ public List<String> simListCollections() throws InterruptedException {
final Set<String> collections = new HashSet<>();
- lock.lock();
+ lock.lockInterruptibly();
try {
nodeReplicaMap.forEach((n, replicas) -> {
replicas.forEach(ri -> collections.add(ri.getCollection()));
@@ -1216,6 +1400,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
return state;
}
+ // this method uses a simple cache in collectionsStatesRef. Operations that modify
+ // cluster state should always reset this cache so that the changes become visible
private Map<String, DocCollection> getCollectionStates() {
Map<String, DocCollection> collectionStates = collectionsStatesRef.get();
if (collectionStates != null) {
@@ -1263,7 +1449,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
slices.put(s, slice);
});
Map<String, Object> collProps = collProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>());
- DocCollection dc = new DocCollection(coll, slices, collProps, DocRouter.DEFAULT, clusterStateVersion, ZkStateReader.CLUSTER_STATE);
+ Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
+ DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
+ DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion, ZkStateReader.CLUSTER_STATE);
res.put(coll, dc);
});
collectionsStatesRef.set(res);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
index 6d1f68a..b9169eb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
/**
* Simulated {@link NodeStateProvider}.
* Note: in order to setup node-level metrics use {@link #simSetNodeValues(String, Map)}. However, in order
- * to setup core-level metrics use {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean)}.
+ * to setup core-level metrics use {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean, boolean)}.
*/
public class SimNodeStateProvider implements NodeStateProvider {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -204,7 +204,7 @@ public class SimNodeStateProvider implements NodeStateProvider {
/**
* Simulate getting replica metrics values. This uses per-replica properties set in
- * {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean)} and
+ * {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean, boolean)} and
* similar methods.
* @param node node id
* @param tags metrics names
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java
index f18234a..757e297 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java
@@ -22,15 +22,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.solr.SolrTestCaseJ4;
-import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
@@ -79,59 +73,7 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
public void tearDown() throws Exception {
super.tearDown();
if (cluster != null) {
- log.info("\n");
- log.info("#############################################");
- log.info("############ FINAL CLUSTER STATS ############");
- log.info("#############################################\n");
- log.info("## Live nodes:\t\t" + cluster.getLiveNodesSet().size());
- int emptyNodes = 0;
- int maxReplicas = 0;
- int minReplicas = Integer.MAX_VALUE;
- Map<String, Map<Replica.State, AtomicInteger>> replicaStates = new TreeMap<>();
- int numReplicas = 0;
- for (String node : cluster.getLiveNodesSet().get()) {
- List<ReplicaInfo> replicas = cluster.getSimClusterStateProvider().simGetReplicaInfos(node);
- numReplicas += replicas.size();
- if (replicas.size() > maxReplicas) {
- maxReplicas = replicas.size();
- }
- if (minReplicas > replicas.size()) {
- minReplicas = replicas.size();
- }
- for (ReplicaInfo ri : replicas) {
- replicaStates.computeIfAbsent(ri.getCollection(), c -> new TreeMap<>())
- .computeIfAbsent(ri.getState(), s -> new AtomicInteger())
- .incrementAndGet();
- }
- if (replicas.isEmpty()) {
- emptyNodes++;
- }
- }
- if (minReplicas == Integer.MAX_VALUE) {
- minReplicas = 0;
- }
- log.info("## Empty nodes:\t" + emptyNodes);
- Set<String> deadNodes = cluster.getSimNodeStateProvider().simGetDeadNodes();
- log.info("## Dead nodes:\t\t" + deadNodes.size());
- deadNodes.forEach(n -> log.info("##\t\t" + n));
- log.info("## Collections:\t" + cluster.getSimClusterStateProvider().simListCollections());
- log.info("## Max replicas per node:\t" + maxReplicas);
- log.info("## Min replicas per node:\t" + minReplicas);
- log.info("## Total replicas:\t\t" + numReplicas);
- replicaStates.forEach((c, map) -> {
- AtomicInteger repCnt = new AtomicInteger();
- map.forEach((s, cnt) -> repCnt.addAndGet(cnt.get()));
- log.info("## * " + c + "\t\t" + repCnt.get());
- map.forEach((s, cnt) -> log.info("##\t\t- " + String.format(Locale.ROOT, "%-12s %4d", s, cnt.get())));
- });
- log.info("######### Final Solr op counts ##########");
- cluster.simGetOpCounts().forEach((k, cnt) -> log.info("##\t\t- " + String.format(Locale.ROOT, "%-14s %4d", k, cnt.get())));
- log.info("######### Autoscaling event counts ###########");
- Map<String, Map<String, AtomicInteger>> counts = cluster.simGetEventCounts();
- counts.forEach((trigger, map) -> {
- log.info("## * Trigger: " + trigger);
- map.forEach((s, cnt) -> log.info("##\t\t- " + String.format(Locale.ROOT, "%-11s %4d", s, cnt.get())));
- });
+ log.info(cluster.dumpClusterState(false));
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
index 14ac40f..129f18c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
@@ -540,7 +540,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
String metricName = "QUERY./select.requestTimes:1minRate";
// simulate search traffic
- cluster.getSimClusterStateProvider().simSetShardValue(collectionName, "shard1", metricName, 40, true);
+ cluster.getSimClusterStateProvider().simSetShardValue(collectionName, "shard1", metricName, 40, false, true);
// now define the trigger. doing it earlier may cause partial events to be generated (where only some
// nodes / replicas exceeded the threshold).
@@ -592,7 +592,19 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
ops.forEach(op -> {
assertEquals(CollectionParams.CollectionAction.ADDREPLICA, op.getAction());
assertEquals(1, op.getHints().size());
- Pair<String, String> hint = (Pair<String, String>)op.getHints().get(Suggester.Hint.COLL_SHARD);
+ Object o = op.getHints().get(Suggester.Hint.COLL_SHARD);
+ // this may be a pair or a HashSet of pairs with size 1
+ Pair<String, String> hint = null;
+ if (o instanceof Pair) {
+ hint = (Pair<String, String>)o;
+ } else if (o instanceof Set) {
+ assertEquals("unexpected number of hints: " + o, 1, ((Set)o).size());
+ o = ((Set)o).iterator().next();
+ assertTrue("unexpected hint: " + o, o instanceof Pair);
+ hint = (Pair<String, String>)o;
+ } else {
+ fail("unexpected hints: " + o);
+ }
assertNotNull(hint);
assertEquals(collectionName, hint.first());
assertEquals("shard1", hint.second());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
index 31e3636..c898dbc 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
@@ -1192,7 +1192,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
// solrClient.query(COLL1, query);
// }
- cluster.getSimClusterStateProvider().simSetCollectionValue(COLL1, "QUERY./select.requestTimes:1minRate", 500, true);
+ cluster.getSimClusterStateProvider().simSetCollectionValue(COLL1, "QUERY./select.requestTimes:1minRate", 500, false, true);
boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("The trigger did not fire at all", await);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc
----------------------------------------------------------------------
diff --git a/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc b/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc
index 3165669..432a5e9 100644
--- a/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc
+++ b/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc
@@ -34,6 +34,8 @@ Currently the following event types (and corresponding trigger implementations)
* `nodeAdded`: generated when a new node joins the cluster
* `nodeLost`: generated when a node leaves the cluster
* `metric`: generated when the configured metric crosses a configured lower or upper threshold value
+* `indexSize`: generated when a shard size (defined as index size in bytes or number of documents)
+crosses the configured upper or lower threshold values
* `searchRate`: generated when the 1-minute average search rate exceeds configured upper threshold
* `scheduled`: generated according to a scheduled time period such as every 24 hours etc
@@ -105,6 +107,81 @@ This trigger supports the following configuration:
}
----
+== Index Size Trigger
+This trigger can be used for monitoring the size of collection shards, measured either by the
+number of documents in a shard or the physical size of the shard's index in bytes.
+
+When either of the upper thresholds is exceeded the trigger will generate an event with
+a (configurable) requested operation to perform on the offending shards - by default
+this is a SPLITSHARD operation.
+
+Similarly, when a shard size falls below either of the lower thresholds the trigger will generate an
+event with a (configurable) requested operation to perform on two of the smallest
+shards - by default this is a MERGESHARDS operation (which is currently ignored because
+it's not yet implemented - see SOLR-9407).
+
+Additionally, monitoring can be restricted to a list of collections - by default
+all collections are monitored.
+
+This trigger supports the following configuration parameters (all thresholds are exclusive):
+
+`aboveBytes`:: upper threshold in bytes. This value is compared to the `INDEX.sizeInBytes` metric.
+
+`belowBytes`:: lower threshold in bytes. Note that this value should be at least 2x smaller than
+`aboveBytes`
+
+`aboveDocs`:: upper threshold expressed as the number of documents. This value is compared with `SEARCHER.searcher.numDocs` metric.
+Note: due to the way Lucene indexes work a shard may exceed the `aboveBytes` threshold
+even if the number of documents is relatively small, because replaced and deleted documents keep
+occupying disk space until they are actually removed during Lucene index merging.
+
+`belowDocs`:: lower threshold expressed as the number of documents.
+
+`aboveOp`:: operation to request when an upper threshold is exceeded. If not specified the
+default value is `SPLITSHARD`.
+
+`belowOp`:: operation to request when a lower threshold is exceeded. If not specified
+the default value is `MERGESHARDS` (but see the note above).
+
+`collections`:: comma-separated list of collection names that this trigger should monitor. If not
+specified or empty all collections are monitored.
+
+Events generated by this trigger contain additional details about the shards
+that exceeded thresholds and the types of violations (upper / lower bounds, bytes / docs metrics).
+
+.Example:
+This configuration specifies an index size trigger that monitors collections "test1" and "test2",
+with both bytes (1GB) and number of docs (1 mln) upper limits, and a custom `belowOp`
+operation `NONE` (which still can be monitored and acted upon by an appropriate trigger listener):
+
+[source,json]
+----
+{
+ "set-trigger": {
+ "name" : "index_size_trigger",
+ "event" : "indexSize",
+ "collections" : "test1,test2",
+ "aboveBytes" : 1000000000,
+ "aboveDocs" : 1000000,
+ "belowBytes" : 200000,
+ "belowDocs" : 200000,
+ "belowOp" : "NONE",
+ "waitFor" : "1m",
+ "enabled" : true,
+ "actions" : [
+ {
+ "name" : "compute_plan",
+ "class": "solr.ComputePlanAction"
+ },
+ {
+ "name" : "execute_plan",
+ "class": "solr.ExecutePlanAction"
+ }
+ ]
+ }
+}
+----
+
== Search Rate Trigger
The search rate trigger can be used for monitoring 1-minute average search rates in a selected
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index bee69c8..9496b0f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -466,7 +466,10 @@ public class Policy implements MapWriter {
static {
ops.put(CollectionAction.ADDREPLICA, () -> new AddReplicaSuggester());
+ ops.put(CollectionAction.DELETEREPLICA, () -> new UnsupportedSuggester(CollectionAction.DELETEREPLICA));
ops.put(CollectionAction.MOVEREPLICA, () -> new MoveReplicaSuggester());
+ ops.put(CollectionAction.SPLITSHARD, () -> new SplitShardSuggester());
+ ops.put(CollectionAction.MERGESHARDS, () -> new UnsupportedSuggester(CollectionAction.MERGESHARDS));
}
public Map<String, List<Clause>> getPolicies() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
index 8c1fba3..e1d8281 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
@@ -66,6 +66,10 @@ public class ReplicaInfo implements MapWriter {
this.node = node;
}
+ public Object clone() {
+ return new ReplicaInfo(name, core, collection, shard, type, node, variables);
+ }
+
@Override
public void writeMap(EntryWriter ew) throws IOException {
ew.put(name, (MapWriter) ew1 -> {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java
new file mode 100644
index 0000000..2a42d27
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.cloud.autoscaling;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.util.Pair;
+
+/**
+ * This suggester produces a SPLITSHARD request using provided {@link org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint#COLL_SHARD} value.
+ */
+class SplitShardSuggester extends Suggester {
+
+ @Override
+ SolrRequest init() {
+ Set<Pair<String, String>> shards = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
+ if (shards.isEmpty()) {
+ throw new RuntimeException("split-shard requires 'collection' and 'shard'");
+ }
+ if (shards.size() > 1) {
+ throw new RuntimeException("split-shard requires exactly one pair of 'collection' and 'shard'");
+ }
+ Pair<String, String> collShard = shards.iterator().next();
+ return CollectionAdminRequest.splitShard(collShard.first()).setShardName(collShard.second());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventType.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventType.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventType.java
index 96bc773..a983bf0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventType.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventType.java
@@ -28,5 +28,6 @@ public enum TriggerEventType {
SEARCHRATE,
INDEXRATE,
INVALID,
- METRIC
+ METRIC,
+ INDEXSIZE
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/UnsupportedSuggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/UnsupportedSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/UnsupportedSuggester.java
new file mode 100644
index 0000000..9d44ae4
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/UnsupportedSuggester.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.common.params.CollectionParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This suggester simply logs the request but does not produce any suggestions.
+ */
+public class UnsupportedSuggester extends Suggester {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final CollectionParams.CollectionAction action;
+
+ public static UnsupportedSuggester get(Policy.Session session, CollectionParams.CollectionAction action) {
+ UnsupportedSuggester suggester = new UnsupportedSuggester(action);
+ suggester._init(session);
+ return suggester;
+ }
+
+ public UnsupportedSuggester(CollectionParams.CollectionAction action) {
+ this.action = action;
+ }
+
+ @Override
+ public CollectionParams.CollectionAction getAction() {
+ return action;
+ }
+
+ @Override
+ SolrRequest init() {
+ log.warn("Unsupported suggester for action " + action + " with hints " + hints + " - no suggestion available");
+ return null;
+ }
+
+ @Override
+ public SolrRequest getSuggestion() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index f473ee4..6fb348f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -119,7 +119,9 @@ public interface CollectionParams {
REPLACENODE(true, LockLevel.NONE),
DELETENODE(true, LockLevel.NONE),
MOCK_REPLICA_TASK(false, LockLevel.REPLICA),
- NONE(false, LockLevel.NONE)
+ NONE(false, LockLevel.NONE),
+ // TODO: not implemented yet
+ MERGESHARDS(true, LockLevel.SHARD)
;
public final boolean isWrite;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index e0f3d78..a4c3b6d 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -281,7 +281,7 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
/**
* Return a {@link CollectionStatePredicate} that returns true if a collection has the expected
- * number of shards and replicas
+ * number of active shards and active replicas
*/
public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) {
return (liveNodes, collectionState) -> {
[2/2] lucene-solr:master: SOLR-12181: Add trigger based on document
count / index size.
Posted by ab...@apache.org.
SOLR-12181: Add trigger based on document count / index size.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/376f6c49
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/376f6c49
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/376f6c49
Branch: refs/heads/master
Commit: 376f6c494671ed22034bf56e6005e50b06942f28
Parents: e99a197
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Wed Apr 11 12:35:31 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Wed Apr 11 15:38:54 2018 +0200
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../autoscaling/AutoAddReplicasPlanAction.java | 4 +-
.../solr/cloud/autoscaling/AutoScaling.java | 3 +
.../cloud/autoscaling/ComputePlanAction.java | 21 +-
.../cloud/autoscaling/IndexSizeTrigger.java | 408 ++++++++++++
.../solr/cloud/autoscaling/MetricTrigger.java | 6 +-
.../cloud/autoscaling/SearchRateTrigger.java | 5 +-
.../solr/cloud/autoscaling/TriggerEvent.java | 16 +-
.../org/apache/solr/cloud/CloudTestUtils.java | 20 +-
.../cloud/autoscaling/IndexSizeTriggerTest.java | 647 +++++++++++++++++++
.../cloud/autoscaling/NodeAddedTriggerTest.java | 2 +-
.../ScheduledMaintenanceTriggerTest.java | 2 +-
.../cloud/autoscaling/sim/SimCloudManager.java | 103 ++-
.../sim/SimClusterStateProvider.java | 248 ++++++-
.../autoscaling/sim/SimNodeStateProvider.java | 4 +-
.../autoscaling/sim/SimSolrCloudTestCase.java | 60 +-
.../cloud/autoscaling/sim/TestLargeCluster.java | 16 +-
.../autoscaling/sim/TestTriggerIntegration.java | 2 +-
.../src/solrcloud-autoscaling-triggers.adoc | 77 +++
.../client/solrj/cloud/autoscaling/Policy.java | 3 +
.../solrj/cloud/autoscaling/ReplicaInfo.java | 4 +
.../cloud/autoscaling/SplitShardSuggester.java | 43 ++
.../cloud/autoscaling/TriggerEventType.java | 3 +-
.../cloud/autoscaling/UnsupportedSuggester.java | 59 ++
.../solr/common/params/CollectionParams.java | 4 +-
.../apache/solr/cloud/SolrCloudTestCase.java | 2 +-
26 files changed, 1633 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 9cefce2..84cac13 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -88,6 +88,8 @@ New Features
* SOLR-12151: Add abstract MultiSolrCloudTestCase class. (Christine Poerschke)
+* SOLR-12181: Add index size autoscaling trigger, based on document count or size in bytes. (ab)
+
Bug Fixes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java
index febd6bd..4189aa4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java
@@ -33,7 +33,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
public class AutoAddReplicasPlanAction extends ComputePlanAction {
@Override
- protected Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) {
+ protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) {
// for backward compatibility
ClusterStateProvider stateProvider = cloudManager.getClusterStateProvider();
String autoAddReplicas = stateProvider.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
@@ -41,7 +41,7 @@ public class AutoAddReplicasPlanAction extends ComputePlanAction {
return NoneSuggester.get(session);
}
- Suggester suggester = super.getSuggester(session, event, cloudManager);
+ Suggester suggester = super.getSuggester(session, event, context, cloudManager);
ClusterState clusterState;
try {
clusterState = stateProvider.getClusterState();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index 68282a7..93f449a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -180,6 +180,9 @@ public class AutoScaling {
case SCHEDULED:
t = new ScheduledTrigger(name);
break;
+ case INDEXSIZE:
+ t = new IndexSizeTrigger(name);
+ break;
default:
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index 8f3175c..4a9c744 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -34,6 +34,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
+import org.apache.solr.client.solrj.cloud.autoscaling.UnsupportedSuggester;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.params.AutoScalingParams;
@@ -88,7 +89,7 @@ public class ComputePlanAction extends TriggerActionBase {
log.trace("-- state: {}", clusterState);
}
try {
- Suggester intialSuggester = getSuggester(session, event, cloudManager);
+ Suggester intialSuggester = getSuggester(session, event, context, cloudManager);
Suggester suggester = intialSuggester;
int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState);
int requestedOperations = getRequestedNumOps(event);
@@ -112,7 +113,7 @@ public class ComputePlanAction extends TriggerActionBase {
if (suggester.getSession() != null) {
session = suggester.getSession();
}
- suggester = getSuggester(session, event, cloudManager);
+ suggester = getSuggester(session, event, context, cloudManager);
// break on first null op
// unless a specific number of ops was requested
@@ -190,7 +191,7 @@ public class ComputePlanAction extends TriggerActionBase {
private static final String START = "__start__";
- protected Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) {
+ protected Suggester getSuggester(Policy.Session session, TriggerEvent event, ActionContext context, SolrCloudManager cloudManager) {
Suggester suggester;
switch (event.getEventType()) {
case NODEADDED:
@@ -203,6 +204,7 @@ public class ComputePlanAction extends TriggerActionBase {
break;
case SEARCHRATE:
case METRIC:
+ case INDEXSIZE:
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList());
int start = (Integer)event.getProperty(START, 0);
if (ops.isEmpty() || start >= ops.size()) {
@@ -210,14 +212,15 @@ public class ComputePlanAction extends TriggerActionBase {
}
TriggerEvent.Op op = ops.get(start);
suggester = session.getSuggester(op.getAction());
+ if (suggester instanceof UnsupportedSuggester) {
+ List<TriggerEvent.Op> unsupportedOps = (List<TriggerEvent.Op>)context.getProperties().computeIfAbsent("unsupportedOps", k -> new ArrayList<TriggerEvent.Op>());
+ unsupportedOps.add(op);
+ }
for (Map.Entry<Suggester.Hint, Object> e : op.getHints().entrySet()) {
suggester = suggester.hint(e.getKey(), e.getValue());
}
- if (++start >= ops.size()) {
- event.getProperties().remove(START);
- } else {
- event.getProperties().put(START, start);
- }
+ start++;
+ event.getProperties().put(START, start);
break;
case SCHEDULED:
String preferredOp = (String) event.getProperty(AutoScalingParams.PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
@@ -225,7 +228,7 @@ public class ComputePlanAction extends TriggerActionBase {
suggester = session.getSuggester(action);
break;
default:
- throw new UnsupportedOperationException("No support for events other than nodeAdded, nodeLost, searchRate and metric. Received: " + event.getEventType());
+ throw new UnsupportedOperationException("No support for events other than nodeAdded, nodeLost, searchRate, metric and indexSize. Received: " + event.getEventType());
}
return suggester;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
new file mode 100644
index 0000000..756f88f
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.metrics.SolrCoreMetricManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class IndexSizeTrigger extends TriggerBase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final String ABOVE_BYTES_PROP = "aboveBytes";
+ public static final String ABOVE_DOCS_PROP = "aboveDocs";
+ public static final String ABOVE_OP_PROP = "aboveOp";
+ public static final String BELOW_BYTES_PROP = "belowBytes";
+ public static final String BELOW_DOCS_PROP = "belowDocs";
+ public static final String BELOW_OP_PROP = "belowOp";
+ public static final String COLLECTIONS_PROP = "collections";
+
+ public static final String BYTES_SIZE_PROP = "__bytes__";
+ public static final String DOCS_SIZE_PROP = "__docs__";
+ public static final String ABOVE_SIZE_PROP = "aboveSize";
+ public static final String BELOW_SIZE_PROP = "belowSize";
+ public static final String VIOLATION_PROP = "violationType";
+
+ public enum Unit { bytes, docs }
+
+ private long aboveBytes, aboveDocs, belowBytes, belowDocs;
+ private CollectionParams.CollectionAction aboveOp, belowOp;
+ private final Set<String> collections = new HashSet<>();
+ private final Map<String, Long> lastEventMap = new ConcurrentHashMap<>();
+
+ public IndexSizeTrigger(String name) {
+ super(TriggerEventType.INDEXSIZE, name);
+ TriggerUtils.validProperties(validProperties,
+ ABOVE_BYTES_PROP, ABOVE_DOCS_PROP, BELOW_BYTES_PROP, BELOW_DOCS_PROP, COLLECTIONS_PROP);
+ }
+
+ @Override
+ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
+ super.configure(loader, cloudManager, properties);
+ String aboveStr = String.valueOf(properties.getOrDefault(ABOVE_BYTES_PROP, Long.MAX_VALUE));
+ String belowStr = String.valueOf(properties.getOrDefault(BELOW_BYTES_PROP, -1));
+ try {
+ aboveBytes = Long.parseLong(aboveStr);
+ if (aboveBytes <= 0) {
+ throw new Exception("value must be > 0");
+ }
+ } catch (Exception e) {
+ throw new TriggerValidationException(getName(), ABOVE_BYTES_PROP, "invalid value '" + aboveStr + "': " + e.toString());
+ }
+ try {
+ belowBytes = Long.parseLong(belowStr);
+ if (belowBytes < 0) {
+ belowBytes = -1;
+ }
+ } catch (Exception e) {
+ throw new TriggerValidationException(getName(), BELOW_BYTES_PROP, "invalid value '" + belowStr + "': " + e.toString());
+ }
+ // below must be at least 2x smaller than above, otherwise splitting a shard
+ // would immediately put the shard below the threshold and cause the mergeshards action
+ if (belowBytes > 0 && (belowBytes * 2 > aboveBytes)) {
+ throw new TriggerValidationException(getName(), BELOW_BYTES_PROP,
+ "invalid value " + belowBytes + ", should be less than half of '" + ABOVE_BYTES_PROP + "' value, which is " + aboveBytes);
+ }
+ // do the same for docs bounds
+ aboveStr = String.valueOf(properties.getOrDefault(ABOVE_DOCS_PROP, Long.MAX_VALUE));
+ belowStr = String.valueOf(properties.getOrDefault(BELOW_DOCS_PROP, -1));
+ try {
+ aboveDocs = Long.parseLong(aboveStr);
+ if (aboveDocs <= 0) {
+ throw new Exception("value must be > 0");
+ }
+ } catch (Exception e) {
+ throw new TriggerValidationException(getName(), ABOVE_DOCS_PROP, "invalid value '" + aboveStr + "': " + e.toString());
+ }
+ try {
+ belowDocs = Long.parseLong(belowStr);
+ if (belowDocs < 0) {
+ belowDocs = -1;
+ }
+ } catch (Exception e) {
+ throw new TriggerValidationException(getName(), BELOW_DOCS_PROP, "invalid value '" + belowStr + "': " + e.toString());
+ }
+ // below must be at least 2x smaller than above, otherwise splitting a shard
+ // would immediately put the shard below the threshold and cause the mergeshards action
+ if (belowDocs > 0 && (belowDocs * 2 > aboveDocs)) {
+ throw new TriggerValidationException(getName(), BELOW_DOCS_PROP,
+ "invalid value " + belowDocs + ", should be less than half of '" + ABOVE_DOCS_PROP + "' value, which is " + aboveDocs);
+ }
+
+ String collectionsString = (String) properties.get(COLLECTIONS_PROP);
+ if (collectionsString != null && !collectionsString.isEmpty()) {
+ collections.addAll(StrUtils.splitSmart(collectionsString, ','));
+ }
+ String aboveOpStr = String.valueOf(properties.getOrDefault(ABOVE_OP_PROP, CollectionParams.CollectionAction.SPLITSHARD.toLower()));
+ // TODO: this is a placeholder until SOLR-9407 is implemented
+ String belowOpStr = String.valueOf(properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.MERGESHARDS.toLower()));
+ aboveOp = CollectionParams.CollectionAction.get(aboveOpStr);
+ if (aboveOp == null) {
+ throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value of " + ABOVE_OP_PROP + ": '" + aboveOpStr + "'");
+ }
+ belowOp = CollectionParams.CollectionAction.get(belowOpStr);
+ if (belowOp == null) {
+ throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value of " + BELOW_OP_PROP + ": '" + belowOpStr + "'");
+ }
+ }
+
+ @Override
+ protected Map<String, Object> getState() {
+ Map<String, Object> state = new HashMap<>();
+ state.put("lastEventMap", lastEventMap);
+ return state;
+ }
+
+ @Override
+ protected void setState(Map<String, Object> state) {
+ this.lastEventMap.clear();
+ Map<String, Long> replicaVsTime = (Map<String, Long>)state.get("lastEventMap");
+ if (replicaVsTime != null) {
+ this.lastEventMap.putAll(replicaVsTime);
+ }
+ }
+
+ @Override
+ public void restoreState(AutoScaling.Trigger old) {
+ assert old.isClosed();
+ if (old instanceof IndexSizeTrigger) {
+ } else {
+ throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
+ "Unable to restore state from an unknown type of trigger");
+ }
+ }
+
+ @Override
+ public void run() {
+ synchronized(this) {
+ if (isClosed) {
+ log.warn(getName() + " ran but was already closed");
+ return;
+ }
+ }
+ AutoScaling.TriggerEventProcessor processor = processorRef.get();
+ if (processor == null) {
+ return;
+ }
+
+ // replica name / info + size, retrieved from leaders only
+ Map<String, ReplicaInfo> currentSizes = new HashMap<>();
+
+ try {
+ ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
+ for (String node : clusterState.getLiveNodes()) {
+ Map<String, ReplicaInfo> metricTags = new HashMap<>();
+ // coll, shard, replica
+ Map<String, Map<String, List<ReplicaInfo>>> infos = cloudManager.getNodeStateProvider().getReplicaInfo(node, Collections.emptyList());
+ infos.forEach((coll, shards) -> {
+ if (!collections.isEmpty() && !collections.contains(coll)) {
+ return;
+ }
+ DocCollection docCollection = clusterState.getCollection(coll);
+
+ shards.forEach((sh, replicas) -> {
+ // check only the leader of a replica in active shard
+ Slice s = docCollection.getSlice(sh);
+ if (s.getState() != Slice.State.ACTIVE) {
+ return;
+ }
+ Replica r = s.getLeader();
+ // no leader - don't do anything
+ if (r == null) {
+ return;
+ }
+ // find ReplicaInfo
+ ReplicaInfo info = null;
+ for (ReplicaInfo ri : replicas) {
+ if (r.getCoreName().equals(ri.getCore())) {
+ info = ri;
+ break;
+ }
+ }
+ if (info == null) {
+ // probably replica is not on this node?
+ return;
+ }
+ // we have to translate to the metrics registry name, which uses "_replica_nN" as suffix
+ String replicaName = Utils.parseMetricsReplicaName(coll, info.getCore());
+ if (replicaName == null) { // should never happen???
+ replicaName = info.getName(); // which is actually coreNode name...
+ }
+ String registry = SolrCoreMetricManager.createRegistryName(true, coll, sh, replicaName, null);
+ String tag = "metrics:" + registry + ":INDEX.sizeInBytes";
+ metricTags.put(tag, info);
+ tag = "metrics:" + registry + ":SEARCHER.searcher.numDocs";
+ metricTags.put(tag, info);
+ });
+ });
+ if (metricTags.isEmpty()) {
+ continue;
+ }
+ Map<String, Object> sizes = cloudManager.getNodeStateProvider().getNodeValues(node, metricTags.keySet());
+ sizes.forEach((tag, size) -> {
+ final ReplicaInfo info = metricTags.get(tag);
+ if (info == null) {
+ log.warn("Missing replica info for response tag " + tag);
+ } else {
+ // verify that it's a Number
+ if (!(size instanceof Number)) {
+ log.warn("invalid size value - not a number: '" + size + "' is " + size.getClass().getName());
+ return;
+ }
+
+ ReplicaInfo currentInfo = currentSizes.computeIfAbsent(info.getCore(), k -> (ReplicaInfo)info.clone());
+ if (tag.contains("INDEX")) {
+ currentInfo.getVariables().put(BYTES_SIZE_PROP, ((Number) size).longValue());
+ } else {
+ currentInfo.getVariables().put(DOCS_SIZE_PROP, ((Number) size).longValue());
+ }
+ }
+ });
+ }
+ } catch (IOException e) {
+ log.warn("Error running trigger " + getName(), e);
+ return;
+ }
+
+ long now = cloudManager.getTimeSource().getTimeNs();
+
+ // now check thresholds
+
+ // collection / list(info)
+ Map<String, List<ReplicaInfo>> aboveSize = new HashMap<>();
+ currentSizes.entrySet().stream()
+ .filter(e -> (
+ (Long)e.getValue().getVariable(BYTES_SIZE_PROP) > aboveBytes ||
+ (Long)e.getValue().getVariable(DOCS_SIZE_PROP) > aboveDocs
+ ) && waitForElapsed(e.getKey(), now, lastEventMap))
+ .forEach(e -> {
+ ReplicaInfo info = e.getValue();
+ List<ReplicaInfo> infos = aboveSize.computeIfAbsent(info.getCollection(), c -> new ArrayList<>());
+ if (!infos.contains(info)) {
+ if ((Long)e.getValue().getVariable(BYTES_SIZE_PROP) > aboveBytes) {
+ info.getVariables().put(VIOLATION_PROP, ABOVE_BYTES_PROP);
+ } else {
+ info.getVariables().put(VIOLATION_PROP, ABOVE_DOCS_PROP);
+ }
+ infos.add(info);
+ }
+ });
+ // collection / list(info)
+ Map<String, List<ReplicaInfo>> belowSize = new HashMap<>();
+ currentSizes.entrySet().stream()
+ .filter(e -> (
+ (Long)e.getValue().getVariable(BYTES_SIZE_PROP) < belowBytes ||
+ (Long)e.getValue().getVariable(DOCS_SIZE_PROP) < belowDocs
+ ) && waitForElapsed(e.getKey(), now, lastEventMap))
+ .forEach(e -> {
+ ReplicaInfo info = e.getValue();
+ List<ReplicaInfo> infos = belowSize.computeIfAbsent(info.getCollection(), c -> new ArrayList<>());
+ if (!infos.contains(info)) {
+ if ((Long)e.getValue().getVariable(BYTES_SIZE_PROP) < belowBytes) {
+ info.getVariables().put(VIOLATION_PROP, BELOW_BYTES_PROP);
+ } else {
+ info.getVariables().put(VIOLATION_PROP, BELOW_DOCS_PROP);
+ }
+ infos.add(info);
+ }
+ });
+
+ if (aboveSize.isEmpty() && belowSize.isEmpty()) {
+ return;
+ }
+
+ // find the earliest time when a condition was exceeded
+ final AtomicLong eventTime = new AtomicLong(now);
+
+ // calculate ops
+ final List<TriggerEvent.Op> ops = new ArrayList<>();
+ aboveSize.forEach((coll, replicas) -> {
+ replicas.forEach(r -> {
+ TriggerEvent.Op op = new TriggerEvent.Op(aboveOp);
+ op.addHint(Suggester.Hint.COLL_SHARD, new Pair<>(coll, r.getShard()));
+ ops.add(op);
+ Long time = lastEventMap.get(r.getCore());
+ if (time != null && eventTime.get() > time) {
+ eventTime.set(time);
+ }
+ });
+ });
+ belowSize.forEach((coll, replicas) -> {
+ if (replicas.size() < 2) {
+ return;
+ }
+ // sort by increasing size
+ replicas.sort((r1, r2) -> {
+ // XXX this is not quite correct - if BYTES_SIZE_PROP decided that replica got here
+ // then we should be sorting by BYTES_SIZE_PROP. However, since DOCS and BYTES are
+ // loosely correlated it's simpler to sort just by docs (which better reflects the "too small"
+ // condition than index size, due to possibly existing deleted docs that still occupy space)
+ long delta = (Long) r1.getVariable(DOCS_SIZE_PROP) - (Long) r2.getVariable(DOCS_SIZE_PROP);
+ if (delta > 0) {
+ return 1;
+ } else if (delta < 0) {
+ return -1;
+ } else {
+ return 0;
+ }
+ });
+
+ // TODO: MERGESHARDS is not implemented yet. For now take the top two smallest shards
+ // TODO: but in the future we probably need to get ones with adjacent ranges.
+
+ // TODO: generate as many MERGESHARDS as needed to consume all belowSize shards
+ TriggerEvent.Op op = new TriggerEvent.Op(belowOp);
+ op.addHint(Suggester.Hint.COLL_SHARD, new Pair(coll, replicas.get(0).getShard()));
+ op.addHint(Suggester.Hint.COLL_SHARD, new Pair(coll, replicas.get(1).getShard()));
+ ops.add(op);
+ Long time = lastEventMap.get(replicas.get(0).getCore());
+ if (time != null && eventTime.get() > time) {
+ eventTime.set(time);
+ }
+ time = lastEventMap.get(replicas.get(1).getCore());
+ if (time != null && eventTime.get() > time) {
+ eventTime.set(time);
+ }
+ });
+
+ if (ops.isEmpty()) {
+ return;
+ }
+ if (processor.process(new IndexSizeEvent(getName(), eventTime.get(), ops, aboveSize, belowSize))) {
+ // update last event times
+ aboveSize.forEach((coll, replicas) -> {
+ replicas.forEach(r -> lastEventMap.put(r.getCore(), now));
+ });
+ belowSize.forEach((coll, replicas) -> {
+ lastEventMap.put(replicas.get(0).getCore(), now);
+ lastEventMap.put(replicas.get(1).getCore(), now);
+ });
+ }
+ }
+
+ private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
+ Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
+ long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
+ log.trace("name={}, lastTime={}, elapsed={}", name, lastTime, elapsed);
+ if (TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS) < getWaitForSecond()) {
+ return false;
+ }
+ return true;
+ }
+
+ public static class IndexSizeEvent extends TriggerEvent {
+ public IndexSizeEvent(String source, long eventTime, List<Op> ops, Map<String, List<ReplicaInfo>> aboveSize,
+ Map<String, List<ReplicaInfo>> belowSize) {
+ super(TriggerEventType.INDEXSIZE, source, eventTime, null);
+ properties.put(TriggerEvent.REQUESTED_OPS, ops);
+ properties.put(ABOVE_SIZE_PROP, aboveSize);
+ properties.put(BELOW_SIZE_PROP, belowSize);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/java/org/apache/solr/cloud/autoscaling/MetricTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/MetricTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/MetricTrigger.java
index 9fdf8dc..9058a9a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/MetricTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/MetricTrigger.java
@@ -203,12 +203,12 @@ public class MetricTrigger extends TriggerBase {
List<Op> ops = new ArrayList<>(hotNodes.size());
for (String n : hotNodes.keySet()) {
Op op = new Op(CollectionParams.CollectionAction.get(preferredOp));
- op.setHint(Suggester.Hint.SRC_NODE, n);
+ op.addHint(Suggester.Hint.SRC_NODE, n);
if (!collection.equals(Policy.ANY)) {
if (!shard.equals(Policy.ANY)) {
- op.setHint(Suggester.Hint.COLL_SHARD, new Pair<>(collection, shard));
+ op.addHint(Suggester.Hint.COLL_SHARD, new Pair<>(collection, shard));
} else {
- op.setHint(Suggester.Hint.COLL, collection);
+ op.addHint(Suggester.Hint.COLL, collection);
}
}
ops.add(op);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
index 00bc6d8..02a2d0c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
@@ -181,10 +181,11 @@ public class SearchRateTrigger extends TriggerBase {
} else {
Map<String, List<ReplicaInfo>> perCollection = collectionRates.computeIfAbsent(info.getCollection(), s -> new HashMap<>());
List<ReplicaInfo> perShard = perCollection.computeIfAbsent(info.getShard(), s -> new ArrayList<>());
- info.getVariables().put(AutoScalingParams.RATE, rate);
+ info = (ReplicaInfo)info.clone();
+ info.getVariables().put(AutoScalingParams.RATE, ((Number)rate).doubleValue());
perShard.add(info);
AtomicDouble perNode = nodeRates.computeIfAbsent(node, s -> new AtomicDouble());
- perNode.addAndGet((Double)rate);
+ perNode.addAndGet(((Number)rate).doubleValue());
}
});
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
index e4a4b3d..907309d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
@@ -17,9 +17,13 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
@@ -49,11 +53,17 @@ public class TriggerEvent implements MapWriter {
public Op(CollectionParams.CollectionAction action, Suggester.Hint hint, Object hintValue) {
this.action = action;
- this.hints.put(hint, hintValue);
+ addHint(hint, hintValue);
}
- public void setHint(Suggester.Hint hint, Object value) {
- hints.put(hint, value);
+ public void addHint(Suggester.Hint hint, Object value) {
+ hint.validator.accept(value);
+ if (hint.multiValued) {
+ Collection<?> values = value instanceof Collection ? (Collection) value : Collections.singletonList(value);
+ ((Set) hints.computeIfAbsent(hint, h -> new HashSet<>())).addAll(values);
+ } else {
+ hints.put(hint, value == null ? null : String.valueOf(value));
+ }
}
public CollectionParams.CollectionAction getAction() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java b/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
index 04c90b1..5590252 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.Collection;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -113,19 +114,30 @@ public class CloudTestUtils {
* number of shards and replicas
*/
public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) {
+ return clusterShape(expectedShards, expectedReplicas, false);
+ }
+
+ public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas, boolean withInactive) {
return (liveNodes, collectionState) -> {
- if (collectionState == null)
+ if (collectionState == null) {
+ log.debug("-- null collection");
return false;
- if (collectionState.getSlices().size() != expectedShards)
+ }
+ Collection<Slice> slices = withInactive ? collectionState.getSlices() : collectionState.getActiveSlices();
+ if (slices.size() != expectedShards) {
+ log.debug("-- wrong number of active slices, expected=" + expectedShards + ", found=" + collectionState.getSlices().size());
return false;
- for (Slice slice : collectionState) {
+ }
+ for (Slice slice : slices) {
int activeReplicas = 0;
for (Replica replica : slice) {
if (replica.isActive(liveNodes))
activeReplicas++;
}
- if (activeReplicas != expectedReplicas)
+ if (activeReplicas != expectedReplicas) {
+ log.debug("-- wrong number of active replicas in slice " + slice.getName() + ", expected=" + expectedReplicas + ", found=" + activeReplicas);
return false;
+ }
}
return true;
};
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
new file mode 100644
index 0000000..79dd019
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/IndexSizeTriggerTest.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.CloudTestUtils;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cloud.autoscaling.sim.SimCloudManager;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.util.LogLevel;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
+
+/**
+ *
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
+public class IndexSizeTriggerTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static SolrCloudManager cloudManager;
+ private static SolrClient solrClient;
+ private static TimeSource timeSource;
+ private static SolrResourceLoader loader;
+
+ private static int SPEED;
+
+ private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
+ fail("Did not expect the processor to fire on first run! event=" + event);
+ return true;
+ };
+ private static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(2);
+
+ static Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
+ static CountDownLatch listenerCreated = new CountDownLatch(1);
+ static CountDownLatch finished = new CountDownLatch(1);
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(2)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ if (random().nextBoolean()) {
+ cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+ solrClient = cluster.getSolrClient();
+ loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
+ SPEED = 1;
+ } else {
+ SPEED = 50;
+ cloudManager = SimCloudManager.createCluster(2, TimeSource.get("simTime:" + SPEED));
+ // wait for defaults to be applied - due to accelerated time sometimes we may miss this
+ cloudManager.getTimeSource().sleep(10000);
+ AutoScalingConfig cfg = cloudManager.getDistribStateManager().getAutoScalingConfig();
+ assertFalse("autoscaling config is empty", cfg.isEmpty());
+ solrClient = ((SimCloudManager)cloudManager).simGetSolrClient();
+ loader = ((SimCloudManager) cloudManager).getLoader();
+ }
+ timeSource = cloudManager.getTimeSource();
+ }
+
+ @After
+ public void restoreDefaults() throws Exception {
+ if (cloudManager instanceof SimCloudManager) {
+ log.info(((SimCloudManager) cloudManager).dumpClusterState(true));
+ ((SimCloudManager) cloudManager).getSimClusterStateProvider().simDeleteAllCollections();
+ ((SimCloudManager) cloudManager).simResetOpCounts();
+ } else {
+ cluster.deleteAllCollections();
+ }
+ cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), -1);
+ cloudManager.getTimeSource().sleep(5000);
+ listenerEvents.clear();
+ listenerCreated = new CountDownLatch(1);
+ finished = new CountDownLatch(1);
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ if (cloudManager instanceof SimCloudManager) {
+ cloudManager.close();
+ }
+ solrClient = null;
+ cloudManager = null;
+ }
+
+ @Test
+ public void testTrigger() throws Exception {
+ String collectionName = "testTrigger_collection";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+ "conf", 2, 2).setMaxShardsPerNode(2);
+ create.process(solrClient);
+ CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
+ CloudTestUtils.clusterShape(2, 2));
+
+ long waitForSeconds = 3 + random().nextInt(5);
+ Map<String, Object> props = createTriggerProps(waitForSeconds);
+ try (IndexSizeTrigger trigger = new IndexSizeTrigger("index_size_trigger")) {
+ trigger.configure(loader, cloudManager, props);
+ trigger.init();
+ trigger.setProcessor(noFirstRunProcessor);
+ trigger.run();
+
+ for (int i = 0; i < 25; i++) {
+ SolrInputDocument doc = new SolrInputDocument("id", "id-" + i);
+ solrClient.add(collectionName, doc);
+ }
+ solrClient.commit(collectionName);
+
+ AtomicBoolean fired = new AtomicBoolean(false);
+ AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+ trigger.setProcessor(event -> {
+ if (fired.compareAndSet(false, true)) {
+ eventRef.set(event);
+ long currentTimeNanos = timeSource.getTimeNs();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail("processor was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
+ }
+ } else {
+ fail("IndexSizeTrigger was fired more than once!");
+ }
+ return true;
+ });
+ trigger.run();
+ TriggerEvent ev = eventRef.get();
+ // waitFor delay - should not produce any event yet
+ assertNull("waitFor not elapsed but produced an event", ev);
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+ trigger.run();
+ ev = eventRef.get();
+ assertNotNull("should have fired an event", ev);
+ List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) ev.getProperty(TriggerEvent.REQUESTED_OPS);
+ assertNotNull("should contain requestedOps", ops);
+ assertEquals("number of ops", 2, ops.size());
+ boolean shard1 = false;
+ boolean shard2 = false;
+ for (TriggerEvent.Op op : ops) {
+ assertEquals(CollectionParams.CollectionAction.SPLITSHARD, op.getAction());
+ Set<Pair<String, String>> hints = (Set<Pair<String, String>>)op.getHints().get(Suggester.Hint.COLL_SHARD);
+ assertNotNull("hints", hints);
+ assertEquals("hints", 1, hints.size());
+ Pair<String, String> p = hints.iterator().next();
+ assertEquals(collectionName, p.first());
+ if (p.second().equals("shard1")) {
+ shard1 = true;
+ } else if (p.second().equals("shard2")) {
+ shard2 = true;
+ } else {
+ fail("unexpected shard name " + p.second());
+ }
+ }
+ assertTrue("shard1 should be split", shard1);
+ assertTrue("shard2 should be split", shard2);
+ }
+ }
+
+ public static class CapturingTriggerListener extends TriggerListenerBase {
+ @Override
+ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) throws TriggerValidationException {
+ super.configure(loader, cloudManager, config);
+ listenerCreated.countDown();
+ }
+
+ @Override
+ public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+ ActionContext context, Throwable error, String message) {
+ List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
+ CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
+ log.info("=======> " + ev);
+ lst.add(ev);
+ }
+ }
+
+ public static class FinishedProcessingListener extends TriggerListenerBase {
+
+ @Override
+ public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
+ finished.countDown();
+ }
+ }
+
+ @Test
+ public void testSplitIntegration() throws Exception {
+ String collectionName = "testSplitIntegration_collection";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+ "conf", 2, 2).setMaxShardsPerNode(2);
+ create.process(solrClient);
+ CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
+ CloudTestUtils.clusterShape(2, 2));
+
+ long waitForSeconds = 3 + random().nextInt(5);
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'index_size_trigger'," +
+ "'event' : 'indexSize'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'aboveDocs' : 10," +
+ "'belowDocs' : 4," +
+ "'enabled' : true," +
+ "'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
+ "{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ String setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'capturing'," +
+ "'trigger' : 'index_size_trigger'," +
+ "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," +
+ "'beforeAction' : ['compute_plan','execute_plan']," +
+ "'afterAction' : ['compute_plan','execute_plan']," +
+ "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'finished'," +
+ "'trigger' : 'index_size_trigger'," +
+ "'stage' : ['SUCCEEDED']," +
+ "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+
+ for (int i = 0; i < 25; i++) {
+ SolrInputDocument doc = new SolrInputDocument("id", "id-" + i);
+ solrClient.add(collectionName, doc);
+ }
+ solrClient.commit(collectionName);
+
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+ boolean await = finished.await(60000 / SPEED, TimeUnit.MILLISECONDS);
+ assertTrue("did not finish processing in time", await);
+ CloudTestUtils.waitForState(cloudManager, collectionName, 10, TimeUnit.SECONDS, CloudTestUtils.clusterShape(4, 2));
+ assertEquals(1, listenerEvents.size());
+ List<CapturedEvent> events = listenerEvents.get("capturing");
+ assertNotNull("'capturing' events not found", events);
+ assertEquals("events: " + events, 6, events.size());
+ assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(3).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(4).stage);
+ assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(5).stage);
+ // check ops
+ List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) events.get(4).event.getProperty(TriggerEvent.REQUESTED_OPS);
+ assertNotNull("should contain requestedOps", ops);
+ assertEquals("number of ops", 2, ops.size());
+ boolean shard1 = false;
+ boolean shard2 = false;
+ for (TriggerEvent.Op op : ops) {
+ assertEquals(CollectionParams.CollectionAction.SPLITSHARD, op.getAction());
+ Set<Pair<String, String>> hints = (Set<Pair<String, String>>)op.getHints().get(Suggester.Hint.COLL_SHARD);
+ assertNotNull("hints", hints);
+ assertEquals("hints", 1, hints.size());
+ Pair<String, String> p = hints.iterator().next();
+ assertEquals(collectionName, p.first());
+ if (p.second().equals("shard1")) {
+ shard1 = true;
+ } else if (p.second().equals("shard2")) {
+ shard2 = true;
+ } else {
+ fail("unexpected shard name " + p.second());
+ }
+ }
+ assertTrue("shard1 should be split", shard1);
+ assertTrue("shard2 should be split", shard2);
+
+ }
+
+ @Test
+ public void testMergeIntegration() throws Exception {
+ String collectionName = "testMergeIntegration_collection";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+ "conf", 2, 2).setMaxShardsPerNode(2);
+ create.process(solrClient);
+ CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
+ CloudTestUtils.clusterShape(2, 2));
+
+ for (int i = 0; i < 10; i++) {
+ SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100));
+ solrClient.add(collectionName, doc);
+ }
+ solrClient.commit(collectionName);
+
+ long waitForSeconds = 3 + random().nextInt(5);
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'index_size_trigger'," +
+ "'event' : 'indexSize'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'aboveDocs' : 40," +
+ "'belowDocs' : 4," +
+ "'enabled' : true," +
+ "'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
+ "{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ String setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'capturing'," +
+ "'trigger' : 'index_size_trigger'," +
+ "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," +
+ "'beforeAction' : ['compute_plan','execute_plan']," +
+ "'afterAction' : ['compute_plan','execute_plan']," +
+ "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'finished'," +
+ "'trigger' : 'index_size_trigger'," +
+ "'stage' : ['SUCCEEDED']," +
+ "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // delete some docs to trigger a merge
+ for (int i = 0; i < 5; i++) {
+ solrClient.deleteById(collectionName, "id-" + (i * 100));
+ }
+ solrClient.commit(collectionName);
+
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+ boolean await = finished.await(60000 / SPEED, TimeUnit.MILLISECONDS);
+ assertTrue("did not finish processing in time", await);
+ assertEquals(1, listenerEvents.size());
+ List<CapturedEvent> events = listenerEvents.get("capturing");
+ assertNotNull("'capturing' events not found", events);
+ assertEquals("events: " + events, 6, events.size());
+ assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(3).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(4).stage);
+ assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(5).stage);
+ // check ops
+ List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) events.get(4).event.getProperty(TriggerEvent.REQUESTED_OPS);
+ assertNotNull("should contain requestedOps", ops);
+ assertTrue("number of ops: " + ops, ops.size() > 0);
+ for (TriggerEvent.Op op : ops) {
+ assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction());
+ Set<Pair<String, String>> hints = (Set<Pair<String, String>>)op.getHints().get(Suggester.Hint.COLL_SHARD);
+ assertNotNull("hints", hints);
+ assertEquals("hints", 2, hints.size());
+ Pair<String, String> p = hints.iterator().next();
+ assertEquals(collectionName, p.first());
+ }
+
+ // TODO: fix this once MERGESHARDS is supported
+ List<TriggerEvent.Op> unsupportedOps = (List<TriggerEvent.Op>)events.get(2).context.get("properties.unsupportedOps");
+ assertNotNull("should have unsupportedOps", unsupportedOps);
+ assertEquals(unsupportedOps.toString() + "\n" + ops, ops.size(), unsupportedOps.size());
+ unsupportedOps.forEach(op -> assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction()));
+ }
+
+ @Test
+ public void testMixedBounds() throws Exception {
+ if (cloudManager instanceof SimCloudManager) {
+ log.warn("Requires SOLR-12208");
+ return;
+ }
+
+ String collectionName = "testMixedBounds_collection";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+ "conf", 2, 2).setMaxShardsPerNode(2);
+ create.process(solrClient);
+ CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
+ CloudTestUtils.clusterShape(2, 2));
+
+ for (int j = 0; j < 10; j++) {
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.setParam("collection", collectionName);
+ for (int i = 0; i < 100; i++) {
+ SolrInputDocument doc = new SolrInputDocument("id", "id-" + (i * 100) + "-" + j);
+ doc.addField("foo", TestUtil.randomSimpleString(random(), 130, 130));
+ ureq.add(doc);
+ }
+ solrClient.request(ureq);
+ }
+ solrClient.commit(collectionName);
+
+ long waitForSeconds = 3 + random().nextInt(5);
+
+ // the trigger is initially disabled so that we have time to add listeners
+ // and have them capture all events once the trigger is enabled
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'index_size_trigger'," +
+ "'event' : 'indexSize'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ // don't hit this limit when indexing
+ "'aboveDocs' : 10000," +
+ // hit this limit when deleting
+ "'belowDocs' : 100," +
+ // hit this limit when indexing
+ "'aboveBytes' : 150000," +
+ // don't hit this limit when deleting
+ "'belowBytes' : 10," +
+ "'enabled' : false," +
+ "'actions' : [{'name' : 'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
+ "{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ String setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'capturing'," +
+ "'trigger' : 'index_size_trigger'," +
+ "'stage' : ['STARTED','ABORTED','SUCCEEDED','FAILED']," +
+ "'beforeAction' : ['compute_plan','execute_plan']," +
+ "'afterAction' : ['compute_plan','execute_plan']," +
+ "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'finished'," +
+ "'trigger' : 'index_size_trigger'," +
+ "'stage' : ['SUCCEEDED']," +
+ "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // now enable the trigger
+ String resumeTriggerCommand = "{" +
+ "'resume-trigger' : {" +
+ "'name' : 'index_size_trigger'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+ boolean await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS);
+ assertTrue("did not finish processing in time", await);
+ assertEquals(1, listenerEvents.size());
+ List<CapturedEvent> events = listenerEvents.get("capturing");
+ assertNotNull("'capturing' events not found", events);
+ assertEquals("events: " + events, 6, events.size());
+ assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(3).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(4).stage);
+ assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(5).stage);
+
+ // collection should have 2 inactive and 4 active shards
+ CloudTestUtils.waitForState(cloudManager, "failed to create " + collectionName, collectionName,
+ CloudTestUtils.clusterShape(6, 2, true));
+
+ // check ops
+ List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>) events.get(4).event.getProperty(TriggerEvent.REQUESTED_OPS);
+ assertNotNull("should contain requestedOps", ops);
+ assertEquals("number of ops", 2, ops.size());
+ boolean shard1 = false;
+ boolean shard2 = false;
+ for (TriggerEvent.Op op : ops) {
+ assertEquals(CollectionParams.CollectionAction.SPLITSHARD, op.getAction());
+ Set<Pair<String, String>> hints = (Set<Pair<String, String>>)op.getHints().get(Suggester.Hint.COLL_SHARD);
+ assertNotNull("hints", hints);
+ assertEquals("hints", 1, hints.size());
+ Pair<String, String> p = hints.iterator().next();
+ assertEquals(collectionName, p.first());
+ if (p.second().equals("shard1")) {
+ shard1 = true;
+ } else if (p.second().equals("shard2")) {
+ shard2 = true;
+ } else {
+ fail("unexpected shard name " + p.second());
+ }
+ }
+ assertTrue("shard1 should be split", shard1);
+ assertTrue("shard2 should be split", shard2);
+
+ // now delete most of docs to trigger belowDocs condition
+ listenerEvents.clear();
+ finished = new CountDownLatch(1);
+
+ // suspend the trigger first so that we can safely delete all docs
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {" +
+ "'name' : 'index_size_trigger'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ for (int j = 0; j < 8; j++) {
+ UpdateRequest ureq = new UpdateRequest();
+ ureq.setParam("collection", collectionName);
+ for (int i = 0; i < 95; i++) {
+ ureq.deleteById("id-" + (i * 100) + "-" + j);
+ }
+ solrClient.request(ureq);
+ }
+ solrClient.commit(collectionName);
+
+ // resume trigger
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+ await = finished.await(90000 / SPEED, TimeUnit.MILLISECONDS);
+ assertTrue("did not finish processing in time", await);
+ assertEquals(1, listenerEvents.size());
+ events = listenerEvents.get("capturing");
+ assertNotNull("'capturing' events not found", events);
+ assertEquals("events: " + events, 6, events.size());
+ assertEquals(TriggerEventProcessorStage.STARTED, events.get(0).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(1).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(2).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, events.get(3).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, events.get(4).stage);
+ assertEquals(TriggerEventProcessorStage.SUCCEEDED, events.get(5).stage);
+
+ // check ops
+ ops = (List<TriggerEvent.Op>) events.get(4).event.getProperty(TriggerEvent.REQUESTED_OPS);
+ assertNotNull("should contain requestedOps", ops);
+ assertTrue("number of ops: " + ops, ops.size() > 0);
+ for (TriggerEvent.Op op : ops) {
+ assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction());
+ Set<Pair<String, String>> hints = (Set<Pair<String, String>>)op.getHints().get(Suggester.Hint.COLL_SHARD);
+ assertNotNull("hints", hints);
+ assertEquals("hints", 2, hints.size());
+ Pair<String, String> p = hints.iterator().next();
+ assertEquals(collectionName, p.first());
+ }
+
+ // TODO: fix this once MERGESHARDS is supported
+ List<TriggerEvent.Op> unsupportedOps = (List<TriggerEvent.Op>)events.get(2).context.get("properties.unsupportedOps");
+ assertNotNull("should have unsupportedOps", unsupportedOps);
+ assertEquals(unsupportedOps.toString() + "\n" + ops, ops.size(), unsupportedOps.size());
+ unsupportedOps.forEach(op -> assertEquals(CollectionParams.CollectionAction.MERGESHARDS, op.getAction()));
+ }
+
+ private Map<String, Object> createTriggerProps(long waitForSeconds) {
+ Map<String, Object> props = new HashMap<>();
+ props.put("event", "indexSize");
+ props.put("waitFor", waitForSeconds);
+ props.put("enabled", true);
+ props.put(IndexSizeTrigger.ABOVE_DOCS_PROP, 10);
+ props.put(IndexSizeTrigger.BELOW_DOCS_PROP, 2);
+ List<Map<String, String>> actions = new ArrayList<>(3);
+ Map<String, String> map = new HashMap<>(2);
+ map.put("name", "compute_plan");
+ map.put("class", "solr.ComputePlanAction");
+ actions.add(map);
+ map = new HashMap<>(2);
+ map.put("name", "execute_plan");
+ map.put("class", "solr.ExecutePlanAction");
+ actions.add(map);
+ props.put("actions", actions);
+ return props;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
index 2d084b8..cd52785 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
@@ -91,7 +91,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
- fail("NodeAddedListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
+ fail("processor was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
}
} else {
fail("NodeAddedTrigger was fired more than once!");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledMaintenanceTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledMaintenanceTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledMaintenanceTriggerTest.java
index ffcab4d..164db8f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledMaintenanceTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledMaintenanceTriggerTest.java
@@ -165,7 +165,7 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
.setShardName("shard1");
split1.process(solrClient);
CloudTestUtils.waitForState(cloudManager, "failed to split " + collection1, collection1,
- CloudTestUtils.clusterShape(3, 1));
+ CloudTestUtils.clusterShape(3, 1, true));
String setListenerCommand = "{" +
"'set-listener' : " +
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/376f6c49/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index 04dc96f..9641552 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -24,8 +24,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -42,8 +44,11 @@ import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.RequestStatusState;
@@ -55,6 +60,7 @@ import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
@@ -240,6 +246,67 @@ public class SimCloudManager implements SolrCloudManager {
return values;
}
+ public String dumpClusterState(boolean withCollections) throws Exception {
+ StringBuilder sb = new StringBuilder();
+ sb.append("#######################################\n");
+ sb.append("############ CLUSTER STATE ############\n");
+ sb.append("#######################################\n");
+ sb.append("## Live nodes:\t\t" + getLiveNodesSet().size() + "\n");
+ int emptyNodes = 0;
+ int maxReplicas = 0;
+ int minReplicas = Integer.MAX_VALUE;
+ Map<String, Map<Replica.State, AtomicInteger>> replicaStates = new TreeMap<>();
+ int numReplicas = 0;
+ for (String node : getLiveNodesSet().get()) {
+ List<ReplicaInfo> replicas = getSimClusterStateProvider().simGetReplicaInfos(node);
+ numReplicas += replicas.size();
+ if (replicas.size() > maxReplicas) {
+ maxReplicas = replicas.size();
+ }
+ if (minReplicas > replicas.size()) {
+ minReplicas = replicas.size();
+ }
+ for (ReplicaInfo ri : replicas) {
+ replicaStates.computeIfAbsent(ri.getCollection(), c -> new TreeMap<>())
+ .computeIfAbsent(ri.getState(), s -> new AtomicInteger())
+ .incrementAndGet();
+ }
+ if (replicas.isEmpty()) {
+ emptyNodes++;
+ }
+ }
+ if (minReplicas == Integer.MAX_VALUE) {
+ minReplicas = 0;
+ }
+ sb.append("## Empty nodes:\t" + emptyNodes + "\n");
+ Set<String> deadNodes = getSimNodeStateProvider().simGetDeadNodes();
+ sb.append("## Dead nodes:\t\t" + deadNodes.size() + "\n");
+ deadNodes.forEach(n -> sb.append("##\t\t" + n + "\n"));
+ sb.append("## Collections:\t" + getSimClusterStateProvider().simListCollections() + "\n");
+ if (withCollections) {
+ ClusterState state = clusterStateProvider.getClusterState();
+ state.forEachCollection(coll -> sb.append(coll.toString() + "\n"));
+ }
+ sb.append("## Max replicas per node:\t" + maxReplicas + "\n");
+ sb.append("## Min replicas per node:\t" + minReplicas + "\n");
+ sb.append("## Total replicas:\t\t" + numReplicas + "\n");
+ replicaStates.forEach((c, map) -> {
+ AtomicInteger repCnt = new AtomicInteger();
+ map.forEach((s, cnt) -> repCnt.addAndGet(cnt.get()));
+ sb.append("## * " + c + "\t\t" + repCnt.get() + "\n");
+ map.forEach((s, cnt) -> sb.append("##\t\t- " + String.format(Locale.ROOT, "%-12s %4d", s, cnt.get()) + "\n"));
+ });
+ sb.append("######### Solr op counts ##########\n");
+ simGetOpCounts().forEach((k, cnt) -> sb.append("##\t\t- " + String.format(Locale.ROOT, "%-14s %4d", k, cnt.get()) + "\n"));
+ sb.append("######### Autoscaling event counts ###########\n");
+ Map<String, Map<String, AtomicInteger>> counts = simGetEventCounts();
+ counts.forEach((trigger, map) -> {
+ sb.append("## * Trigger: " + trigger + "\n");
+ map.forEach((s, cnt) -> sb.append("##\t\t- " + String.format(Locale.ROOT, "%-11s %4d", s, cnt.get()) + "\n"));
+ });
+ return sb.toString();
+ }
+
/**
* Get the instance of {@link SolrResourceLoader} that is used by the cluster components.
*/
@@ -333,6 +400,17 @@ public class SimCloudManager implements SolrCloudManager {
return new SolrClient() {
@Override
public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
+ if (collection != null) {
+ if (request instanceof AbstractUpdateRequest) {
+ ((AbstractUpdateRequest)request).setParam("collection", collection);
+ } else if (request instanceof QueryRequest) {
+ ModifiableSolrParams params = new ModifiableSolrParams(request.getParams());
+ params.set("collection", collection);
+ request = new QueryRequest(params);
+ } else {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "when collection != null only UpdateRequest and QueryRequest are supported: request=" + request + ", collection=" + collection);
+ }
+ }
SolrResponse rsp = SimCloudManager.this.request(request);
return rsp.getResponse();
}
@@ -508,14 +586,17 @@ public class SimCloudManager implements SolrCloudManager {
incrementCount("update");
// support only updates to the system collection
UpdateRequest ureq = (UpdateRequest)req;
- if (ureq.getCollection() == null || !ureq.getCollection().equals(CollectionAdminParams.SYSTEM_COLL)) {
- throw new UnsupportedOperationException("Only .system updates are supported but got: " + req);
- }
- List<SolrInputDocument> docs = ureq.getDocuments();
- if (docs != null) {
- systemColl.addAll(docs);
+ String collection = ureq.getCollection();
+ if (collection != null && !collection.equals(CollectionAdminParams.SYSTEM_COLL)) {
+ // simulate an update
+ return clusterStateProvider.simUpdate(ureq);
+ } else {
+ List<SolrInputDocument> docs = ureq.getDocuments();
+ if (docs != null) {
+ systemColl.addAll(docs);
+ }
+ return new UpdateResponse();
}
- return new UpdateResponse();
}
// support only a specific subset of collection admin ops
if (!(req instanceof CollectionAdminRequest)) {
@@ -560,8 +641,12 @@ public class SimCloudManager implements SolrCloudManager {
}
break;
case DELETE:
- clusterStateProvider.simDeleteCollection(req.getParams().get(CommonParams.NAME),
- req.getParams().get(CommonAdminParams.ASYNC), results);
+ try {
+ clusterStateProvider.simDeleteCollection(req.getParams().get(CommonParams.NAME),
+ req.getParams().get(CommonAdminParams.ASYNC), results);
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+ }
break;
case LIST:
results.add("collections", clusterStateProvider.simListCollections());