You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this rendering.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/04/13 10:20:51 UTC
lucene-solr:jira/solr-11833: SOLR-11833: Add support for belowRate monitoring and actions.
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-11833 [created] 76461f3bc
SOLR-11833: Add support for belowRate monitoring and actions.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/76461f3b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/76461f3b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/76461f3b
Branch: refs/heads/jira/solr-11833
Commit: 76461f3bc1cd0e3948facc4112560cc77907b22c
Parents: 376f6c4
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Fri Apr 13 12:20:05 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Fri Apr 13 12:20:05 2018 +0200
----------------------------------------------------------------------
.../cloud/autoscaling/IndexSizeTrigger.java | 4 +-
.../cloud/autoscaling/SearchRateTrigger.java | 314 ++++++++++++++++---
.../SearchRateTriggerIntegrationTest.java | 92 +++---
.../autoscaling/SearchRateTriggerTest.java | 19 +-
.../cloud/autoscaling/sim/TestLargeCluster.java | 5 +-
.../autoscaling/sim/TestTriggerIntegration.java | 11 +-
.../autoscaling/DeleteReplicaSuggester.java | 74 +++++
.../client/solrj/cloud/autoscaling/Policy.java | 2 +-
.../cloud/autoscaling/SplitShardSuggester.java | 6 +
.../solrj/cloud/autoscaling/Suggester.java | 8 +-
.../solrj/request/CollectionAdminRequest.java | 4 +
.../org/apache/solr/common/cloud/Replica.java | 2 +
12 files changed, 437 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76461f3b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
index 756f88f..9978362 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
@@ -143,11 +143,11 @@ public class IndexSizeTrigger extends TriggerBase {
String belowOpStr = String.valueOf(properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.MERGESHARDS.toLower()));
aboveOp = CollectionParams.CollectionAction.get(aboveOpStr);
if (aboveOp == null) {
- throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value of " + ABOVE_OP_PROP + ": '" + aboveOpStr + "'");
+ // the property name is already passed as the exception's second argument, so the message only carries the value
+ throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value: '" + aboveOpStr + "'");
}
belowOp = CollectionParams.CollectionAction.get(belowOpStr);
if (belowOp == null) {
- throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value of " + BELOW_OP_PROP + ": '" + belowOpStr + "'");
+ throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value: '" + belowOpStr + "'");
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76461f3b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
index 02a2d0c..ecbee25 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
@@ -24,8 +24,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
@@ -34,6 +34,7 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;
@@ -49,11 +50,29 @@ import org.slf4j.LoggerFactory;
public class SearchRateTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ // Configuration property names: the hot/cold rate thresholds, the collection actions requested
+ // for hot/cold shards and replicas (aboveOp / belowOp), and those requested for hot/cold nodes
+ // (aboveNodeOp / belowNodeOp).
+ public static final String ABOVE_RATE_PROP = "aboveRate";
+ public static final String BELOW_RATE_PROP = "belowRate";
+ public static final String ABOVE_OP_PROP = "aboveOp";
+ public static final String BELOW_OP_PROP = "belowOp";
+ public static final String ABOVE_NODE_OP_PROP = "aboveNodeOp";
+ public static final String BELOW_NODE_OP_PROP = "belowNodeOp";
+
+ // Keys under which SearchRateEvent stores the detected hot / cold resources and their rates.
+ public static final String HOT_NODES = "hotNodes";
+ public static final String HOT_COLLECTIONS = "hotCollections";
+ public static final String HOT_SHARDS = "hotShards";
+ public static final String HOT_REPLICAS = "hotReplicas";
+ public static final String COLD_NODES = "coldNodes";
+ public static final String COLD_COLLECTIONS = "coldCollections";
+ public static final String COLD_SHARDS = "coldShards";
+ public static final String COLD_REPLICAS = "coldReplicas";
+
private String handler;
private String collection;
private String shard;
private String node;
- private double rate;
+ // thresholds set by configure(); an unset aboveRate becomes Double.MAX_VALUE and an unset belowRate -1
+ private double aboveRate;
+ private double belowRate;
+ private CollectionParams.CollectionAction aboveOp, belowOp, aboveNodeOp, belowNodeOp;
private final Map<String, Long> lastCollectionEvent = new ConcurrentHashMap<>();
private final Map<String, Long> lastNodeEvent = new ConcurrentHashMap<>();
private final Map<String, Long> lastShardEvent = new ConcurrentHashMap<>();
@@ -66,7 +85,10 @@ public class SearchRateTrigger extends TriggerBase {
this.state.put("lastNodeEvent", lastNodeEvent);
this.state.put("lastShardEvent", lastShardEvent);
this.state.put("lastReplicaEvent", lastReplicaEvent);
- TriggerUtils.requiredProperties(requiredProperties, validProperties, "rate");
+ // no single property is strictly required; configure() checks that at least one rate threshold is set
+ TriggerUtils.requiredProperties(requiredProperties, validProperties);
+ // every property read by configure() must be listed here, including the *Op / *NodeOp overrides
+ TriggerUtils.validProperties(validProperties,
+ AutoScalingParams.COLLECTION, AutoScalingParams.SHARD, AutoScalingParams.NODE,
+ AutoScalingParams.HANDLER, ABOVE_RATE_PROP, BELOW_RATE_PROP,
+ ABOVE_OP_PROP, BELOW_OP_PROP, ABOVE_NODE_OP_PROP, BELOW_NODE_OP_PROP);
}
@Override
@@ -76,16 +98,55 @@ public class SearchRateTrigger extends TriggerBase {
collection = (String)properties.getOrDefault(AutoScalingParams.COLLECTION, Policy.ANY);
shard = (String)properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY);
if (collection.equals(Policy.ANY) && !shard.equals(Policy.ANY)) {
- throw new TriggerValidationException("shard", "When 'shard' is other than #ANY then collection name must be also other than #ANY");
+ throw new TriggerValidationException(name, AutoScalingParams.SHARD, "When 'shard' is other than #ANY then collection name must be also other than #ANY");
}
node = (String)properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY);
handler = (String)properties.getOrDefault(AutoScalingParams.HANDLER, "/select");
- String rateString = String.valueOf(properties.get("rate"));
- try {
- rate = Double.parseDouble(rateString);
- } catch (Exception e) {
- throw new TriggerValidationException(name, "rate", "Invalid 'rate' configuration value: '" + rateString + "': " + e.toString());
+ // At least one threshold must be configured. An unset aboveRate becomes Double.MAX_VALUE and an
+ // unset belowRate becomes -1, so the corresponding comparison effectively never matches.
+ Object above = properties.get(ABOVE_RATE_PROP);
+ Object below = properties.get(BELOW_RATE_PROP);
+ if (above == null && below == null) {
+ throw new TriggerValidationException(name, ABOVE_RATE_PROP, "at least one of '" +
+ ABOVE_RATE_PROP + "' or '" + BELOW_RATE_PROP + "' must be set");
+ }
+ if (above != null) {
+ try {
+ aboveRate = Double.parseDouble(String.valueOf(above));
+ } catch (NumberFormatException e) {
+ throw new TriggerValidationException(name, ABOVE_RATE_PROP, "Invalid configuration value: '" + above + "': " + e.toString());
+ }
+ } else {
+ aboveRate = Double.MAX_VALUE;
+ }
+ if (below != null) {
+ try {
+ belowRate = Double.parseDouble(String.valueOf(below));
+ } catch (NumberFormatException e) {
+ throw new TriggerValidationException(name, BELOW_RATE_PROP, "Invalid configuration value: '" + below + "': " + e.toString());
+ }
+ } else {
+ belowRate = -1;
+ }
+ // overlapping thresholds would make the trigger alternately add (hot) and delete (cold) replicas
+ if (above != null && below != null && belowRate >= aboveRate) {
+ throw new TriggerValidationException(name, BELOW_RATE_PROP, "'" + BELOW_RATE_PROP + "' (" + belowRate +
+ ") must be lower than '" + ABOVE_RATE_PROP + "' (" + aboveRate + ")");
+ }
+
+ // collection actions requested for hot / cold shards and replicas, and for hot / cold nodes
+ String aboveOpStr = String.valueOf(properties.getOrDefault(ABOVE_OP_PROP, CollectionParams.CollectionAction.ADDREPLICA.toLower()));
+ String belowOpStr = String.valueOf(properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.DELETEREPLICA.toLower()));
+ aboveOp = CollectionParams.CollectionAction.get(aboveOpStr);
+ if (aboveOp == null) {
+ throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value: '" + aboveOpStr + "'");
+ }
+ belowOp = CollectionParams.CollectionAction.get(belowOpStr);
+ if (belowOp == null) {
+ throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value: '" + belowOpStr + "'");
+ }
+ aboveOpStr = String.valueOf(properties.getOrDefault(ABOVE_NODE_OP_PROP, CollectionParams.CollectionAction.MOVEREPLICA.toLower()));
+ belowOpStr = String.valueOf(properties.getOrDefault(BELOW_NODE_OP_PROP, CollectionParams.CollectionAction.DELETENODE.toLower()));
+ aboveNodeOp = CollectionParams.CollectionAction.get(aboveOpStr);
+ if (aboveNodeOp == null) {
+ throw new TriggerValidationException(getName(), ABOVE_NODE_OP_PROP, "unrecognized value: '" + aboveOpStr + "'");
+ }
+ belowNodeOp = CollectionParams.CollectionAction.get(belowOpStr);
+ if (belowNodeOp == null) {
+ throw new TriggerValidationException(getName(), BELOW_NODE_OP_PROP, "unrecognized value: '" + belowOpStr + "'");
}
}
@@ -146,18 +207,31 @@ public class SearchRateTrigger extends TriggerBase {
return;
}
+ // collection, shard, list(replica + rate)
Map<String, Map<String, List<ReplicaInfo>>> collectionRates = new HashMap<>();
+ // node, rate
Map<String, AtomicDouble> nodeRates = new HashMap<>();
- Map<String, Integer> replicationFactors = new HashMap<>();
+ // this replication factor only considers replica types that are searchable
+ // collection, shard, RF
+ // (summed over live nodes; a replica is counted only if it is ACTIVE and not PULL)
+ Map<String, Map<String, AtomicInteger>> searchableReplicationFactors = new HashMap<>();
for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
Map<String, ReplicaInfo> metricTags = new HashMap<>();
// coll, shard, replica
Map<String, Map<String, List<ReplicaInfo>>> infos = cloudManager.getNodeStateProvider().getReplicaInfo(node, Collections.emptyList());
infos.forEach((coll, shards) -> {
- replicationFactors.computeIfAbsent(coll, c -> shards.size());
+ Map<String, AtomicInteger> replPerShard = searchableReplicationFactors.computeIfAbsent(coll, c -> new HashMap<>());
shards.forEach((sh, replicas) -> {
+ AtomicInteger repl = replPerShard.computeIfAbsent(sh, s -> new AtomicInteger());
replicas.forEach(replica -> {
+ // skip PULL and non-active replicas
+ // (returning from the lambda also skips the metrics processing below for that replica)
+ if (replica.getType().equals(Replica.Type.PULL)) {
+ return;
+ }
+ if (replica.getState() != Replica.State.ACTIVE) {
+ return;
+ }
+ repl.incrementAndGet();
// we have to translate to the metrics registry name, which uses "_replica_nN" as suffix
String replicaName = Utils.parseMetricsReplicaName(coll, replica.getCore());
if (replicaName == null) { // should never happen???
@@ -191,48 +265,74 @@ public class SearchRateTrigger extends TriggerBase {
}
long now = cloudManager.getTimeSource().getTimeNs();
+ // Each resource is classified as hot (rate > aboveRate) or cold (rate < belowRate); the
+ // else-if branches below ensure a resource never ends up in both maps.
+ Map<String, Double> hotNodes = new HashMap<>();
+ Map<String, Double> coldNodes = new HashMap<>();
// check for exceeded rates and filter out those with less than waitFor from previous events
- Map<String, Double> hotNodes = nodeRates.entrySet().stream()
+ nodeRates.entrySet().stream()
.filter(entry -> node.equals(Policy.ANY) || node.equals(entry.getKey()))
.filter(entry -> waitForElapsed(entry.getKey(), now, lastNodeEvent))
- .filter(entry -> entry.getValue().get() > rate)
- .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get()));
+ .forEach(entry -> {
+ if (entry.getValue().get() > aboveRate) {
+ hotNodes.put(entry.getKey(), entry.getValue().get());
+ } else if (entry.getValue().get() < belowRate) {
+ coldNodes.put(entry.getKey(), entry.getValue().get());
+ }
+ });
Map<String, Map<String, Double>> hotShards = new HashMap<>();
+ Map<String, Map<String, Double>> coldShards = new HashMap<>();
List<ReplicaInfo> hotReplicas = new ArrayList<>();
+ List<ReplicaInfo> coldReplicas = new ArrayList<>();
+ // NOTE(review): the replica checks below do not themselves apply the configured collection / shard
+ // filter (unlike the shard and collection checks) - confirm whether collectionRates is already filtered.
collectionRates.forEach((coll, shardRates) -> {
shardRates.forEach((sh, replicaRates) -> {
double shardRate = replicaRates.stream()
.map(r -> {
- if (waitForElapsed(r.getCollection() + "." + r.getCore(), now, lastReplicaEvent) &&
- ((Double)r.getVariable(AutoScalingParams.RATE) > rate)) {
- hotReplicas.add(r);
+ if (waitForElapsed(r.getCollection() + "." + r.getCore(), now, lastReplicaEvent)) {
+ if (((Double)r.getVariable(AutoScalingParams.RATE) > aboveRate)) {
+ hotReplicas.add(r);
+ } else if (((Double)r.getVariable(AutoScalingParams.RATE) < belowRate)) {
+ coldReplicas.add(r);
+ }
}
return r;
})
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum();
if (waitForElapsed(coll + "." + sh, now, lastShardEvent) &&
- (shardRate > rate) &&
(collection.equals(Policy.ANY) || collection.equals(coll)) &&
(shard.equals(Policy.ANY) || shard.equals(sh))) {
- hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+ if (shardRate > aboveRate) {
+ hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+ } else if (shardRate < belowRate) {
+ coldShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+ }
}
});
});
Map<String, Double> hotCollections = new HashMap<>();
+ Map<String, Double> coldCollections = new HashMap<>();
collectionRates.forEach((coll, shardRates) -> {
double total = shardRates.entrySet().stream()
.mapToDouble(e -> e.getValue().stream()
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum();
if (waitForElapsed(coll, now, lastCollectionEvent) &&
- (total > rate) &&
(collection.equals(Policy.ANY) || collection.equals(coll))) {
- hotCollections.put(coll, total);
+ if (total > aboveRate) {
+ hotCollections.put(coll, total);
+ } else if (total < belowRate) {
+ coldCollections.put(coll, total);
+ }
}
});
- if (hotCollections.isEmpty() && hotShards.isEmpty() && hotReplicas.isEmpty() && hotNodes.isEmpty()) {
+ // nothing crossed either threshold - no event
+ if (hotCollections.isEmpty() &&
+ hotShards.isEmpty() &&
+ hotReplicas.isEmpty() &&
+ hotNodes.isEmpty() &&
+ coldCollections.isEmpty() &&
+ coldShards.isEmpty() &&
+ coldReplicas.isEmpty() &&
+ coldNodes.isEmpty()) {
return;
}
@@ -246,6 +346,12 @@ public class SearchRateTrigger extends TriggerBase {
eventTime.set(time);
}
});
+ // cold resources also lower eventTime to the earliest last-event time, like the hot ones
+ coldCollections.forEach((c, r) -> {
+ long time = lastCollectionEvent.get(c);
+ if (eventTime.get() > time) {
+ eventTime.set(time);
+ }
+ });
hotShards.forEach((c, shards) -> {
shards.forEach((s, r) -> {
long time = lastShardEvent.get(c + "." + s);
@@ -254,26 +360,80 @@ public class SearchRateTrigger extends TriggerBase {
}
});
});
+ coldShards.forEach((c, shards) -> {
+ shards.forEach((s, r) -> {
+ long time = lastShardEvent.get(c + "." + s);
+ if (eventTime.get() > time) {
+ eventTime.set(time);
+ }
+ });
+ });
hotReplicas.forEach(r -> {
long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore());
if (eventTime.get() > time) {
eventTime.set(time);
}
});
+ coldReplicas.forEach(r -> {
+ long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore());
+ if (eventTime.get() > time) {
+ eventTime.set(time);
+ }
+ });
hotNodes.forEach((n, r) -> {
long time = lastNodeEvent.get(n);
if (eventTime.get() > time) {
eventTime.set(time);
}
});
+ coldNodes.forEach((n, r) -> {
+ long time = lastNodeEvent.get(n);
+ if (eventTime.get() > time) {
+ eventTime.set(time);
+ }
+ });
+
+ // Only emit an event when at least one operation was computed; otherwise the
+ // last-event timestamps are left untouched.
+ final List<TriggerEvent.Op> ops = new ArrayList<>();
+
+ calculateHotOps(ops, searchableReplicationFactors, hotNodes, hotCollections, hotShards, hotReplicas);
+ calculateColdOps(ops, searchableReplicationFactors, coldNodes, coldCollections, coldShards, coldReplicas);
+
+ if (ops.isEmpty()) {
+ return;
+ }
+ if (processor.process(new SearchRateEvent(getName(), eventTime.get(), ops,
+ hotNodes, hotCollections, hotShards, hotReplicas,
+ coldNodes, coldCollections, coldShards, coldReplicas))) {
+ // update lastEvent times
+ hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
+ coldNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
+ hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
+ coldCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
+ hotShards.entrySet().forEach(e -> e.getValue()
+ .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
+ coldShards.entrySet().forEach(e -> e.getValue()
+ .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
+ hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
+ coldReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
+ }
+ }
+
+ /**
+ * Computes operations for hot resources and appends them to {@code ops}. When only nodes are hot,
+ * one {@code aboveNodeOp} with a SRC_NODE hint is requested per hot node; otherwise {@code aboveOp}
+ * operations with COLL_SHARD hints are requested for the affected shards.
+ */
+ private void calculateHotOps(List<TriggerEvent.Op> ops,
+ Map<String, Map<String, AtomicInteger>> searchableReplicationFactors,
+ Map<String, Double> hotNodes,
+ Map<String, Double> hotCollections,
+ Map<String, Map<String, Double>> hotShards,
+ List<ReplicaInfo> hotReplicas) {
// calculate the number of replicas to add to each hot shard, based on how much the rate was
// exceeded - but within limits.
- final List<TriggerEvent.Op> ops = new ArrayList<>();
- if (hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
+
+ // first resolve a situation when only a node is hot but no collection / shard / replica is hot
+ // TODO: eventually we may want to commission a new node
+ if (!hotNodes.isEmpty() && hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
// move replicas around
hotNodes.forEach((n, r) -> {
- ops.add(new TriggerEvent.Op(CollectionParams.CollectionAction.MOVEREPLICA, Suggester.Hint.SRC_NODE, n));
+ ops.add(new TriggerEvent.Op(aboveNodeOp, Suggester.Hint.SRC_NODE, n));
});
} else {
// add replicas
@@ -283,7 +443,7 @@ public class SearchRateTrigger extends TriggerBase {
List<Pair<String, String>> perShard = hints
.computeIfAbsent(coll, c -> new HashMap<>())
.computeIfAbsent(s, sh -> new ArrayList<>());
- addHints(coll, s, r, replicationFactors.get(coll), perShard);
+ addReplicaHints(coll, s, r, searchableReplicationFactors.get(coll).get(s).get(), perShard);
}));
hotReplicas.forEach(ri -> {
double r = (Double)ri.getVariable(AutoScalingParams.RATE);
@@ -292,30 +452,27 @@ public class SearchRateTrigger extends TriggerBase {
.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
.computeIfAbsent(ri.getShard(), sh -> new ArrayList<>());
if (perShard.isEmpty()) {
- addHints(ri.getCollection(), ri.getShard(), r, replicationFactors.get(ri.getCollection()), perShard);
+ addReplicaHints(ri.getCollection(), ri.getShard(), r, searchableReplicationFactors.get(ri.getCollection()).get(ri.getShard()).get(), perShard);
}
});
hints.values().forEach(m -> m.values().forEach(lst -> lst.forEach(p -> {
- ops.add(new TriggerEvent.Op(CollectionParams.CollectionAction.ADDREPLICA, Suggester.Hint.COLL_SHARD, p));
+ ops.add(new TriggerEvent.Op(aboveOp, Suggester.Hint.COLL_SHARD, p));
})));
}
- if (processor.process(new SearchRateEvent(getName(), eventTime.get(), ops, hotNodes, hotCollections, hotShards, hotReplicas))) {
- // update lastEvent times
- hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
- hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
- hotShards.entrySet().forEach(e -> e.getValue()
- .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
- hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
- }
}
- private void addHints(String collection, String shard, double r, int replicationFactor, List<Pair<String, String>> hints) {
- int numReplicas = (int)Math.round((r - rate) / (double) replicationFactor);
+ /**
+ * This method implements a primitive form of proportional controller with a limiter.
+ * The replica count is round((r - aboveRate) / replicationFactor), clamped to [1, 3].
+ * NOTE(review): Math.round(double) returns a long that is cast to int before clamping,
+ * so an extremely large ratio could wrap instead of saturating at 3.
+ */
+ private void addReplicaHints(String collection, String shard, double r, int replicationFactor, List<Pair<String, String>> hints) {
+ int numReplicas = (int)Math.round((r - aboveRate) / (double) replicationFactor);
+ // in one event add at least 1 replica
if (numReplicas < 1) {
numReplicas = 1;
}
+ // ... and at most 3 replicas
if (numReplicas > 3) {
numReplicas = 3;
}
@@ -324,6 +481,65 @@ public class SearchRateTrigger extends TriggerBase {
}
}
+ /**
+ * Computes operations for resources whose search rate fell below {@code belowRate} and appends
+ * them to {@code ops}. Currently only cold replicas produce operations ({@code belowOp} with
+ * COLL_SHARD and REPLICA hints); cold nodes, collections and shards are accepted but produce none.
+ */
+ private void calculateColdOps(List<TriggerEvent.Op> ops,
+ Map<String, Map<String, AtomicInteger>> searchableReplicationFactors,
+ Map<String, Double> coldNodes,
+ Map<String, Double> coldCollections,
+ Map<String, Map<String, Double>> coldShards,
+ List<ReplicaInfo> coldReplicas) {
+ // COLD NODES:
+ // Unlike in case of hot nodes, if a node is cold then any monitored
+ // collections / shards / replicas located on that node are cold, too.
+ // HOWEVER, we check only non-pull replicas and only from selected collections / shards,
+ // so deleting a cold node is dangerous because it may interfere with these
+ // non-monitored resources.
+ // TODO: once this is safe, request Op(belowNodeOp, Suggester.Hint.SRC_NODE, node) per cold node.
+
+ // COLD COLLECTIONS:
+ // Probably can't do anything reasonable about whole cold collections
+ // because they may be needed even if not used.
+
+ // COLD SHARDS:
+ // Cold shards mean that there are too many replicas per shard - but it also
+ // means that all replicas in these shards are cold too, so we can simply
+ // address this by deleting cold replicas.
+
+ // COLD REPLICAS:
+ // Remove cold replicas but only when there's at least one more searchable replica
+ // still available (additional non-searchable replicas may exist, too).
+ Map<String, Map<String, List<ReplicaInfo>>> byCollectionByShard = new HashMap<>();
+ coldReplicas.forEach(ri -> byCollectionByShard
+ .computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
+ .computeIfAbsent(ri.getShard(), s -> new ArrayList<>())
+ .add(ri));
+ byCollectionByShard.forEach((coll, shards) -> {
+ shards.forEach((sh, replicas) -> {
+ // The searchable RF counts only ACTIVE non-PULL replicas, and the cold replicas are presumably
+ // among them (PULL / inactive replicas are skipped before rate collection). Requiring
+ // RF > number of cold replicas therefore leaves at least one non-cold searchable replica.
+ int rf = searchableReplicationFactors.get(coll).get(sh).get();
+ if (rf > replicas.size()) {
+ // limiter: delete at most 3 replicas per shard in one event
+ replicas.stream()
+ .limit(3)
+ .forEach(ri -> {
+ TriggerEvent.Op op = new TriggerEvent.Op(belowOp,
+ Suggester.Hint.COLL_SHARD, new Pair<>(ri.getCollection(), ri.getShard()));
+ op.addHint(Suggester.Hint.REPLICA, ri.getName());
+ ops.add(op);
+ });
+ }
+ });
+ });
+ }
+
private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
@@ -335,15 +551,25 @@ public class SearchRateTrigger extends TriggerBase {
}
public static class SearchRateEvent extends TriggerEvent {
+ /**
+ * Creates a SEARCHRATE event. The requested operations are stored under
+ * {@link TriggerEvent#REQUESTED_OPS}; the hot and cold resources with their rates are
+ * stored under the HOT_* and COLD_* keys declared by the enclosing trigger.
+ */
- public SearchRateEvent(String source, long eventTime, List<Op> ops, Map<String, Double> hotNodes,
+ public SearchRateEvent(String source, long eventTime, List<Op> ops,
+ Map<String, Double> hotNodes,
Map<String, Double> hotCollections,
- Map<String, Map<String, Double>> hotShards, List<ReplicaInfo> hotReplicas) {
+ Map<String, Map<String, Double>> hotShards,
+ List<ReplicaInfo> hotReplicas,
+ Map<String, Double> coldNodes,
+ Map<String, Double> coldCollections,
+ Map<String, Map<String, Double>> coldShards,
+ List<ReplicaInfo> coldReplicas) {
super(TriggerEventType.SEARCHRATE, source, eventTime, null);
properties.put(TriggerEvent.REQUESTED_OPS, ops);
- properties.put(AutoScalingParams.COLLECTION, hotCollections);
- properties.put(AutoScalingParams.SHARD, hotShards);
- properties.put(AutoScalingParams.REPLICA, hotReplicas);
- properties.put(AutoScalingParams.NODE, hotNodes);
+ // NOTE(review): these keys replace the former AutoScalingParams COLLECTION/SHARD/REPLICA/NODE keys,
+ // so any consumer still reading the old keys must be updated.
+ properties.put(HOT_NODES, hotNodes);
+ properties.put(HOT_COLLECTIONS, hotCollections);
+ properties.put(HOT_SHARDS, hotShards);
+ properties.put(HOT_REPLICAS, hotReplicas);
+ properties.put(COLD_NODES, coldNodes);
+ properties.put(COLD_COLLECTIONS, coldCollections);
+ properties.put(COLD_SHARDS, coldShards);
+ properties.put(COLD_REPLICAS, coldReplicas);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76461f3b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
index 796670a..c1412ab 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
@@ -65,6 +65,7 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
private static int waitForSeconds = 1;
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
private static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
+ // Awaited by the tests; presumably released by FinishedProcessingListener, which is registered for the SUCCEEDED stage.
+ // NOTE(review): a CountDownLatch cannot be reset, so unless it is reassigned elsewhere it stays open for later tests.
+ static CountDownLatch finished = new CountDownLatch(1);
@BeforeClass
public static void setupCluster() throws Exception {
@@ -82,7 +83,7 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
}
@Test
- public void testSearchRate() throws Exception {
+ public void testAboveSearchRate() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
String COLL1 = "collection1";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
@@ -94,59 +95,76 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
"'event' : 'searchRate'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'enabled' : true," +
- "'rate' : 1.0," +
+ "'collection' : '" + COLL1 + "'," +
+ "'aboveRate' : 1.0," +
+ "'belowRate' : 0.1," +
"'actions' : [" +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
- "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
- "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
"]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
- String setListenerCommand1 = "{" +
+ // listener 'srt' records the stages and actions of the trigger's events for the assertions below
+ String setListenerCommand = "{" +
"'set-listener' : " +
"{" +
"'name' : 'srt'," +
"'trigger' : 'search_rate_trigger'," +
"'stage' : ['FAILED','SUCCEEDED']," +
- "'afterAction': ['compute', 'execute', 'test']," +
- "'class' : '" + TestTriggerListener.class.getName() + "'" +
+ "'afterAction': ['compute', 'execute']," +
+ "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
"}" +
"}";
- req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
+
+ // listener 'finished' (SUCCEEDED stage only) presumably releases the 'finished' latch awaited below
+ setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'finished'," +
+ "'trigger' : 'search_rate_trigger'," +
+ "'stage' : ['SUCCEEDED']," +
+ "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // presumably lets the trigger's waitFor period elapse before the query load is generated
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
SolrParams query = params(CommonParams.Q, "*:*");
for (int i = 0; i < 500; i++) {
solrClient.query(COLL1, query);
}
- boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+
+ boolean await = finished.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
- // wait for listener to capture the SUCCEEDED stage
- Thread.sleep(5000);
+
+ // give the 'srt' listener time to record the SUCCEEDED stage
+ timeSource.sleep(5000);
+
List<CapturedEvent> events = listenerEvents.get("srt");
- assertEquals(listenerEvents.toString(), 4, events.size());
+ assertEquals(listenerEvents.toString(), 3, events.size());
assertEquals("AFTER_ACTION", events.get(0).stage.toString());
assertEquals("compute", events.get(0).actionName);
assertEquals("AFTER_ACTION", events.get(1).stage.toString());
assertEquals("execute", events.get(1).actionName);
- assertEquals("AFTER_ACTION", events.get(2).stage.toString());
- assertEquals("test", events.get(2).actionName);
- assertEquals("SUCCEEDED", events.get(3).stage.toString());
- assertNull(events.get(3).actionName);
+ assertEquals("SUCCEEDED", events.get(2).stage.toString());
+ assertNull(events.get(2).actionName);
CapturedEvent ev = events.get(0);
long now = timeSource.getTimeNs();
// verify waitFor
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
- Map<String, Double> nodeRates = (Map<String, Double>) ev.event.getProperties().get("node");
+ // event rates are now read via the SearchRateTrigger.HOT_* keys instead of "node"/"replica"/"shard"/"collection"
+ Map<String, Double> nodeRates = (Map<String, Double>) ev.event.getProperties().get(SearchRateTrigger.HOT_NODES);
assertNotNull("nodeRates", nodeRates);
assertTrue(nodeRates.toString(), nodeRates.size() > 0);
AtomicDouble totalNodeRate = new AtomicDouble();
nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
- List<ReplicaInfo> replicaRates = (List<ReplicaInfo>) ev.event.getProperties().get("replica");
+ List<ReplicaInfo> replicaRates = (List<ReplicaInfo>) ev.event.getProperties().get(SearchRateTrigger.HOT_REPLICAS);
assertNotNull("replicaRates", replicaRates);
assertTrue(replicaRates.toString(), replicaRates.size() > 0);
AtomicDouble totalReplicaRate = new AtomicDouble();
@@ -154,7 +172,7 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
assertTrue(r.toString(), r.getVariable("rate") != null);
totalReplicaRate.addAndGet((Double) r.getVariable("rate"));
});
- Map<String, Object> shardRates = (Map<String, Object>) ev.event.getProperties().get("shard");
+ Map<String, Object> shardRates = (Map<String, Object>) ev.event.getProperties().get(SearchRateTrigger.HOT_SHARDS);
assertNotNull("shardRates", shardRates);
assertEquals(shardRates.toString(), 1, shardRates.size());
shardRates = (Map<String, Object>) shardRates.get(COLL1);
@@ -162,7 +180,7 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(shardRates.toString(), 1, shardRates.size());
AtomicDouble totalShardRate = new AtomicDouble();
shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double) r));
- Map<String, Double> collectionRates = (Map<String, Double>) ev.event.getProperties().get("collection");
+ Map<String, Double> collectionRates = (Map<String, Double>) ev.event.getProperties().get(SearchRateTrigger.HOT_COLLECTIONS);
assertNotNull("collectionRates", collectionRates);
assertEquals(collectionRates.toString(), 1, collectionRates.size());
Double collectionRate = collectionRates.get(COLL1);
@@ -181,27 +199,12 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
}
}
- public static class TestSearchRateAction extends TriggerActionBase {
+ @Test
+ public void testBelowSearchRate() throws Exception {
- @Override
- public void process(TriggerEvent event, ActionContext context) throws Exception {
- try {
- events.add(event);
- long currentTimeNanos = timeSource.getTimeNs();
- long eventTimeNanos = event.getEventTime();
- long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
- if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
- fail(event.source + " was fired before the configured waitFor period");
- }
- triggerFiredLatch.countDown();
- } catch (Throwable t) {
- log.debug("--throwable", t);
- throw t;
- }
- }
}
- public static class TestTriggerListener extends TriggerListenerBase {
+ public static class CapturingTriggerListener extends TriggerListenerBase {
@Override
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) throws TriggerValidationException {
super.configure(loader, cloudManager, config);
@@ -212,7 +215,18 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
- lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
+ CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
+ log.info("=======> " + ev);
+ lst.add(ev);
}
}
+
+ public static class FinishedProcessingListener extends TriggerListenerBase {
+
+ @Override
+ public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
+ finished.countDown();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76461f3b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
index 1c72649..50dd71b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
@@ -79,7 +79,7 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
URL baseUrl = cluster.getJettySolrRunners().get(1).getBaseUrl();
long waitForSeconds = 5 + random().nextInt(5);
- Map<String, Object> props = createTriggerProps(waitForSeconds, rate);
+ Map<String, Object> props = createTriggerProps(waitForSeconds, rate, -1);
final List<TriggerEvent> events = new ArrayList<>();
CloudSolrClient solrClient = cluster.getSolrClient();
@@ -107,7 +107,7 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
assertEquals(1, events.size());
TriggerEvent event = events.get(0);
assertEquals(TriggerEventType.SEARCHRATE, event.eventType);
- List<ReplicaInfo> infos = (List<ReplicaInfo>)event.getProperty(AutoScalingParams.REPLICA);
+ List<ReplicaInfo> infos = (List<ReplicaInfo>)event.getProperty(SearchRateTrigger.HOT_REPLICAS);
assertEquals(1, infos.size());
ReplicaInfo info = infos.get(0);
assertEquals(coreName, info.getCore());
@@ -125,7 +125,7 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
// should generate collection event
assertEquals(1, events.size());
TriggerEvent event = events.get(0);
- Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
+ Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
assertEquals(1, hotCollections.size());
Double Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
@@ -140,10 +140,10 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
// should generate node and collection event but not for COLL2 because of waitFor
assertEquals(1, events.size());
event = events.get(0);
- Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
+ Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_NODES);
assertEquals(3, hotNodes.size());
hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
- hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
+ hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
assertEquals(2, hotCollections.size());
Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
@@ -160,21 +160,22 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
trigger.run();
// should generate node and collection event
assertEquals(1, events.size());
- hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
+ hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
assertEquals(2, hotCollections.size());
Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
Rate = hotCollections.get(COLL2);
assertNotNull(Rate);
- hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
+ hotNodes = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_NODES);
assertEquals(3, hotNodes.size());
hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
}
}
- private Map<String, Object> createTriggerProps(long waitForSeconds, double rate) {
+ private Map<String, Object> createTriggerProps(long waitForSeconds, double aboveRate, double belowRate) {
Map<String, Object> props = new HashMap<>();
- props.put("rate", rate);
+ props.put("aboveRate", aboveRate);
+ props.put("belowRate", belowRate);
props.put("event", "searchRate");
props.put("waitFor", waitForSeconds);
props.put("enabled", true);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76461f3b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
index 129f18c..6d53363 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
@@ -46,6 +46,7 @@ import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.autoscaling.ActionContext;
import org.apache.solr.cloud.autoscaling.ComputePlanAction;
import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
+import org.apache.solr.cloud.autoscaling.SearchRateTrigger;
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
@@ -549,7 +550,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
"'name' : 'search_rate_trigger'," +
"'event' : 'searchRate'," +
"'waitFor' : '" + waitForSeconds + "s'," +
- "'rate' : 1.0," +
+ "'aboveRate' : 1.0," +
"'enabled' : true," +
"'actions' : [" +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
@@ -581,7 +582,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
assertEquals(listenerEvents.toString(), 1, listenerEvents.get("srt").size());
CapturedEvent ev = listenerEvents.get("srt").get(0);
assertEquals(TriggerEventType.SEARCHRATE, ev.event.getEventType());
- Map<String, Number> m = (Map<String, Number>)ev.event.getProperty("node");
+ Map<String, Number> m = (Map<String, Number>)ev.event.getProperty(SearchRateTrigger.HOT_NODES);
assertNotNull(m);
assertEquals(nodes.size(), m.size());
assertEquals(nodes, m.keySet());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76461f3b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
index c898dbc..f645ba4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
@@ -46,6 +46,7 @@ import org.apache.solr.cloud.autoscaling.ComputePlanAction;
import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
+import org.apache.solr.cloud.autoscaling.SearchRateTrigger;
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerEventQueue;
@@ -1162,7 +1163,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
"'event' : 'searchRate'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'enabled' : true," +
- "'rate' : 1.0," +
+ "'aboveRate' : 1.0," +
"'actions' : [" +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}" +
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
@@ -1214,12 +1215,12 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
long now = cluster.getTimeSource().getTimeNs();
// verify waitFor
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
- Map<String, Double> nodeRates = (Map<String, Double>)ev.event.getProperties().get("node");
+ Map<String, Double> nodeRates = (Map<String, Double>)ev.event.getProperties().get(SearchRateTrigger.HOT_NODES);
assertNotNull("nodeRates", nodeRates);
assertTrue(nodeRates.toString(), nodeRates.size() > 0);
AtomicDouble totalNodeRate = new AtomicDouble();
nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
- List<ReplicaInfo> replicaRates = (List<ReplicaInfo>)ev.event.getProperties().get("replica");
+ List<ReplicaInfo> replicaRates = (List<ReplicaInfo>)ev.event.getProperties().get(SearchRateTrigger.HOT_REPLICAS);
assertNotNull("replicaRates", replicaRates);
assertTrue(replicaRates.toString(), replicaRates.size() > 0);
AtomicDouble totalReplicaRate = new AtomicDouble();
@@ -1227,7 +1228,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
assertTrue(r.toString(), r.getVariable("rate") != null);
totalReplicaRate.addAndGet((Double)r.getVariable("rate"));
});
- Map<String, Object> shardRates = (Map<String, Object>)ev.event.getProperties().get("shard");
+ Map<String, Object> shardRates = (Map<String, Object>)ev.event.getProperties().get(SearchRateTrigger.HOT_SHARDS);
assertNotNull("shardRates", shardRates);
assertEquals(shardRates.toString(), 1, shardRates.size());
shardRates = (Map<String, Object>)shardRates.get(COLL1);
@@ -1235,7 +1236,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
assertEquals(shardRates.toString(), 1, shardRates.size());
AtomicDouble totalShardRate = new AtomicDouble();
shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double)r));
- Map<String, Double> collectionRates = (Map<String, Double>)ev.event.getProperties().get("collection");
+ Map<String, Double> collectionRates = (Map<String, Double>)ev.event.getProperties().get(SearchRateTrigger.HOT_COLLECTIONS);
assertNotNull("collectionRates", collectionRates);
assertEquals(collectionRates.toString(), 1, collectionRates.size());
Double collectionRate = collectionRates.get(COLL1);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76461f3b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DeleteReplicaSuggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DeleteReplicaSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DeleteReplicaSuggester.java
new file mode 100644
index 0000000..a7d5d70
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DeleteReplicaSuggester.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.cloud.autoscaling;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.Pair;
+
+/**
+ * This suggester produces a DELETEREPLICA request using provided {@link Hint#COLL_SHARD} and
+ * {@link Hint#NUMBER} hints to specify the collection, shard and number of replicas to delete.
+ */
+class DeleteReplicaSuggester extends Suggester {
+
+ @Override
+ public CollectionParams.CollectionAction getAction() {
+ return CollectionParams.CollectionAction.DELETEREPLICA;
+ }
+
+ @Override
+ SolrRequest init() {
+ Set<Pair<String, String>> shards = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
+ if (shards.isEmpty()) {
+ throw new RuntimeException("delete-replica requires 'collection' and 'shard'");
+ }
+ if (shards.size() > 1) {
+ throw new RuntimeException("delete-replica requires exactly one pair of 'collection' and 'shard'");
+ }
+ Pair<String, String> collShard = shards.iterator().next();
+ Set<Number> counts = (Set<Number>) hints.getOrDefault(Hint.NUMBER, Collections.emptySet());
+ Integer count = null;
+ if (!counts.isEmpty()) {
+ if (counts.size() > 1) {
+ throw new RuntimeException("delete-replica allows at most one number hint specifying the number of replicas to delete");
+ }
+ Number n = counts.iterator().next();
+ count = n.intValue();
+ }
+ Set<String> replicas = (Set<String>) hints.getOrDefault(Hint.REPLICA, Collections.emptySet());
+ String replica = null;
+ if (!replicas.isEmpty()) {
+ if (replicas.size() > 1) {
+ throw new RuntimeException("delete-replica allows at most one 'replica' hint");
+ }
+ replica = replicas.iterator().next();
+ }
+ if (replica == null && count == null) {
+ throw new RuntimeException("delete-replica requires either 'replica' or 'number' hint");
+ }
+ if (replica != null) {
+ return CollectionAdminRequest.deleteReplica(collShard.first(), collShard.second(), replica);
+ } else {
+ return CollectionAdminRequest.deleteReplica(collShard.first(), collShard.second(), count);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76461f3b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index 9496b0f..74a4d1f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -466,7 +466,7 @@ public class Policy implements MapWriter {
static {
ops.put(CollectionAction.ADDREPLICA, () -> new AddReplicaSuggester());
- ops.put(CollectionAction.DELETEREPLICA, () -> new UnsupportedSuggester(CollectionAction.DELETEREPLICA));
+ ops.put(CollectionAction.DELETEREPLICA, () -> new DeleteReplicaSuggester());
ops.put(CollectionAction.MOVEREPLICA, () -> new MoveReplicaSuggester());
ops.put(CollectionAction.SPLITSHARD, () -> new SplitShardSuggester());
ops.put(CollectionAction.MERGESHARDS, () -> new UnsupportedSuggester(CollectionAction.MERGESHARDS));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76461f3b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java
index 2a42d27..2c1d7df 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java
@@ -21,6 +21,7 @@ import java.util.Set;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;
/**
@@ -29,6 +30,11 @@ import org.apache.solr.common.util.Pair;
class SplitShardSuggester extends Suggester {
@Override
+ public CollectionParams.CollectionAction getAction() {
+ return CollectionParams.CollectionAction.SPLITSHARD;
+ }
+
+ @Override
SolrRequest init() {
Set<Pair<String, String>> shards = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
if (shards.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76461f3b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
index 56e1d88..eb0c63c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
@@ -255,7 +255,7 @@ public abstract class Suggester implements MapWriter {
Collection c = v instanceof Collection ? (Collection) v : Collections.singleton(v);
for (Object o : c) {
if (!(o instanceof Pair)) {
- throw new RuntimeException("SHARD hint must use a Pair");
+ throw new RuntimeException("COLL_SHARD hint must use a Pair");
}
Pair p = (Pair) o;
if (p.first() == null || p.second() == null) {
@@ -288,7 +288,11 @@ public abstract class Suggester implements MapWriter {
Double actualFreediskInGb = (Double) FREEDISK.validate(null, hintValVsActual.second(), false);
if (actualFreediskInGb == null) return false;
return actualFreediskInGb > hintFreediskInGb;
- });
+ }),
+ NUMBER(true, o -> {
+ if (!(o instanceof Number)) throw new RuntimeException("NUMBER hint must be a number");
+ }),
+ REPLICA(true);
public final boolean multiValued;
public final Consumer<Object> validator;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76461f3b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index a6a4f87..96fd4e8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -1667,6 +1667,10 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
checkNotNull(CoreAdminParams.REPLICA, replica));
}
+ public static DeleteReplica deleteReplica(String collection, String shard, int count) {
+ return new DeleteReplica(collection, checkNotNull(CoreAdminParams.SHARD, shard), count);
+ }
+
/**
* Returns a SolrRequest to remove a number of replicas from a specific shard
*/
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76461f3b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 02beb97..2fb2718 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -135,6 +135,7 @@ public class Replica extends ZkNodeProps {
return name.equals(replica.name);
}
+ /** Also known as coreNodeName. */
public String getName() {
return name;
}
@@ -146,6 +147,7 @@ public class Replica extends ZkNodeProps {
return getStr(ZkStateReader.BASE_URL_PROP);
}
+ /** SolrCore name. */
public String getCoreName() {
return getStr(ZkStateReader.CORE_NAME_PROP);
}