You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/01/10 11:13:43 UTC
lucene-solr:jira/solr-11714: SOLR-11714 Explicitly specify the list
of requested ops for triggers where effects cannot be immediately predicted
by the Policy framework. Move this logic from ComputePlanAction to the
respective triggers.
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-11714 [created] ed7b49706
SOLR-11714 Explicitly specify the list of requested ops for triggers where
effects cannot be immediately predicted by the Policy framework. Move this
logic from ComputePlanAction to the respective triggers.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ed7b4970
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ed7b4970
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ed7b4970
Branch: refs/heads/jira/solr-11714
Commit: ed7b49706fc0e0fe5b933f7327c90339a23e94a4
Parents: 6a55def
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Wed Jan 10 12:13:20 2018 +0100
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Wed Jan 10 12:13:20 2018 +0100
----------------------------------------------------------------------
.../cloud/autoscaling/ComputePlanAction.java | 125 +++++++++++--------
.../solr/cloud/autoscaling/MetricTrigger.java | 25 +++-
.../cloud/autoscaling/SearchRateTrigger.java | 57 ++++++++-
.../solr/cloud/autoscaling/TriggerEvent.java | 50 ++++++++
.../autoscaling/TriggerIntegrationTest.java | 108 +++++++++-------
.../sim/SimClusterStateProvider.java | 3 +
.../autoscaling/sim/TestTriggerIntegration.java | 30 ++++-
.../cloud/autoscaling/AddReplicaSuggester.java | 47 +++----
.../solr/common/params/AutoScalingParams.java | 1 +
9 files changed, 318 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ed7b4970/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index b1e33e1..dcc22eb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -19,17 +19,17 @@ package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
-import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.common.SolrException;
@@ -63,16 +63,41 @@ public class ComputePlanAction extends TriggerActionBase {
}
PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(cloudManager);
Policy.Session session = sessionWrapper.get();
+ ClusterState clusterState = cloudManager.getClusterStateProvider().getClusterState();
if (log.isTraceEnabled()) {
- ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
log.trace("-- session: {}", session);
- log.trace("-- state: {}", state);
+ log.trace("-- state: {}", clusterState);
}
try {
Suggester suggester = getSuggester(session, event, cloudManager);
- while (true) {
+ int maxOperations = getMaxNumOps(event, autoScalingConf, clusterState);
+ int requestedOperations = getRequestedNumOps(event);
+ if (requestedOperations > maxOperations) {
+ log.warn("Requested number of operations {} higher than maximum {}, adjusting...",
+ requestedOperations, maxOperations);
+ }
+ int opCount = 0;
+ int opLimit = maxOperations;
+ if (requestedOperations > 0) {
+ opLimit = requestedOperations;
+ }
+ do {
SolrRequest operation = suggester.getSuggestion();
- if (operation == null) break;
+ opCount++;
+ // prepare suggester for the next iteration
+ session = suggester.getSession();
+ suggester = getSuggester(session, event, cloudManager);
+
+ // break on first null op
+ // unless a specific number of ops was requested
+ if (operation == null) {
+ if (requestedOperations < 0) {
+ break;
+ } else {
+ log.info("Computed plan empty, remained " + (opCount - opLimit) + " requested ops to try.");
+ continue;
+ }
+ }
log.info("Computed Plan: {}", operation.getParams());
Map<String, Object> props = context.getProperties();
props.compute("operations", (k, v) -> {
@@ -81,15 +106,14 @@ public class ComputePlanAction extends TriggerActionBase {
operations.add(operation);
return operations;
});
- session = suggester.getSession();
- suggester = getSuggester(session, event, cloudManager);
- }
+ } while (opCount < opLimit);
} finally {
releasePolicySession(sessionWrapper, session);
}
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unexpected exception while processing event: " + event, e); }
+ "Unexpected exception while processing event: " + event, e);
+ }
}
private void releasePolicySession(PolicyHelper.SessionWrapper sessionWrapper, Policy.Session session) {
@@ -98,6 +122,33 @@ public class ComputePlanAction extends TriggerActionBase {
}
+  /**
+   * Returns the upper bound on the number of operations computed for a single event.
+   * <p>
+   * The default estimate is {@code liveNodes * totalReplicas * 3}, where {@code totalReplicas}
+   * sums {@code replicationFactor * numSlices} over all collections. The default can be
+   * overridden by the {@link AutoScalingParams#MAX_COMPUTE_OPERATIONS} property of the
+   * autoscaling config, which in turn can be overridden by the same-named event property.
+   * Both values are parsed leniently via {@code Integer.parseInt(String.valueOf(value))}, so
+   * {@code Integer}, in-range {@code Long} and numeric-string values are all accepted; an
+   * unparseable value is logged and the next-lower default is used.
+   *
+   * @param event event being processed; may carry a per-event override
+   * @param autoScalingConfig current autoscaling config; may carry a cluster-wide override
+   * @param clusterState cluster state used to estimate the default limit
+   * @return maximum number of operations to compute
+   */
+  protected int getMaxNumOps(TriggerEvent event, AutoScalingConfig autoScalingConfig, ClusterState clusterState) {
+    // estimate a maximum default limit that should be sufficient for most purposes:
+    // number of nodes * total number of replicas * 3
+    AtomicInteger totalRF = new AtomicInteger();
+    clusterState.forEachCollection(coll -> totalRF.addAndGet(coll.getReplicationFactor() * coll.getSlices().size()));
+    // compute in long space and cap, so very large clusters cannot overflow into a negative limit
+    long estimate = (long) clusterState.getLiveNodes().size() * totalRF.get() * 3L;
+    int totalMax = (int) Math.min(Integer.MAX_VALUE, estimate);
+    // the config value may be a Long (e.g. when parsed from JSON), so a plain (Integer) cast
+    // could throw ClassCastException - parse it the same lenient way as the event value
+    int maxOp = parseNumOps(autoScalingConfig.getProperties().get(AutoScalingParams.MAX_COMPUTE_OPERATIONS),
+        totalMax, "autoscaling config property");
+    return parseNumOps(event.getProperty(AutoScalingParams.MAX_COMPUTE_OPERATIONS), maxOp, "event property");
+  }
+
+  /**
+   * Parses a max-operations value, returning {@code defaultValue} when the value is
+   * {@code null} or its string form is not a valid int (the latter is logged).
+   */
+  private int parseNumOps(Object value, int defaultValue, String source) {
+    if (value == null) {
+      return defaultValue;
+    }
+    try {
+      return Integer.parseInt(String.valueOf(value));
+    } catch (NumberFormatException e) {
+      log.warn("Invalid '{}' {}: {}, using default {}", AutoScalingParams.MAX_COMPUTE_OPERATIONS, source, value, defaultValue);
+      return defaultValue;
+    }
+  }
+
+  /**
+   * Returns how many operations the trigger explicitly requested through the
+   * {@link TriggerEvent#REQUESTED_OPS} event property.
+   *
+   * @param event event being processed
+   * @return the number of requested operations, or -1 when none were requested
+   */
+  protected int getRequestedNumOps(TriggerEvent event) {
+    Collection<TriggerEvent.Op> requested =
+        (Collection<TriggerEvent.Op>) event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList());
+    return requested.isEmpty() ? -1 : requested.size();
+  }
+
+ private static final String START = "__start__";
+
protected Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) {
Suggester suggester;
switch (event.getEventType()) {
@@ -110,51 +161,21 @@ public class ComputePlanAction extends TriggerActionBase {
.hint(Suggester.Hint.SRC_NODE, event.getProperty(TriggerEvent.NODE_NAMES));
break;
case SEARCHRATE:
- Map<String, Map<String, Double>> hotShards = (Map<String, Map<String, Double>>)event.getProperty(AutoScalingParams.SHARD);
- Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
- List<ReplicaInfo> hotReplicas = (List<ReplicaInfo>)event.getProperty(AutoScalingParams.REPLICA);
- Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
-
- if (hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
- // node -> MOVEREPLICA
- if (hotNodes.isEmpty()) {
- log.warn("Neither hot replicas / collection nor nodes are reported in event: " + event);
- return NoneSuggester.INSTANCE;
- }
- suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA);
- for (String node : hotNodes.keySet()) {
- suggester = suggester.hint(Suggester.Hint.SRC_NODE, node);
- }
- } else {
- // collection || shard || replica -> ADDREPLICA
- suggester = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA);
- Set<Pair> collectionShards = new HashSet<>();
- hotShards.forEach((coll, shards) -> shards.forEach((s, r) -> collectionShards.add(new Pair(coll, s))));
- for (Pair<String, String> colShard : collectionShards) {
- suggester = suggester.hint(Suggester.Hint.COLL_SHARD, colShard);
- }
- }
- break;
case METRIC:
- Map<String, Number> sourceNodes = (Map<String, Number>) event.getProperty(AutoScalingParams.NODE);
- String collection = (String) event.getProperty(AutoScalingParams.COLLECTION);
- String shard = (String) event.getProperty(AutoScalingParams.SHARD);
- String preferredOp = (String) event.getProperty(PREFERRED_OP);
- if (sourceNodes.isEmpty()) {
- log.warn("No nodes reported in event: " + event);
+ List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)event.getProperty(TriggerEvent.REQUESTED_OPS, Collections.emptyList());
+ int start = (Integer)event.getProperty(START, 0);
+ if (ops.isEmpty() || start >= ops.size()) {
return NoneSuggester.INSTANCE;
}
- CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(preferredOp == null ? CollectionParams.CollectionAction.MOVEREPLICA.toLower() : preferredOp);
- suggester = session.getSuggester(action);
- for (String node : sourceNodes.keySet()) {
- suggester = suggester.hint(Suggester.Hint.SRC_NODE, node);
+ TriggerEvent.Op op = ops.get(start);
+ suggester = session.getSuggester(op.getAction());
+ for (Map.Entry<Suggester.Hint, Object> e : op.getHints().entrySet()) {
+ suggester = suggester.hint(e.getKey(), e.getValue());
}
- if (collection != null) {
- if (shard == null) {
- suggester = suggester.hint(Suggester.Hint.COLL, collection);
- } else {
- suggester = suggester.hint(Suggester.Hint.COLL_SHARD, new Pair(collection, shard));
- }
+ if (++start >= ops.size()) {
+ event.getProperties().remove(START);
+ } else {
+ event.getProperties().put(START, start);
}
break;
default:
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ed7b4970/solr/core/src/java/org/apache/solr/cloud/autoscaling/MetricTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/MetricTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/MetricTrigger.java
index 531e4e6..3af8954 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/MetricTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/MetricTrigger.java
@@ -18,9 +18,11 @@
package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,12 +32,15 @@ import java.util.stream.Collectors;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.Pair;
import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +70,7 @@ public class MetricTrigger extends TriggerBase {
throw new IllegalArgumentException("When 'shard' is other than #ANY then collection name must be also other than #ANY");
}
node = (String) properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY);
- preferredOp = (String) properties.getOrDefault(PREFERRED_OP, null);
+ preferredOp = (String) properties.getOrDefault(PREFERRED_OP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
}
@Override
@@ -182,9 +187,23 @@ public class MetricTrigger extends TriggerBase {
if (!shard.equals(Policy.ANY)) {
properties.put(AutoScalingParams.SHARD, shard);
}
- if (preferredOp != null) {
- properties.put(PREFERRED_OP, preferredOp);
+ properties.put(PREFERRED_OP, preferredOp);
+
+ // specify requested ops
+ List<Op> ops = new ArrayList<>(hotNodes.size());
+ for (String n : hotNodes.keySet()) {
+ Op op = new Op(CollectionParams.CollectionAction.get(preferredOp));
+ op.setHint(Suggester.Hint.SRC_NODE, n);
+ if (!collection.equals(Policy.ANY)) {
+ if (!shard.equals(Policy.ANY)) {
+ op.setHint(Suggester.Hint.COLL_SHARD, new Pair<>(collection, shard));
+ } else {
+ op.setHint(Suggester.Hint.COLL, collection);
+ }
+ }
+ ops.add(op);
}
+ properties.put(TriggerEvent.REQUESTED_OPS, ops);
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ed7b4970/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
index ec3110e..a163493 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
@@ -31,9 +31,12 @@ import com.google.common.util.concurrent.AtomicDouble;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.metrics.SolrCoreMetricManager;
@@ -145,12 +148,14 @@ public class SearchRateTrigger extends TriggerBase {
Map<String, Map<String, List<ReplicaInfo>>> collectionRates = new HashMap<>();
Map<String, AtomicDouble> nodeRates = new HashMap<>();
+ Map<String, Integer> replicationFactors = new HashMap<>();
for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
Map<String, ReplicaInfo> metricTags = new HashMap<>();
// coll, shard, replica
Map<String, Map<String, List<ReplicaInfo>>> infos = cloudManager.getNodeStateProvider().getReplicaInfo(node, Collections.emptyList());
infos.forEach((coll, shards) -> {
+ replicationFactors.computeIfAbsent(coll, c -> shards.size());
shards.forEach((sh, replicas) -> {
replicas.forEach(replica -> {
// we have to translate to the metrics registry name, which uses "_replica_nN" as suffix
@@ -261,7 +266,41 @@ public class SearchRateTrigger extends TriggerBase {
}
});
- if (processor.process(new SearchRateEvent(getName(), eventTime.get(), hotNodes, hotCollections, hotShards, hotReplicas))) {
+ // calculate the number of replicas to add to each hot shard, based on how much the rate was
+ // exceeded - but within limits.
+ final List<TriggerEvent.Op> ops = new ArrayList<>();
+ if (hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
+ // move replicas around
+ hotNodes.forEach((n, r) -> {
+ ops.add(new TriggerEvent.Op(CollectionParams.CollectionAction.MOVEREPLICA, Suggester.Hint.SRC_NODE, n));
+ });
+ } else {
+ // add replicas
+ Map<String, Map<String, List<Pair<String, String>>>> hints = new HashMap<>();
+
+ hotShards.forEach((coll, shards) -> shards.forEach((s, r) -> {
+ List<Pair<String, String>> perShard = hints
+ .computeIfAbsent(coll, c -> new HashMap<>())
+ .computeIfAbsent(s, sh -> new ArrayList<>());
+ addHints(coll, s, r, replicationFactors.get(coll), perShard);
+ }));
+ hotReplicas.forEach(ri -> {
+ double r = (Double)ri.getVariable(AutoScalingParams.RATE);
+ // add only if not already accounted for in hotShards
+ List<Pair<String, String>> perShard = hints
+ .computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
+ .computeIfAbsent(ri.getShard(), sh -> new ArrayList<>());
+ if (perShard.isEmpty()) {
+ addHints(ri.getCollection(), ri.getShard(), r, replicationFactors.get(ri.getCollection()), perShard);
+ }
+ });
+
+ hints.values().forEach(m -> m.values().forEach(lst -> lst.forEach(p -> {
+ ops.add(new TriggerEvent.Op(CollectionParams.CollectionAction.ADDREPLICA, Suggester.Hint.COLL_SHARD, p));
+ })));
+ }
+
+ if (processor.process(new SearchRateEvent(getName(), eventTime.get(), ops, hotNodes, hotCollections, hotShards, hotReplicas))) {
// update lastEvent times
hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
@@ -271,6 +310,19 @@ public class SearchRateTrigger extends TriggerBase {
}
}
+  /**
+   * Appends between 1 and 3 {@code (collection, shard)} hints to {@code hints}, one per replica
+   * to add. The count is {@code round((r - rate) / replicationFactor)}, clamped to [1, 3].
+   * <p>
+   * NOTE(review): the caller fills {@code replicationFactors} with {@code shards.size()}, i.e.
+   * the number of shards seen on a node, not the replication factor - confirm intent.
+   *
+   * @param collection collection name
+   * @param shard shard name
+   * @param r observed search rate of the hot shard or replica
+   * @param replicationFactor divisor for the excess rate; values below 1 are treated as 1
+   * @param hints list receiving one {@code Pair(collection, shard)} per replica to add
+   */
+  private void addHints(String collection, String shard, double r, int replicationFactor, List<Pair<String, String>> hints) {
+    // avoid division by zero producing Infinity/NaN
+    int divisor = Math.max(1, replicationFactor);
+    // clamp while still a long: casting Math.round's long result to int first can wrap
+    // a very large value to a negative number
+    long estimate = Math.round((r - rate) / (double) divisor);
+    int numReplicas = (int) Math.min(3L, Math.max(1L, estimate));
+    for (int i = 0; i < numReplicas; i++) {
+      hints.add(new Pair<>(collection, shard));
+    }
+  }
+
private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
@@ -282,10 +334,11 @@ public class SearchRateTrigger extends TriggerBase {
}
public static class SearchRateEvent extends TriggerEvent {
- public SearchRateEvent(String source, long eventTime, Map<String, Double> hotNodes,
+ public SearchRateEvent(String source, long eventTime, List<Op> ops, Map<String, Double> hotNodes,
Map<String, Double> hotCollections,
Map<String, Map<String, Double>> hotShards, List<ReplicaInfo> hotReplicas) {
super(TriggerEventType.SEARCHRATE, source, eventTime, null);
+ properties.put(TriggerEvent.REQUESTED_OPS, ops);
properties.put(AutoScalingParams.COLLECTION, hotCollections);
properties.put(AutoScalingParams.SHARD, hotShards);
properties.put(AutoScalingParams.REPLICA, hotReplicas);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ed7b4970/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
index 38be54a..fe11cf9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
@@ -17,11 +17,14 @@
package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.IdUtils;
@@ -33,6 +36,41 @@ public class TriggerEvent implements MapWriter {
public static final String REPLAYING = "replaying";
public static final String NODE_NAMES = "nodeNames";
public static final String EVENT_TIMES = "eventTimes";
+ public static final String REQUESTED_OPS = "requestedOps";
+
+  /**
+   * A single operation requested by a trigger: a collection action together with the
+   * suggester hints to apply when computing that action.
+   */
+  public static final class Op {
+    private final CollectionParams.CollectionAction action;
+    private final EnumMap<Suggester.Hint, Object> hints = new EnumMap<>(Suggester.Hint.class);
+
+    /** Creates an operation for {@code action} with no hints. */
+    public Op(CollectionParams.CollectionAction action) {
+      this.action = action;
+    }
+
+    /** Creates an operation for {@code action} with one initial hint. */
+    public Op(CollectionParams.CollectionAction action, Suggester.Hint hint, Object hintValue) {
+      this(action);
+      setHint(hint, hintValue);
+    }
+
+    /** Sets (or replaces) the value of {@code hint}. */
+    public void setHint(Suggester.Hint hint, Object value) {
+      hints.put(hint, value);
+    }
+
+    /** Returns the requested collection action. */
+    public CollectionParams.CollectionAction getAction() {
+      return action;
+    }
+
+    /** Returns the live (mutable) hint map of this operation. */
+    public EnumMap<Suggester.Hint, Object> getHints() {
+      return hints;
+    }
+
+    @Override
+    public String toString() {
+      return "Op{action=" + action + ", hints=" + hints + '}';
+    }
+  }
protected final String id;
protected final String source;
@@ -94,6 +132,18 @@ public class TriggerEvent implements MapWriter {
}
/**
+ * Get a named event property or default value if missing.
+ */
+  public Object getProperty(String name, Object defaultValue) {
+    // a key mapped to null is treated the same as a missing key
+    Object value = properties.get(name);
+    return value != null ? value : defaultValue;
+  }
+
+ /**
* Event type.
*/
public TriggerEventType getEventType() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ed7b4970/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 639f240..eccbcd2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -1398,7 +1398,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
@Test
- @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11714")
+ //@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11714")
public void testSearchRate() throws Exception {
// start a few more jetty-s
for (int i = 0; i < 3; i++) {
@@ -1446,9 +1446,19 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
// wait for listener to capture the SUCCEEDED stage
- Thread.sleep(2000);
- assertEquals(listenerEvents.toString(), 1, listenerEvents.get("srt").size());
- CapturedEvent ev = listenerEvents.get("srt").get(0);
+ Thread.sleep(5000);
+ List<CapturedEvent> events = listenerEvents.get("srt");
+ assertEquals(listenerEvents.toString(), 4, events.size());
+ assertEquals("AFTER_ACTION", events.get(0).stage.toString());
+ assertEquals("compute", events.get(0).actionName);
+ assertEquals("AFTER_ACTION", events.get(1).stage.toString());
+ assertEquals("execute", events.get(1).actionName);
+ assertEquals("AFTER_ACTION", events.get(2).stage.toString());
+ assertEquals("test", events.get(2).actionName);
+ assertEquals("SUCCEEDED", events.get(3).stage.toString());
+ assertNull(events.get(3).actionName);
+
+ CapturedEvent ev = events.get(0);
long now = timeSource.getTime();
// verify waitFor
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
@@ -1482,6 +1492,14 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(collectionRate, totalNodeRate.get(), 5.0);
assertEquals(collectionRate, totalShardRate.get(), 5.0);
assertEquals(collectionRate, totalReplicaRate.get(), 5.0);
+
+ // check operations
+ List<Map<String, Object>> ops = (List<Map<String, Object>>)ev.context.get("properties.operations");
+ assertNotNull(ops);
+ assertTrue(ops.size() > 1);
+ for (Map<String, Object> m : ops) {
+ assertEquals("ADDREPLICA", m.get("params.action"));
+ }
}
@Test
@@ -1567,46 +1585,46 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
// todo uncomment the following code once SOLR-11714 is fixed
// find a new replica and create its metric name
-// replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
-// coreName = replica.getCoreName();
-// replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
-// registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
-// tag = "metrics:" + registry + ":INDEX.sizeInBytes";
-//
-// setTriggerCommand = "{" +
-// "'set-trigger' : {" +
-// "'name' : 'metric_trigger'," +
-// "'event' : 'metric'," +
-// "'waitFor' : '" + waitForSeconds + "s'," +
-// "'enabled' : true," +
-// "'metric': '" + tag + "'" +
-// "'above' : 100.0," +
-// "'collection': '" + collectionName + "'" +
-// "'shard':'" + shardId + "'" +
-// "'preferredOperation':'addreplica'" +
-// "'actions' : [" +
-// "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
-// "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
-// "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
-// "]" +
-// "}}";
-// req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
-// response = solrClient.request(req);
-// assertEquals(response.get("result").toString(), "success");
-//
-// triggerFiredLatch = new CountDownLatch(1);
-// listenerEvents.clear();
-// await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
-// assertTrue("The trigger did not fire at all", await);
-// // wait for listener to capture the SUCCEEDED stage
-// Thread.sleep(2000);
-// assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
-// ev = listenerEvents.get("srt").get(0);
-// now = timeSource.getTime();
-// // verify waitFor
-// assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
-// assertEquals(collectionName, ev.event.getProperties().get("collection"));
-// docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
-// assertEquals(3, docCollection.getReplicas().size());
+ replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
+ coreName = replica.getCoreName();
+ replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
+ registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
+ tag = "metrics:" + registry + ":INDEX.sizeInBytes";
+
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'metric_trigger'," +
+ "'event' : 'metric'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'metric': '" + tag + "'" +
+ "'above' : 100.0," +
+ "'collection': '" + collectionName + "'" +
+ "'shard':'" + shardId + "'" +
+ "'preferredOperation':'addreplica'" +
+ "'actions' : [" +
+ "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+ "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ triggerFiredLatch = new CountDownLatch(1);
+ listenerEvents.clear();
+ await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ // wait for listener to capture the SUCCEEDED stage
+ Thread.sleep(2000);
+ assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
+ ev = listenerEvents.get("srt").get(0);
+ now = timeSource.getTime();
+ // verify waitFor
+ assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
+ assertEquals(collectionName, ev.event.getProperties().get("collection"));
+ docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+ assertEquals(3, docCollection.getReplicas().size());
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ed7b4970/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index 22f9fb9..a5c4443 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -324,6 +324,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param results result of the operation
*/
public void simAddReplica(ZkNodeProps message, NamedList results) throws Exception {
+ if (message.getStr(CommonAdminParams.ASYNC) != null) {
+ results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
+ }
ClusterState clusterState = getClusterState();
DocCollection coll = clusterState.getCollection(message.getStr(ZkStateReader.COLLECTION_PROP));
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ed7b4970/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
index 3a118f2..ce1cb46 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
@@ -41,6 +41,8 @@ import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.ComputePlanAction;
+import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
@@ -1147,6 +1149,8 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
"'enabled' : true," +
"'rate' : 1.0," +
"'actions' : [" +
+ "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}" +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
"{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
"]" +
"}}";
@@ -1160,6 +1164,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
"'name' : 'srt'," +
"'trigger' : 'search_rate_trigger'," +
"'stage' : ['FAILED','SUCCEEDED']," +
+ "'afterAction': ['compute', 'execute', 'test']," +
"'class' : '" + TestTriggerListener.class.getName() + "'" +
"}" +
"}";
@@ -1177,9 +1182,20 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("The trigger did not fire at all", await);
// wait for listener to capture the SUCCEEDED stage
- cluster.getTimeSource().sleep(2000);
- assertEquals(listenerEvents.toString(), 1, listenerEvents.get("srt").size());
- CapturedEvent ev = listenerEvents.get("srt").get(0);
+ cluster.getTimeSource().sleep(5000);
+ List<CapturedEvent> events = listenerEvents.get("srt");
+
+ assertEquals(listenerEvents.toString(), 4, events.size());
+ assertEquals("AFTER_ACTION", events.get(0).stage.toString());
+ assertEquals("compute", events.get(0).actionName);
+ assertEquals("AFTER_ACTION", events.get(1).stage.toString());
+ assertEquals("execute", events.get(1).actionName);
+ assertEquals("AFTER_ACTION", events.get(2).stage.toString());
+ assertEquals("test", events.get(2).actionName);
+ assertEquals("SUCCEEDED", events.get(3).stage.toString());
+ assertNull(events.get(3).actionName);
+
+ CapturedEvent ev = events.get(0);
long now = cluster.getTimeSource().getTime();
// verify waitFor
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
@@ -1213,5 +1229,13 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
assertTrue(totalNodeRate.get() > 100.0);
assertTrue(totalShardRate.get() > 100.0);
assertTrue(totalReplicaRate.get() > 100.0);
+
+ // check operations
+ List<Map<String, Object>> ops = (List<Map<String, Object>>)ev.context.get("properties.operations");
+ assertNotNull(ops);
+ assertTrue(ops.size() > 1);
+ for (Map<String, Object> m : ops) {
+ assertEquals("ADDREPLICA", m.get("params.action"));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ed7b4970/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java
index 3b997bf..5ebf761 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AddReplicaSuggester.java
@@ -41,34 +41,35 @@ class AddReplicaSuggester extends Suggester {
Set<Pair<String, String>> shards = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
if (shards.isEmpty()) {
throw new RuntimeException("add-replica requires 'collection' and 'shard'");
+ } else if (shards.size() > 1) {
+ throw new RuntimeException("add-replica requires exactly one hint of type " + Hint.COLL_SHARD);
}
- for (Pair<String,String> shard : shards) {
- Replica.Type type = Replica.Type.get((String) hints.get(Hint.REPLICATYPE));
- //iterate through elements and identify the least loaded
- List<Violation> leastSeriousViolation = null;
- Integer targetNodeIndex = null;
- for (int i = getMatrix().size() - 1; i >= 0; i--) {
- Row row = getMatrix().get(i);
- if (!row.isLive) continue;
- if (!isAllowed(row.node, Hint.TARGET_NODE)) continue;
- Row tmpRow = row.addReplica(shard.first(), shard.second(), type);
+ Pair<String, String> shard = shards.iterator().next();
+ Replica.Type type = Replica.Type.get((String) hints.get(Hint.REPLICATYPE));
+ //iterate through elements and identify the least loaded
+ List<Violation> leastSeriousViolation = null;
+ Integer targetNodeIndex = null;
+ for (int i = getMatrix().size() - 1; i >= 0; i--) {
+ Row row = getMatrix().get(i);
+ if (!row.isLive) continue;
+ if (!isAllowed(row.node, Hint.TARGET_NODE)) continue;
+ Row tmpRow = row.addReplica(shard.first(), shard.second(), type);
- List<Violation> errs = testChangedMatrix(strict, getModifiedMatrix(getMatrix(), tmpRow, i));
- if (!containsNewErrors(errs)) {
- if (isLessSerious(errs, leastSeriousViolation)) {
- leastSeriousViolation = errs;
- targetNodeIndex = i;
- }
+ List<Violation> errs = testChangedMatrix(strict, getModifiedMatrix(getMatrix(), tmpRow, i));
+ if (!containsNewErrors(errs)) {
+ if (isLessSerious(errs, leastSeriousViolation)) {
+ leastSeriousViolation = errs;
+ targetNodeIndex = i;
}
}
+ }
- if (targetNodeIndex != null) {// there are no rule violations
- getMatrix().set(targetNodeIndex, getMatrix().get(targetNodeIndex).addReplica(shard.first(), shard.second(), type));
- return CollectionAdminRequest
- .addReplicaToShard(shard.first(), shard.second())
- .setType(type)
- .setNode(getMatrix().get(targetNodeIndex).node);
- }
+ if (targetNodeIndex != null) {// there are no rule violations
+ getMatrix().set(targetNodeIndex, getMatrix().get(targetNodeIndex).addReplica(shard.first(), shard.second(), type));
+ return CollectionAdminRequest
+ .addReplicaToShard(shard.first(), shard.second())
+ .setType(type)
+ .setNode(getMatrix().get(targetNodeIndex).node);
}
return null;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ed7b4970/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java b/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
index c708b7c..0d4cc89 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
@@ -68,6 +68,7 @@ public interface AutoScalingParams {
String TRIGGER_SCHEDULE_DELAY_SECONDS = "triggerScheduleDelaySeconds";
String TRIGGER_COOLDOWN_PERIOD_SECONDS = "triggerCooldownPeriodSeconds";
String TRIGGER_CORE_POOL_SIZE = "triggerCorePoolSize";
+ String MAX_COMPUTE_OPERATIONS = "maxComputeOperations";
@Deprecated
String ACTION_THROTTLE_PERIOD_SECONDS = "actionThrottlePeriodSeconds";