You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/04/24 08:58:03 UTC
[1/2] lucene-solr:master: SOLR-11833: Allow searchRate trigger to
delete replicas.
Repository: lucene-solr
Updated Branches:
refs/heads/master 1409ab8f8 -> 0d969ab85
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
index 129f18c..6d53363 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
@@ -46,6 +46,7 @@ import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.autoscaling.ActionContext;
import org.apache.solr.cloud.autoscaling.ComputePlanAction;
import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
+import org.apache.solr.cloud.autoscaling.SearchRateTrigger;
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
@@ -549,7 +550,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
"'name' : 'search_rate_trigger'," +
"'event' : 'searchRate'," +
"'waitFor' : '" + waitForSeconds + "s'," +
- "'rate' : 1.0," +
+ "'aboveRate' : 1.0," +
"'enabled' : true," +
"'actions' : [" +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
@@ -581,7 +582,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
assertEquals(listenerEvents.toString(), 1, listenerEvents.get("srt").size());
CapturedEvent ev = listenerEvents.get("srt").get(0);
assertEquals(TriggerEventType.SEARCHRATE, ev.event.getEventType());
- Map<String, Number> m = (Map<String, Number>)ev.event.getProperty("node");
+ Map<String, Number> m = (Map<String, Number>)ev.event.getProperty(SearchRateTrigger.HOT_NODES);
assertNotNull(m);
assertEquals(nodes.size(), m.size());
assertEquals(nodes, m.keySet());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
index 41316ae..37f1596 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
@@ -46,6 +46,7 @@ import org.apache.solr.cloud.autoscaling.ComputePlanAction;
import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
+import org.apache.solr.cloud.autoscaling.SearchRateTrigger;
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerEventQueue;
@@ -1164,7 +1165,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
"'event' : 'searchRate'," +
"'waitFor' : '" + waitForSeconds + "s'," +
"'enabled' : true," +
- "'rate' : 1.0," +
+ "'aboveRate' : 1.0," +
"'actions' : [" +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}" +
"{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
@@ -1216,12 +1217,12 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
long now = cluster.getTimeSource().getTimeNs();
// verify waitFor
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
- Map<String, Double> nodeRates = (Map<String, Double>)ev.event.getProperties().get("node");
+ Map<String, Double> nodeRates = (Map<String, Double>)ev.event.getProperties().get(SearchRateTrigger.HOT_NODES);
assertNotNull("nodeRates", nodeRates);
assertTrue(nodeRates.toString(), nodeRates.size() > 0);
AtomicDouble totalNodeRate = new AtomicDouble();
nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
- List<ReplicaInfo> replicaRates = (List<ReplicaInfo>)ev.event.getProperties().get("replica");
+ List<ReplicaInfo> replicaRates = (List<ReplicaInfo>)ev.event.getProperties().get(SearchRateTrigger.HOT_REPLICAS);
assertNotNull("replicaRates", replicaRates);
assertTrue(replicaRates.toString(), replicaRates.size() > 0);
AtomicDouble totalReplicaRate = new AtomicDouble();
@@ -1229,7 +1230,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
assertTrue(r.toString(), r.getVariable("rate") != null);
totalReplicaRate.addAndGet((Double)r.getVariable("rate"));
});
- Map<String, Object> shardRates = (Map<String, Object>)ev.event.getProperties().get("shard");
+ Map<String, Object> shardRates = (Map<String, Object>)ev.event.getProperties().get(SearchRateTrigger.HOT_SHARDS);
assertNotNull("shardRates", shardRates);
assertEquals(shardRates.toString(), 1, shardRates.size());
shardRates = (Map<String, Object>)shardRates.get(COLL1);
@@ -1237,7 +1238,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
assertEquals(shardRates.toString(), 1, shardRates.size());
AtomicDouble totalShardRate = new AtomicDouble();
shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double)r));
- Map<String, Double> collectionRates = (Map<String, Double>)ev.event.getProperties().get("collection");
+ Map<String, Double> collectionRates = (Map<String, Double>)ev.event.getProperties().get(SearchRateTrigger.HOT_COLLECTIONS);
assertNotNull("collectionRates", collectionRates);
assertEquals(collectionRates.toString(), 1, collectionRates.size());
Double collectionRate = collectionRates.get(COLL1);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc
----------------------------------------------------------------------
diff --git a/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc b/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc
index 432a5e9..75c5d76 100644
--- a/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc
+++ b/solr/solr-ref-guide/src/solrcloud-autoscaling-triggers.adoc
@@ -184,43 +184,92 @@ operation `NONE` (which still can be monitored and acted upon by an appropriate
== Search Rate Trigger
-The search rate trigger can be used for monitoring 1-minute average search rates in a selected
-collection, and request that either replicas be moved to different nodes or new replicas be added
-to reduce the per-replica search rate for a collection or shard with search rate hot spots.
-(Future versions of Solr will also be able to automatically remove some replicas
-when search rate falls below the configured lower threshold).
+The search rate trigger can be used for monitoring search rates in a selected
+collection (1-min average rate by default), and request that either replicas be moved from
+"hot nodes" to different nodes, or new replicas be added to "hot shards" to reduce the
+per-replica search rate for a collection or shard with hot spots.
-This trigger support the following configuration:
+Similarly, if the search rate falls below a threshold then the trigger may request that some
+replicas are deleted from "cold" shards. It can also optionally issue node-level action requests
+when a cumulative node-level rate falls below a threshold.
-`collection`:: (string, optional) collection name to monitor, or any collection if empty.
+Note: this trigger calculates node-level cumulative rates using per-replica rates reported by
+replicas that are part of monitored collections / shards. This means that it may report
+some nodes as "cold" (underutilized) because it ignores other, perhaps more active, replicas
+belonging to other collections. Also, nodes that don't host any of the monitored replicas or
+those that are explicitly excluded by `node` config property won't be reported at all.
-`shard`:: (string, optional) shard name within the collection (requires `collection` to be set), or any shard if empty.
-`node`:: (string, optional) node name to monitor, or any if empty.
+This trigger supports the following configuration:
-`handler`:: (string, optional) handler name whose request rate represents the search rate
-(default is `/select`). This name is used for creating the full metric key, in
-this case `solr.core.<coreName>:QUERY./select.requestTimes:1minRate`.
+`collections`:: (string, optional) comma-separated list of collection names to monitor, or any collection if empty / not set.
-`rate`:: (double, required) the upper bound for the request rate metric value.
+`shard`:: (string, optional) shard name within the collection (requires `collections` to be set to exactly one name), or any shard if empty.
-If a rate is exceeded for a node (but not for individual replicas placed on this node) then
-the action requested by this event is to move one replica (with the highest rate) to another
-node. If a rate is exceeded for a collection or shard then the action requested is to add some
-replicas - currently at least 1 and at most 3, depending on how much the rate is exceeded, proportional to
-the threshold rate and the current request rate.
+`node`:: (string, optional) node name to monitor, or any if empty.
-.Example: a search rate trigger that monitors collection "test" and adds new replicas if 1-minute average request rate of "/select" handler exceeds 100 requests/sec:
+`metric`:: (string, optional) metric name that represents the search rate
+(default is `QUERY./select.requestTimes:1minRate`). This name has to identify a single numeric
+metric value, and it may use the colon syntax for selecting one property of a complex metric.
+
+`maxOps`:: (integer, optional) maximum number of add replica / delete replica operations
+requested in a single autoscaling event. The default value is 3 and it helps to smooth out
+the changes to the number of replicas during periods of large search rate fluctuations.
+
+`minReplicas`:: (integer, optional) minimum acceptable number of searchable replicas (i.e., replicas other
+than `PULL` type). The trigger will not generate any DELETEREPLICA requests when the number of
+searchable replicas in a shard reaches this threshold. When this value is not set (the default)
+the `replicationFactor` property of the collection is used, and if that property is not set then
+the value is set to 1. Note also that shard leaders are never deleted.
+
+`aboveRate`:: (float) the upper bound for the request rate metric value. At least one of
+`aboveRate` or `belowRate` must be set.
+
+`belowRate`:: (float) the lower bound for the request rate metric value. At least one of
+`aboveRate` or `belowRate` must be set.
+
+`aboveOp`:: collection action to request when the upper threshold for a shard or replica is
+exceeded. Default action is `ADDREPLICA` and the trigger will request 1 to `maxOps` operations
+per shard per event, depending on how much the rate is exceeded. This property can be set to 'NONE'
+to effectively disable the action (but still report it to the listeners).
+
+`aboveNodeOp`:: collection action to request when the upper threshold for a node is exceeded.
+Default action is `MOVEREPLICA`, and the trigger will request 1 replica operation per hot node per event.
+If both `aboveOp` and `aboveNodeOp` operations are requested then `aboveNodeOp` operations are
+always requested first. This property can be set to 'NONE' to effectively disable the action (but still report it to the listeners).
+
+`belowOp`:: collection action to request when the lower threshold for a shard or replica is
+exceeded. Default action is `DELETEREPLICA`, and the trigger will request at most `maxOps` replicas
+to be deleted from eligible cold shards. This property can be set to 'NONE'
+to effectively disable the action (but still report it to the listeners).
+
+`belowNodeOp`:: action to request when the lower threshold for a node is exceeded.
+Default action is null (not set) and the condition is ignored, because in many cases the
+trigger will monitor only some selected resources (non-pull replica types from selected
+collections / shards) so setting this by default to e.g. `DELETENODE` could interfere with
+these non-monitored resources. The trigger will request 1 operation per cold node per event.
+If both `belowOp` and `belowNodeOp` operations are requested then `belowOp` operations are
+always requested first.
+
+.Example:
+A search rate trigger that monitors collection "test" and adds new replicas if 5-minute
+average request rate of "/select" handler exceeds 100 requests/sec, and the condition persists
+for over 20 minutes. If the rate falls below 0.01 and persists for 20 minutes, the trigger will
+request not only replica deletions (leaving at most 1 replica per shard) but may also
+request node deletion.
[source,json]
----
{
"set-trigger": {
"name" : "search_rate_trigger",
"event" : "searchRate",
- "collection" : "test",
- "handler" : "/select",
- "rate" : 100.0,
- "waitFor" : "1m",
+ "collections" : "test",
+ "metric" : "QUERY./select.requestTimes:5minRate",
+ "aboveRate" : 100.0,
+ "belowRate" : 0.01,
+ "belowNodeOp" : "DELETENODE",
+ "minReplicas" : 1,
+ "waitFor" : "20m",
"enabled" : true,
"actions" : [
{
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DeleteNodeSuggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DeleteNodeSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DeleteNodeSuggester.java
new file mode 100644
index 0000000..cfff49e
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DeleteNodeSuggester.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.cloud.autoscaling;
+
+import java.util.Set;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.params.CollectionParams;
+
+/**
+ * This suggester produces a DELETENODE request using provided {@link org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint#SRC_NODE}.
+ */
+class DeleteNodeSuggester extends Suggester {
+
+ @Override
+ public CollectionParams.CollectionAction getAction() {
+ return CollectionParams.CollectionAction.DELETENODE;
+ }
+
+ @Override
+ SolrRequest init() {
+ Set<String> srcNodes = (Set<String>) hints.get(Hint.SRC_NODE);
+ if (srcNodes.isEmpty()) {
+ throw new RuntimeException("delete-node requires 'src_node' hint");
+ }
+ if (srcNodes.size() > 1) {
+ throw new RuntimeException("delete-node requires exactly one 'src_node' hint");
+ }
+ return CollectionAdminRequest.deleteNode(srcNodes.iterator().next());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DeleteReplicaSuggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DeleteReplicaSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DeleteReplicaSuggester.java
new file mode 100644
index 0000000..9a942ad
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DeleteReplicaSuggester.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.cloud.autoscaling;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.Pair;
+
+/**
+ * This suggester produces a DELETEREPLICA request using provided {@link org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint#COLL_SHARD} and
+ * {@link org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint#NUMBER} hints to specify the collection, shard and number of replicas to delete.
+ */
+class DeleteReplicaSuggester extends Suggester {
+
+ @Override
+ public CollectionParams.CollectionAction getAction() {
+ return CollectionParams.CollectionAction.DELETEREPLICA;
+ }
+
+ @Override
+ SolrRequest init() {
+ Set<Pair<String, String>> shards = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
+ if (shards.isEmpty()) {
+ throw new RuntimeException("delete-replica requires 'collection' and 'shard'");
+ }
+ if (shards.size() > 1) {
+ throw new RuntimeException("delete-replica requires exactly one pair of 'collection' and 'shard'");
+ }
+ Pair<String, String> collShard = shards.iterator().next();
+ Set<Number> counts = (Set<Number>) hints.getOrDefault(Hint.NUMBER, Collections.emptySet());
+ Integer count = null;
+ if (!counts.isEmpty()) {
+ if (counts.size() > 1) {
+ throw new RuntimeException("delete-replica allows at most one number hint specifying the number of replicas to delete");
+ }
+ Number n = counts.iterator().next();
+ count = n.intValue();
+ }
+ Set<String> replicas = (Set<String>) hints.getOrDefault(Hint.REPLICA, Collections.emptySet());
+ String replica = null;
+ if (!replicas.isEmpty()) {
+ if (replicas.size() > 1) {
+ throw new RuntimeException("delete-replica allows at most one 'replica' hint");
+ }
+ replica = replicas.iterator().next();
+ }
+ if (replica == null && count == null) {
+ throw new RuntimeException("delete-replica requires either 'replica' or 'number' hint");
+ }
+ if (replica != null) {
+ return CollectionAdminRequest.deleteReplica(collShard.first(), collShard.second(), replica);
+ } else {
+ return CollectionAdminRequest.deleteReplica(collShard.first(), collShard.second(), count);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index cbdb2a7..05c9c20 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -468,10 +468,12 @@ public class Policy implements MapWriter {
static {
ops.put(CollectionAction.ADDREPLICA, AddReplicaSuggester::new);
- ops.put(CollectionAction.DELETEREPLICA, () -> new UnsupportedSuggester(CollectionAction.DELETEREPLICA));
+ ops.put(CollectionAction.DELETEREPLICA, DeleteReplicaSuggester::new);
+ ops.put(CollectionAction.DELETENODE, DeleteNodeSuggester::new);
ops.put(CollectionAction.MOVEREPLICA, MoveReplicaSuggester::new);
ops.put(CollectionAction.SPLITSHARD, SplitShardSuggester::new);
ops.put(CollectionAction.MERGESHARDS, () -> new UnsupportedSuggester(CollectionAction.MERGESHARDS));
+ ops.put(CollectionAction.NONE, () -> new UnsupportedSuggester(CollectionAction.NONE));
}
public Map<String, List<Clause>> getPolicies() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
index e1d8281..50e77f8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
@@ -138,6 +138,24 @@ public class ReplicaInfo implements MapWriter {
return variables.get(name);
}
+ public Object getVariable(String name, Object defValue) {
+ Object o = variables.get(name);
+ if (o != null) {
+ return o;
+ } else {
+ return defValue;
+ }
+ }
+
+ public boolean getBool(String name, boolean defValue) {
+ Object o = getVariable(name, defValue);
+ if (o instanceof Boolean) {
+ return (Boolean)o;
+ } else {
+ return Boolean.parseBoolean(String.valueOf(o));
+ }
+ }
+
@Override
public String toString() {
return Utils.toJSONString(this);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java
index 2a42d27..2c1d7df 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SplitShardSuggester.java
@@ -21,6 +21,7 @@ import java.util.Set;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;
/**
@@ -29,6 +30,11 @@ import org.apache.solr.common.util.Pair;
class SplitShardSuggester extends Suggester {
@Override
+ public CollectionParams.CollectionAction getAction() {
+ return CollectionParams.CollectionAction.SPLITSHARD;
+ }
+
+ @Override
SolrRequest init() {
Set<Pair<String, String>> shards = (Set<Pair<String, String>>) hints.getOrDefault(Hint.COLL_SHARD, Collections.emptySet());
if (shards.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
index 56e1d88..eb0c63c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
@@ -255,7 +255,7 @@ public abstract class Suggester implements MapWriter {
Collection c = v instanceof Collection ? (Collection) v : Collections.singleton(v);
for (Object o : c) {
if (!(o instanceof Pair)) {
- throw new RuntimeException("SHARD hint must use a Pair");
+ throw new RuntimeException("COLL_SHARD hint must use a Pair");
}
Pair p = (Pair) o;
if (p.first() == null || p.second() == null) {
@@ -288,7 +288,11 @@ public abstract class Suggester implements MapWriter {
Double actualFreediskInGb = (Double) FREEDISK.validate(null, hintValVsActual.second(), false);
if (actualFreediskInGb == null) return false;
return actualFreediskInGb > hintFreediskInGb;
- });
+ }),
+ NUMBER(true, o -> {
+ if (!(o instanceof Number)) throw new RuntimeException("NUMBER hint must be a number");
+ }),
+ REPLICA(true);
public final boolean multiValued;
public final Consumer<Object> validator;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
index 5f469f9..fcefc2f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
@@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory;
public class SolrClientCloudManager implements SolrCloudManager {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final CloudSolrClient solrClient;
+ protected final CloudSolrClient solrClient;
private final ZkDistribStateManager stateManager;
private final DistributedQueueFactory queueFactory;
private final ZkStateReader zkStateReader;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index a6a4f87..4c74a58 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -553,6 +553,13 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
}
}
+ /**
+ * Returns a SolrRequest to delete a node.
+ */
+ public static DeleteNode deleteNode(String node) {
+ return new DeleteNode(node);
+ }
+
public static class DeleteNode extends AsyncCollectionAdminRequest {
String node;
@@ -1667,6 +1674,10 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
checkNotNull(CoreAdminParams.REPLICA, replica));
}
+ public static DeleteReplica deleteReplica(String collection, String shard, int count) {
+ return new DeleteReplica(collection, checkNotNull(CoreAdminParams.SHARD, shard), count);
+ }
+
/**
* Returns a SolrRequest to remove a number of replicas from a specific shard
*/
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 02beb97..2fb2718 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -135,6 +135,7 @@ public class Replica extends ZkNodeProps {
return name.equals(replica.name);
}
+ /** Also known as coreNodeName. */
public String getName() {
return name;
}
@@ -146,6 +147,7 @@ public class Replica extends ZkNodeProps {
return getStr(ZkStateReader.BASE_URL_PROP);
}
+ /** SolrCore name. */
public String getCoreName() {
return getStr(ZkStateReader.CORE_NAME_PROP);
}
[2/2] lucene-solr:master: SOLR-11833: Allow searchRate trigger to
delete replicas.
Posted by ab...@apache.org.
SOLR-11833: Allow searchRate trigger to delete replicas.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0d969ab8
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0d969ab8
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0d969ab8
Branch: refs/heads/master
Commit: 0d969ab85d5b16a960f5b8f7735a4ed267975553
Parents: 1409ab8
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Mon Apr 23 22:19:01 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Mon Apr 23 22:19:01 2018 +0200
----------------------------------------------------------------------
solr/CHANGES.txt | 4 +-
.../cloud/autoscaling/IndexSizeTrigger.java | 4 +-
.../cloud/autoscaling/SearchRateTrigger.java | 487 +++++++++++++--
.../org/apache/solr/cloud/CloudTestUtils.java | 8 +-
.../autoscaling/AutoScalingHandlerTest.java | 4 +-
.../SearchRateTriggerIntegrationTest.java | 592 +++++++++++++++++--
.../autoscaling/SearchRateTriggerTest.java | 201 ++++++-
.../cloud/autoscaling/sim/TestLargeCluster.java | 5 +-
.../autoscaling/sim/TestTriggerIntegration.java | 11 +-
.../src/solrcloud-autoscaling-triggers.adoc | 95 ++-
.../cloud/autoscaling/DeleteNodeSuggester.java | 46 ++
.../autoscaling/DeleteReplicaSuggester.java | 74 +++
.../client/solrj/cloud/autoscaling/Policy.java | 4 +-
.../solrj/cloud/autoscaling/ReplicaInfo.java | 18 +
.../cloud/autoscaling/SplitShardSuggester.java | 6 +
.../solrj/cloud/autoscaling/Suggester.java | 8 +-
.../solrj/impl/SolrClientCloudManager.java | 2 +-
.../solrj/request/CollectionAdminRequest.java | 11 +
.../org/apache/solr/common/cloud/Replica.java | 2 +
19 files changed, 1410 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f131c07..fb49903 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -182,9 +182,11 @@ Bug Fixes
* SOLR-12250: NegativeArraySizeException on TransactionLog if previous document more than 1.9GB (Cao Manh Dat)
-
* SOLR-12253: Remove optimize button from the core admin page (Erick Erickson)
+* SOLR-11833: Allow searchRate trigger to delete replicas. Improve configurability of the trigger by specifying
+ upper / lower thresholds and respective actions (ab)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
index 756f88f..9978362 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
@@ -143,11 +143,11 @@ public class IndexSizeTrigger extends TriggerBase {
String belowOpStr = String.valueOf(properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.MERGESHARDS.toLower()));
aboveOp = CollectionParams.CollectionAction.get(aboveOpStr);
if (aboveOp == null) {
- throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value of " + ABOVE_OP_PROP + ": '" + aboveOpStr + "'");
+ throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value of: '" + aboveOpStr + "'");
}
belowOp = CollectionParams.CollectionAction.get(belowOpStr);
if (belowOp == null) {
- throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value of " + BELOW_OP_PROP + ": '" + belowOpStr + "'");
+ throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value of: '" + belowOpStr + "'");
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
index 02a2d0c..1824f7f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
@@ -16,17 +16,21 @@
*/
package org.apache.solr.cloud.autoscaling;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
@@ -34,9 +38,13 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.metrics.SolrCoreMetricManager;
@@ -49,11 +57,43 @@ import org.slf4j.LoggerFactory;
public class SearchRateTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private String handler;
- private String collection;
+ public static final String COLLECTIONS_PROP = "collections";
+ public static final String METRIC_PROP = "metric";
+ public static final String MAX_OPS_PROP = "maxOps";
+ public static final String MIN_REPLICAS_PROP = "minReplicas";
+ public static final String ABOVE_RATE_PROP = "aboveRate";
+ public static final String BELOW_RATE_PROP = "belowRate";
+ public static final String ABOVE_OP_PROP = "aboveOp";
+ public static final String BELOW_OP_PROP = "belowOp";
+ public static final String ABOVE_NODE_OP_PROP = "aboveNodeOp";
+ public static final String BELOW_NODE_OP_PROP = "belowNodeOp";
+
+ // back-compat
+ public static final String BC_COLLECTION_PROP = "collection";
+ public static final String BC_RATE_PROP = "rate";
+
+
+ public static final String HOT_NODES = "hotNodes";
+ public static final String HOT_COLLECTIONS = "hotCollections";
+ public static final String HOT_SHARDS = "hotShards";
+ public static final String HOT_REPLICAS = "hotReplicas";
+ public static final String COLD_NODES = "coldNodes";
+ public static final String COLD_COLLECTIONS = "coldCollections";
+ public static final String COLD_SHARDS = "coldShards";
+ public static final String COLD_REPLICAS = "coldReplicas";
+
+ public static final int DEFAULT_MAX_OPS = 3;
+ public static final String DEFAULT_METRIC = "QUERY./select.requestTimes:1minRate";
+
+ private String metric;
+ private int maxOps;
+ private Integer minReplicas = null;
+ private final Set<String> collections = new HashSet<>();
private String shard;
private String node;
- private double rate;
+ private double aboveRate;
+ private double belowRate;
+ private CollectionParams.CollectionAction aboveOp, belowOp, aboveNodeOp, belowNodeOp;
private final Map<String, Long> lastCollectionEvent = new ConcurrentHashMap<>();
private final Map<String, Long> lastNodeEvent = new ConcurrentHashMap<>();
private final Map<String, Long> lastShardEvent = new ConcurrentHashMap<>();
@@ -66,29 +106,138 @@ public class SearchRateTrigger extends TriggerBase {
this.state.put("lastNodeEvent", lastNodeEvent);
this.state.put("lastShardEvent", lastShardEvent);
this.state.put("lastReplicaEvent", lastReplicaEvent);
- TriggerUtils.requiredProperties(requiredProperties, validProperties, "rate");
+ TriggerUtils.validProperties(validProperties,
+ COLLECTIONS_PROP, AutoScalingParams.SHARD, AutoScalingParams.NODE,
+ METRIC_PROP,
+ MAX_OPS_PROP,
+ MIN_REPLICAS_PROP,
+ ABOVE_OP_PROP,
+ BELOW_OP_PROP,
+ ABOVE_NODE_OP_PROP,
+ BELOW_NODE_OP_PROP,
+ ABOVE_RATE_PROP,
+ BELOW_RATE_PROP,
+ // back-compat props
+ BC_COLLECTION_PROP,
+ BC_RATE_PROP);
}
@Override
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
super.configure(loader, cloudManager, properties);
// parse config options
- collection = (String)properties.getOrDefault(AutoScalingParams.COLLECTION, Policy.ANY);
+ String collectionsStr = (String)properties.get(COLLECTIONS_PROP);
+ if (collectionsStr != null) {
+ collections.addAll(StrUtils.splitSmart(collectionsStr, ','));
+ }
+ // check back-compat collection prop
+ collectionsStr = (String)properties.get(BC_COLLECTION_PROP);
+ if (collectionsStr != null) {
+ if (!collectionsStr.equals(Policy.ANY)) {
+ collections.add(collectionsStr);
+ }
+ }
shard = (String)properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY);
- if (collection.equals(Policy.ANY) && !shard.equals(Policy.ANY)) {
- throw new TriggerValidationException("shard", "When 'shard' is other than #ANY then collection name must be also other than #ANY");
+ if (!shard.equals(Policy.ANY) && (collections.isEmpty() || collections.size() > 1)) {
+ throw new TriggerValidationException(name, AutoScalingParams.SHARD, "When 'shard' is other than #ANY then exactly one collection name must be set");
}
node = (String)properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY);
- handler = (String)properties.getOrDefault(AutoScalingParams.HANDLER, "/select");
+ metric = (String)properties.getOrDefault(METRIC_PROP, DEFAULT_METRIC);
+
+ String maxOpsStr = String.valueOf(properties.getOrDefault(MAX_OPS_PROP, DEFAULT_MAX_OPS));
+ try {
+ maxOps = Integer.parseInt(maxOpsStr);
+ } catch (Exception e) {
+ throw new TriggerValidationException(name, MAX_OPS_PROP, "invalid value '" + maxOpsStr + "': " + e.toString());
+ }
+
+ Object o = properties.get(MIN_REPLICAS_PROP);
+ if (o != null) {
+ try {
+ minReplicas = Integer.parseInt(o.toString());
+ if (minReplicas < 1) {
+ throw new Exception("must be at least 1, or not set to use 'replicationFactor'");
+ }
+ } catch (Exception e) {
+ throw new TriggerValidationException(name, MIN_REPLICAS_PROP, "invalid value '" + o + "': " + e.toString());
+ }
+ }
+
+ Object above = properties.get(ABOVE_RATE_PROP);
+ Object below = properties.get(BELOW_RATE_PROP);
+ // back-compat rate prop
+ if (properties.containsKey(BC_RATE_PROP)) {
+ above = properties.get(BC_RATE_PROP);
+ }
+ if (above == null && below == null) {
+ throw new TriggerValidationException(name, ABOVE_RATE_PROP, "at least one of '" +
+ ABOVE_RATE_PROP + "' or '" + BELOW_RATE_PROP + "' must be set");
+ }
+ if (above != null) {
+ try {
+ aboveRate = Double.parseDouble(String.valueOf(above));
+ } catch (Exception e) {
+ throw new TriggerValidationException(name, ABOVE_RATE_PROP, "Invalid configuration value: '" + above + "': " + e.toString());
+ }
+ } else {
+ aboveRate = Double.MAX_VALUE;
+ }
+ if (below != null) {
+ try {
+ belowRate = Double.parseDouble(String.valueOf(below));
+ } catch (Exception e) {
+ throw new TriggerValidationException(name, BELOW_RATE_PROP, "Invalid configuration value: '" + below + "': " + e.toString());
+ }
+ } else {
+ belowRate = -1;
+ }
- String rateString = String.valueOf(properties.get("rate"));
+ String aboveOpStr = String.valueOf(properties.getOrDefault(ABOVE_OP_PROP, CollectionParams.CollectionAction.ADDREPLICA.toLower()));
+ String belowOpStr = String.valueOf(properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.DELETEREPLICA.toLower()));
+ aboveOp = CollectionParams.CollectionAction.get(aboveOpStr);
+ if (aboveOp == null) {
+ throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value: '" + aboveOpStr + "'");
+ }
+ belowOp = CollectionParams.CollectionAction.get(belowOpStr);
+ if (belowOp == null) {
+ throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value: '" + belowOpStr + "'");
+ }
+ Object aboveNodeObj = properties.getOrDefault(ABOVE_NODE_OP_PROP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
+ // do NOT set the default to DELETENODE
+ Object belowNodeObj = properties.get(BELOW_NODE_OP_PROP);
try {
- rate = Double.parseDouble(rateString);
+ aboveNodeOp = CollectionParams.CollectionAction.get(String.valueOf(aboveNodeObj));
} catch (Exception e) {
- throw new TriggerValidationException(name, "rate", "Invalid 'rate' configuration value: '" + rateString + "': " + e.toString());
+ throw new TriggerValidationException(getName(), ABOVE_NODE_OP_PROP, "unrecognized value: '" + aboveNodeObj + "'");
+ }
+ if (belowNodeObj != null) {
+ try {
+ belowNodeOp = CollectionParams.CollectionAction.get(String.valueOf(belowNodeObj));
+ } catch (Exception e) {
+ throw new TriggerValidationException(getName(), BELOW_NODE_OP_PROP, "unrecognized value: '" + belowNodeObj + "'");
+ }
}
}
+ @VisibleForTesting
+ Map<String, Object> getConfig() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("name", name);
+ config.put(COLLECTIONS_PROP, collections);
+ config.put(AutoScalingParams.SHARD, shard);
+ config.put(AutoScalingParams.NODE, node);
+ config.put(METRIC_PROP, metric);
+ config.put(MAX_OPS_PROP, maxOps);
+ config.put(MIN_REPLICAS_PROP, minReplicas);
+ config.put(ABOVE_RATE_PROP, aboveRate);
+ config.put(BELOW_RATE_PROP, belowRate);
+ config.put(ABOVE_OP_PROP, aboveOp);
+ config.put(ABOVE_NODE_OP_PROP, aboveNodeOp);
+ config.put(BELOW_OP_PROP, belowOp);
+ config.put(BELOW_NODE_OP_PROP, belowNodeOp);
+ return config;
+ }
+
@Override
protected Map<String, Object> getState() {
return state;
@@ -146,26 +295,42 @@ public class SearchRateTrigger extends TriggerBase {
return;
}
+ // collection, shard, list(replica + rate)
Map<String, Map<String, List<ReplicaInfo>>> collectionRates = new HashMap<>();
+ // node, rate
Map<String, AtomicDouble> nodeRates = new HashMap<>();
- Map<String, Integer> replicationFactors = new HashMap<>();
+ // this replication factor only considers replica types that are searchable
+ // collection, shard, RF
+ Map<String, Map<String, AtomicInteger>> searchableReplicationFactors = new HashMap<>();
+ ClusterState clusterState = null;
+ try {
+ clusterState = cloudManager.getClusterStateProvider().getClusterState();
+ } catch (IOException e) {
+ log.warn("Error getting ClusterState", e);
+ return;
+ }
for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
Map<String, ReplicaInfo> metricTags = new HashMap<>();
// coll, shard, replica
Map<String, Map<String, List<ReplicaInfo>>> infos = cloudManager.getNodeStateProvider().getReplicaInfo(node, Collections.emptyList());
infos.forEach((coll, shards) -> {
- replicationFactors.computeIfAbsent(coll, c -> shards.size());
+ Map<String, AtomicInteger> replPerShard = searchableReplicationFactors.computeIfAbsent(coll, c -> new HashMap<>());
shards.forEach((sh, replicas) -> {
+ AtomicInteger repl = replPerShard.computeIfAbsent(sh, s -> new AtomicInteger());
replicas.forEach(replica -> {
+ // skip non-active replicas
+ if (replica.getState() != Replica.State.ACTIVE) {
+ return;
+ }
+ repl.incrementAndGet();
// we have to translate to the metrics registry name, which uses "_replica_nN" as suffix
String replicaName = Utils.parseMetricsReplicaName(coll, replica.getCore());
if (replicaName == null) { // should never happen???
replicaName = replica.getName(); // which is actually coreNode name...
}
String registry = SolrCoreMetricManager.createRegistryName(true, coll, sh, replicaName, null);
- String tag = "metrics:" + registry
- + ":QUERY." + handler + ".requestTimes:1minRate";
+ String tag = "metrics:" + registry + ":" + metric;
metricTags.put(tag, replica);
});
});
@@ -191,48 +356,100 @@ public class SearchRateTrigger extends TriggerBase {
}
long now = cloudManager.getTimeSource().getTimeNs();
+ Map<String, Double> hotNodes = new HashMap<>();
+ Map<String, Double> coldNodes = new HashMap<>();
// check for exceeded rates and filter out those with less than waitFor from previous events
- Map<String, Double> hotNodes = nodeRates.entrySet().stream()
+ nodeRates.entrySet().stream()
.filter(entry -> node.equals(Policy.ANY) || node.equals(entry.getKey()))
- .filter(entry -> waitForElapsed(entry.getKey(), now, lastNodeEvent))
- .filter(entry -> entry.getValue().get() > rate)
- .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get()));
+ .forEach(entry -> {
+ if (entry.getValue().get() > aboveRate) {
+ if (waitForElapsed(entry.getKey(), now, lastNodeEvent)) {
+ hotNodes.put(entry.getKey(), entry.getValue().get());
+ }
+ } else if (entry.getValue().get() < belowRate) {
+ if (waitForElapsed(entry.getKey(), now, lastNodeEvent)) {
+ coldNodes.put(entry.getKey(), entry.getValue().get());
+ }
+ } else {
+ // no violation - clear waitForElapsed
+ // (violation is only valid if it persists throughout waitFor)
+ lastNodeEvent.remove(entry.getKey());
+ }
+ });
Map<String, Map<String, Double>> hotShards = new HashMap<>();
+ Map<String, Map<String, Double>> coldShards = new HashMap<>();
List<ReplicaInfo> hotReplicas = new ArrayList<>();
+ List<ReplicaInfo> coldReplicas = new ArrayList<>();
collectionRates.forEach((coll, shardRates) -> {
shardRates.forEach((sh, replicaRates) -> {
double shardRate = replicaRates.stream()
.map(r -> {
- if (waitForElapsed(r.getCollection() + "." + r.getCore(), now, lastReplicaEvent) &&
- ((Double)r.getVariable(AutoScalingParams.RATE) > rate)) {
- hotReplicas.add(r);
+ String elapsedKey = r.getCollection() + "." + r.getCore();
+ if ((Double)r.getVariable(AutoScalingParams.RATE) > aboveRate) {
+ if (waitForElapsed(elapsedKey, now, lastReplicaEvent)) {
+ hotReplicas.add(r);
+ }
+ } else if ((Double)r.getVariable(AutoScalingParams.RATE) < belowRate) {
+ if (waitForElapsed(elapsedKey, now, lastReplicaEvent)) {
+ coldReplicas.add(r);
+ }
+ } else {
+ // no violation - clear waitForElapsed
+ lastReplicaEvent.remove(elapsedKey);
}
return r;
})
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum();
- if (waitForElapsed(coll + "." + sh, now, lastShardEvent) &&
- (shardRate > rate) &&
- (collection.equals(Policy.ANY) || collection.equals(coll)) &&
+ String elapsedKey = coll + "." + sh;
+ if ((collections.isEmpty() || collections.contains(coll)) &&
(shard.equals(Policy.ANY) || shard.equals(sh))) {
- hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+ if (shardRate > aboveRate) {
+ if (waitForElapsed(elapsedKey, now, lastShardEvent)) {
+ hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+ }
+ } else if (shardRate < belowRate) {
+ if (waitForElapsed(elapsedKey, now, lastShardEvent)) {
+ coldShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+ }
+ } else {
+ // no violation - clear waitForElapsed
+ lastShardEvent.remove(elapsedKey);
+ }
}
});
});
Map<String, Double> hotCollections = new HashMap<>();
+ Map<String, Double> coldCollections = new HashMap<>();
collectionRates.forEach((coll, shardRates) -> {
double total = shardRates.entrySet().stream()
.mapToDouble(e -> e.getValue().stream()
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum();
- if (waitForElapsed(coll, now, lastCollectionEvent) &&
- (total > rate) &&
- (collection.equals(Policy.ANY) || collection.equals(coll))) {
- hotCollections.put(coll, total);
+ if (collections.isEmpty() || collections.contains(coll)) {
+ if (total > aboveRate) {
+ if (waitForElapsed(coll, now, lastCollectionEvent)) {
+ hotCollections.put(coll, total);
+ }
+ } else if (total < belowRate) {
+ if (waitForElapsed(coll, now, lastCollectionEvent)) {
+ coldCollections.put(coll, total);
+ }
+ } else {
+ // no violation - clear waitForElapsed
+ lastCollectionEvent.remove(coll);
+ }
}
});
- if (hotCollections.isEmpty() && hotShards.isEmpty() && hotReplicas.isEmpty() && hotNodes.isEmpty()) {
+ if (hotCollections.isEmpty() &&
+ hotShards.isEmpty() &&
+ hotReplicas.isEmpty() &&
+ hotNodes.isEmpty() &&
+ coldCollections.isEmpty() &&
+ coldShards.isEmpty() &&
+ coldReplicas.isEmpty() &&
+ coldNodes.isEmpty()) {
return;
}
@@ -246,6 +463,12 @@ public class SearchRateTrigger extends TriggerBase {
eventTime.set(time);
}
});
+ coldCollections.forEach((c, r) -> {
+ long time = lastCollectionEvent.get(c);
+ if (eventTime.get() > time) {
+ eventTime.set(time);
+ }
+ });
hotShards.forEach((c, shards) -> {
shards.forEach((s, r) -> {
long time = lastShardEvent.get(c + "." + s);
@@ -254,27 +477,83 @@ public class SearchRateTrigger extends TriggerBase {
}
});
});
+ coldShards.forEach((c, shards) -> {
+ shards.forEach((s, r) -> {
+ long time = lastShardEvent.get(c + "." + s);
+ if (eventTime.get() > time) {
+ eventTime.set(time);
+ }
+ });
+ });
hotReplicas.forEach(r -> {
long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore());
if (eventTime.get() > time) {
eventTime.set(time);
}
});
+ coldReplicas.forEach(r -> {
+ long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore());
+ if (eventTime.get() > time) {
+ eventTime.set(time);
+ }
+ });
hotNodes.forEach((n, r) -> {
long time = lastNodeEvent.get(n);
if (eventTime.get() > time) {
eventTime.set(time);
}
});
+ coldNodes.forEach((n, r) -> {
+ long time = lastNodeEvent.get(n);
+ if (eventTime.get() > time) {
+ eventTime.set(time);
+ }
+ });
+
+ final List<TriggerEvent.Op> ops = new ArrayList<>();
+
+ calculateHotOps(ops, searchableReplicationFactors, hotNodes, hotCollections, hotShards, hotReplicas);
+ calculateColdOps(ops, clusterState, searchableReplicationFactors, coldNodes, coldCollections, coldShards, coldReplicas);
+
+ if (ops.isEmpty()) {
+ return;
+ }
+ if (processor.process(new SearchRateEvent(getName(), eventTime.get(), ops,
+ hotNodes, hotCollections, hotShards, hotReplicas,
+ coldNodes, coldCollections, coldShards, coldReplicas))) {
+ // update lastEvent times
+ hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
+ coldNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
+ hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
+ coldCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
+ hotShards.entrySet().forEach(e -> e.getValue()
+ .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
+ coldShards.entrySet().forEach(e -> e.getValue()
+ .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
+ hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
+ coldReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
+ }
+ }
+
+ private void calculateHotOps(List<TriggerEvent.Op> ops,
+ Map<String, Map<String, AtomicInteger>> searchableReplicationFactors,
+ Map<String, Double> hotNodes,
+ Map<String, Double> hotCollections,
+ Map<String, Map<String, Double>> hotShards,
+ List<ReplicaInfo> hotReplicas) {
// calculate the number of replicas to add to each hot shard, based on how much the rate was
// exceeded - but within limits.
- final List<TriggerEvent.Op> ops = new ArrayList<>();
- if (hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
+
+ // first resolve a situation when only a node is hot but no collection / shard / replica is hot
+ // TODO: eventually we may want to commission a new node
+ if (!hotNodes.isEmpty() && hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
// move replicas around
- hotNodes.forEach((n, r) -> {
- ops.add(new TriggerEvent.Op(CollectionParams.CollectionAction.MOVEREPLICA, Suggester.Hint.SRC_NODE, n));
- });
+ if (aboveNodeOp != null) {
+ hotNodes.forEach((n, r) -> {
+ ops.add(new TriggerEvent.Op(aboveNodeOp, Suggester.Hint.SRC_NODE, n));
+ });
+ }
} else {
// add replicas
Map<String, Map<String, List<Pair<String, String>>>> hints = new HashMap<>();
@@ -283,7 +562,7 @@ public class SearchRateTrigger extends TriggerBase {
List<Pair<String, String>> perShard = hints
.computeIfAbsent(coll, c -> new HashMap<>())
.computeIfAbsent(s, sh -> new ArrayList<>());
- addHints(coll, s, r, replicationFactors.get(coll), perShard);
+ addReplicaHints(coll, s, r, searchableReplicationFactors.get(coll).get(s).get(), perShard);
}));
hotReplicas.forEach(ri -> {
double r = (Double)ri.getVariable(AutoScalingParams.RATE);
@@ -292,38 +571,120 @@ public class SearchRateTrigger extends TriggerBase {
.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
.computeIfAbsent(ri.getShard(), sh -> new ArrayList<>());
if (perShard.isEmpty()) {
- addHints(ri.getCollection(), ri.getShard(), r, replicationFactors.get(ri.getCollection()), perShard);
+ addReplicaHints(ri.getCollection(), ri.getShard(), r, searchableReplicationFactors.get(ri.getCollection()).get(ri.getShard()).get(), perShard);
}
});
hints.values().forEach(m -> m.values().forEach(lst -> lst.forEach(p -> {
- ops.add(new TriggerEvent.Op(CollectionParams.CollectionAction.ADDREPLICA, Suggester.Hint.COLL_SHARD, p));
+ ops.add(new TriggerEvent.Op(aboveOp, Suggester.Hint.COLL_SHARD, p));
})));
}
- if (processor.process(new SearchRateEvent(getName(), eventTime.get(), ops, hotNodes, hotCollections, hotShards, hotReplicas))) {
- // update lastEvent times
- hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
- hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
- hotShards.entrySet().forEach(e -> e.getValue()
- .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
- hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
- }
}
- private void addHints(String collection, String shard, double r, int replicationFactor, List<Pair<String, String>> hints) {
- int numReplicas = (int)Math.round((r - rate) / (double) replicationFactor);
+ /**
+ * This method implements a primitive form of proportional controller with a limiter.
+ */
+ private void addReplicaHints(String collection, String shard, double r, int replicationFactor, List<Pair<String, String>> hints) {
+ int numReplicas = (int)Math.round((r - aboveRate) / (double) replicationFactor);
+ // in one event add at least 1 replica
if (numReplicas < 1) {
numReplicas = 1;
}
- if (numReplicas > 3) {
- numReplicas = 3;
+ // ... and at most maxOps replicas
+ if (numReplicas > maxOps) {
+ numReplicas = maxOps;
}
for (int i = 0; i < numReplicas; i++) {
hints.add(new Pair(collection, shard));
}
}
+ private void calculateColdOps(List<TriggerEvent.Op> ops,
+ ClusterState clusterState,
+ Map<String, Map<String, AtomicInteger>> searchableReplicationFactors,
+ Map<String, Double> coldNodes,
+ Map<String, Double> coldCollections,
+ Map<String, Map<String, Double>> coldShards,
+ List<ReplicaInfo> coldReplicas) {
+ // COLD COLLECTIONS
+ // Probably can't do anything reasonable about whole cold collections
+ // because they may be needed even if not used.
+
+ // COLD SHARDS:
+ // Cold shards mean that there are too many replicas per shard - but it also
+ // means that all replicas in these shards are cold too, so we can simply
+ // address this by deleting cold replicas
+
+ // COLD REPLICAS:
+ // Remove cold replicas but only when there's at least a minimum number of searchable
+ // replicas still available (additional non-searchable replicas may exist, too)
+ // NOTE: do this before adding ops for DELETENODE because we don't want to attempt
+ // deleting replicas that have been already moved elsewhere
+ Map<String, Map<String, List<ReplicaInfo>>> byCollectionByShard = new HashMap<>();
+ coldReplicas.forEach(ri -> {
+ byCollectionByShard.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
+ .computeIfAbsent(ri.getShard(), s -> new ArrayList<>())
+ .add(ri);
+ });
+ byCollectionByShard.forEach((coll, shards) -> {
+ shards.forEach((shard, replicas) -> {
+ // only delete if there's at least minRF searchable replicas left
+ int rf = searchableReplicationFactors.get(coll).get(shard).get();
+ // we only really need a leader and we may be allowed to remove other replicas
+ int minRF = 1;
+ // but check the official RF and don't go below that
+ Integer RF = clusterState.getCollection(coll).getReplicationFactor();
+ if (RF != null) {
+ minRF = RF;
+ }
+ // unless minReplicas is set explicitly
+ if (minReplicas != null) {
+ minRF = minReplicas;
+ }
+ if (minRF < 1) {
+ minRF = 1;
+ }
+ if (rf > minRF) {
+ // delete at most maxOps replicas at a time
+ AtomicInteger limit = new AtomicInteger(Math.min(maxOps, rf - minRF));
+ replicas.forEach(ri -> {
+ if (limit.get() == 0) {
+ return;
+ }
+ // don't delete a leader
+ if (ri.getBool(ZkStateReader.LEADER_PROP, false)) {
+ return;
+ }
+ TriggerEvent.Op op = new TriggerEvent.Op(belowOp,
+ Suggester.Hint.COLL_SHARD, new Pair<>(ri.getCollection(), ri.getShard()));
+ op.addHint(Suggester.Hint.REPLICA, ri.getName());
+ ops.add(op);
+ limit.decrementAndGet();
+ });
+ }
+ });
+ });
+
+ // COLD NODES:
+ // Unlike the case of hot nodes, if a node is cold then any monitored
+ // collections / shards / replicas located on that node are cold, too.
+ // HOWEVER, we check only non-pull replicas and only from selected collections / shards,
+ // so deleting a cold node is dangerous because it may interfere with these
+ // non-monitored resources - this is the reason the default belowNodeOp is null / ignored.
+ //
+ // Also, note that due to the way activity is measured only nodes that contain any
+ // monitored resources are considered - there may be cold nodes in the cluster that don't
+ // belong to the monitored collections and they will be ignored.
+ if (belowNodeOp != null) {
+ coldNodes.forEach((node, rate) -> {
+ ops.add(new TriggerEvent.Op(belowNodeOp, Suggester.Hint.SRC_NODE, node));
+ });
+ }
+
+
+ }
+
private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
@@ -335,15 +696,25 @@ public class SearchRateTrigger extends TriggerBase {
}
public static class SearchRateEvent extends TriggerEvent {
- public SearchRateEvent(String source, long eventTime, List<Op> ops, Map<String, Double> hotNodes,
+ public SearchRateEvent(String source, long eventTime, List<Op> ops,
+ Map<String, Double> hotNodes,
Map<String, Double> hotCollections,
- Map<String, Map<String, Double>> hotShards, List<ReplicaInfo> hotReplicas) {
+ Map<String, Map<String, Double>> hotShards,
+ List<ReplicaInfo> hotReplicas,
+ Map<String, Double> coldNodes,
+ Map<String, Double> coldCollections,
+ Map<String, Map<String, Double>> coldShards,
+ List<ReplicaInfo> coldReplicas) {
super(TriggerEventType.SEARCHRATE, source, eventTime, null);
properties.put(TriggerEvent.REQUESTED_OPS, ops);
- properties.put(AutoScalingParams.COLLECTION, hotCollections);
- properties.put(AutoScalingParams.SHARD, hotShards);
- properties.put(AutoScalingParams.REPLICA, hotReplicas);
- properties.put(AutoScalingParams.NODE, hotNodes);
+ properties.put(HOT_NODES, hotNodes);
+ properties.put(HOT_COLLECTIONS, hotCollections);
+ properties.put(HOT_SHARDS, hotShards);
+ properties.put(HOT_REPLICAS, hotReplicas);
+ properties.put(COLD_NODES, coldNodes);
+ properties.put(COLD_COLLECTIONS, coldCollections);
+ properties.put(COLD_SHARDS, coldShards);
+ properties.put(COLD_REPLICAS, coldReplicas);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java b/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
index 5590252..768cd91 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
@@ -88,9 +88,11 @@ public class CloudTestUtils {
final CollectionStatePredicate predicate) throws InterruptedException, TimeoutException, IOException {
TimeOut timeout = new TimeOut(wait, unit, cloudManager.getTimeSource());
long timeWarn = timeout.timeLeft(TimeUnit.MILLISECONDS) / 4;
+ ClusterState state = null;
+ DocCollection coll = null;
while (!timeout.hasTimedOut()) {
- ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
- DocCollection coll = state.getCollectionOrNull(collection);
+ state = cloudManager.getClusterStateProvider().getClusterState();
+ coll = state.getCollectionOrNull(collection);
// due to the way we manage collections in SimClusterStateProvider a null here
// can mean that a collection is still being created but has no replicas
if (coll == null) { // does not yet exist?
@@ -106,7 +108,7 @@ public class CloudTestUtils {
log.trace("-- still not matching predicate: {}", state);
}
}
- throw new TimeoutException();
+ throw new TimeoutException("last state: " + coll);
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
index b98ee70..2aec88e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
@@ -475,7 +475,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
"'event' : 'searchRate'," +
"'waitFor' : '10m'," +
"'enabled' : true," +
- "'rate': 'foo'," +
+ "'aboveRate': 'foo'," +
"'actions' : [" +
"{" +
"'name' : 'compute_plan'," +
@@ -489,7 +489,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
} catch (HttpSolrClient.RemoteSolrException e) {
// expected
assertTrue(String.valueOf(getObjectByPath(((HttpSolrClient.RemoteExecutionException) e).getMetaData(),
- false, "error/details[0]/errorMessages[0]")).contains("rate=Invalid 'rate' configuration value: 'foo'"));
+ false, "error/details[0]/errorMessages[0]")).contains("aboveRate=Invalid configuration value: 'foo'"));
}
// unknown trigger action properties
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
index 796670a..098d0a5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
@@ -22,10 +22,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.lucene.util.LuceneTestCase;
@@ -37,12 +36,20 @@ import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.LogLevel;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -51,20 +58,23 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
+import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
/**
* Integration test for {@link SearchRateTrigger}
*/
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
-@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028")
+@LuceneTestCase.Slow
public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
private static CountDownLatch listenerCreated = new CountDownLatch(1);
- private static int waitForSeconds = 1;
- private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
private static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
+ private static CountDownLatch finished = new CountDownLatch(1);
+ private static CountDownLatch started = new CountDownLatch(1);
+ private static SolrCloudManager cloudManager;
+
+ private int waitForSeconds;
@BeforeClass
public static void setupCluster() throws Exception {
@@ -79,74 +89,156 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
SolrClient solrClient = cluster.getSolrClient();
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
+ cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+ }
+
+ @Before
+ public void beforeTest() throws Exception {
+ cluster.deleteAllCollections();
+ // clear any persisted auto scaling configuration
+ Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
+ log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
+ timeSource.sleep(5000);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
+
+ finished = new CountDownLatch(1);
+ started = new CountDownLatch(1);
+ listenerEvents = new HashMap<>();
+ waitForSeconds = 3 + random().nextInt(5);
+ }
+
+ private void deleteChildrenRecursively(String path) throws Exception {
+ cloudManager.getDistribStateManager().removeRecursively(path, true, false);
}
@Test
- public void testSearchRate() throws Exception {
+ public void testAboveSearchRate() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
- String COLL1 = "collection1";
+ String COLL1 = "aboveRate_collection";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
"conf", 1, 2);
create.process(solrClient);
+
+ CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+ CloudTestUtils.clusterShape(1, 2));
+
+ // the trigger is initially disabled so that we have the time to set up listeners
+ // and generate the traffic
String setTriggerCommand = "{" +
"'set-trigger' : {" +
- "'name' : 'search_rate_trigger'," +
+ "'name' : 'search_rate_trigger1'," +
"'event' : 'searchRate'," +
"'waitFor' : '" + waitForSeconds + "s'," +
- "'enabled' : true," +
- "'rate' : 1.0," +
+ "'enabled' : false," +
+ "'collections' : '" + COLL1 + "'," +
+ "'aboveRate' : 1.0," +
+ "'belowRate' : 0.1," +
"'actions' : [" +
"{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
- "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
- "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
"]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
- String setListenerCommand1 = "{" +
+ String setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'started'," +
+ "'trigger' : 'search_rate_trigger1'," +
+ "'stage' : ['STARTED']," +
+ "'class' : '" + StartedProcessingListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setListenerCommand = "{" +
"'set-listener' : " +
"{" +
"'name' : 'srt'," +
- "'trigger' : 'search_rate_trigger'," +
+ "'trigger' : 'search_rate_trigger1'," +
"'stage' : ['FAILED','SUCCEEDED']," +
- "'afterAction': ['compute', 'execute', 'test']," +
- "'class' : '" + TestTriggerListener.class.getName() + "'" +
+ "'afterAction': ['compute', 'execute']," +
+ "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'finished'," +
+ "'trigger' : 'search_rate_trigger1'," +
+ "'stage' : ['SUCCEEDED']," +
+ "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
"}" +
"}";
- req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
+
SolrParams query = params(CommonParams.Q, "*:*");
for (int i = 0; i < 500; i++) {
solrClient.query(COLL1, query);
}
- boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+
+ // enable the trigger
+ String resumeTriggerCommand = "{" +
+ "'resume-trigger' : {" +
+ "'name' : 'search_rate_trigger1'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+ boolean await = started.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
- // wait for listener to capture the SUCCEEDED stage
- Thread.sleep(5000);
+
+ await = finished.await(60, TimeUnit.SECONDS);
+ assertTrue("The trigger did not finish processing", await);
+
+ // suspend the trigger
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {" +
+ "'name' : 'search_rate_trigger1'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(5000);
+
List<CapturedEvent> events = listenerEvents.get("srt");
- assertEquals(listenerEvents.toString(), 4, events.size());
+ assertEquals(listenerEvents.toString(), 3, events.size());
assertEquals("AFTER_ACTION", events.get(0).stage.toString());
assertEquals("compute", events.get(0).actionName);
assertEquals("AFTER_ACTION", events.get(1).stage.toString());
assertEquals("execute", events.get(1).actionName);
- assertEquals("AFTER_ACTION", events.get(2).stage.toString());
- assertEquals("test", events.get(2).actionName);
- assertEquals("SUCCEEDED", events.get(3).stage.toString());
- assertNull(events.get(3).actionName);
+ assertEquals("SUCCEEDED", events.get(2).stage.toString());
+ assertNull(events.get(2).actionName);
CapturedEvent ev = events.get(0);
long now = timeSource.getTimeNs();
// verify waitFor
assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
- Map<String, Double> nodeRates = (Map<String, Double>) ev.event.getProperties().get("node");
+ Map<String, Double> nodeRates = (Map<String, Double>) ev.event.getProperties().get(SearchRateTrigger.HOT_NODES);
assertNotNull("nodeRates", nodeRates);
assertTrue(nodeRates.toString(), nodeRates.size() > 0);
AtomicDouble totalNodeRate = new AtomicDouble();
nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
- List<ReplicaInfo> replicaRates = (List<ReplicaInfo>) ev.event.getProperties().get("replica");
+ List<ReplicaInfo> replicaRates = (List<ReplicaInfo>) ev.event.getProperties().get(SearchRateTrigger.HOT_REPLICAS);
assertNotNull("replicaRates", replicaRates);
assertTrue(replicaRates.toString(), replicaRates.size() > 0);
AtomicDouble totalReplicaRate = new AtomicDouble();
@@ -154,7 +246,7 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
assertTrue(r.toString(), r.getVariable("rate") != null);
totalReplicaRate.addAndGet((Double) r.getVariable("rate"));
});
- Map<String, Object> shardRates = (Map<String, Object>) ev.event.getProperties().get("shard");
+ Map<String, Object> shardRates = (Map<String, Object>) ev.event.getProperties().get(SearchRateTrigger.HOT_SHARDS);
assertNotNull("shardRates", shardRates);
assertEquals(shardRates.toString(), 1, shardRates.size());
shardRates = (Map<String, Object>) shardRates.get(COLL1);
@@ -162,7 +254,7 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(shardRates.toString(), 1, shardRates.size());
AtomicDouble totalShardRate = new AtomicDouble();
shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double) r));
- Map<String, Double> collectionRates = (Map<String, Double>) ev.event.getProperties().get("collection");
+ Map<String, Double> collectionRates = (Map<String, Double>) ev.event.getProperties().get(SearchRateTrigger.HOT_COLLECTIONS);
assertNotNull("collectionRates", collectionRates);
assertEquals(collectionRates.toString(), 1, collectionRates.size());
Double collectionRate = collectionRates.get(COLL1);
@@ -181,27 +273,414 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
}
}
- public static class TestSearchRateAction extends TriggerActionBase {
+ @Test
+ public void testBelowSearchRate() throws Exception {
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ String COLL1 = "belowRate_collection";
+ // replicationFactor == 2
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
+ "conf", 1, 2);
+ create.process(solrClient);
+ CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+ CloudTestUtils.clusterShape(1, 2));
+
+ // add a couple of spare replicas above RF. Use different types.
+ // these additional replicas will be placed on other nodes in the cluster
+ solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.NRT));
+ solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.TLOG));
+ solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.PULL));
- @Override
- public void process(TriggerEvent event, ActionContext context) throws Exception {
- try {
- events.add(event);
- long currentTimeNanos = timeSource.getTimeNs();
- long eventTimeNanos = event.getEventTime();
- long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
- if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
- fail(event.source + " was fired before the configured waitFor period");
+ CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+ CloudTestUtils.clusterShape(1, 5));
+
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'search_rate_trigger2'," +
+ "'event' : 'searchRate'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : false," +
+ "'collections' : '" + COLL1 + "'," +
+ "'aboveRate' : 1.0," +
+ "'belowRate' : 0.1," +
+ // do nothing but generate an op
+ "'belowNodeOp' : 'none'," +
+ "'actions' : [" +
+ "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ String setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'started'," +
+ "'trigger' : 'search_rate_trigger2'," +
+ "'stage' : ['STARTED']," +
+ "'class' : '" + StartedProcessingListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'srt'," +
+ "'trigger' : 'search_rate_trigger2'," +
+ "'stage' : ['FAILED','SUCCEEDED']," +
+ "'afterAction': ['compute', 'execute']," +
+ "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'finished'," +
+ "'trigger' : 'search_rate_trigger2'," +
+ "'stage' : ['SUCCEEDED']," +
+ "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+ // enable the trigger
+ String resumeTriggerCommand = "{" +
+ "'resume-trigger' : {" +
+ "'name' : 'search_rate_trigger2'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+ boolean await = started.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ await = finished.await(60, TimeUnit.SECONDS);
+ assertTrue("The trigger did not finish processing", await);
+
+ // suspend the trigger
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {" +
+ "'name' : 'search_rate_trigger2'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(5000);
+
+ List<CapturedEvent> events = listenerEvents.get("srt");
+ assertEquals(events.toString(), 3, events.size());
+ CapturedEvent ev = events.get(0);
+ assertEquals(ev.toString(), "compute", ev.actionName);
+ List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
+ assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
+ // 4 cold nodes, 3 cold replicas
+ assertEquals(ops.toString(), 7, ops.size());
+ AtomicInteger coldNodes = new AtomicInteger();
+ AtomicInteger coldReplicas = new AtomicInteger();
+ ops.forEach(op -> {
+ if (op.getAction().equals(CollectionParams.CollectionAction.NONE)) {
+ coldNodes.incrementAndGet();
+ } else if (op.getAction().equals(CollectionParams.CollectionAction.DELETEREPLICA)) {
+ coldReplicas.incrementAndGet();
+ } else {
+ fail("unexpected op: " + op);
+ }
+ });
+ assertEquals("cold nodes", 4, coldNodes.get());
+ assertEquals("cold replicas", 3, coldReplicas.get());
+
+ // now the collection should be down to RF = 2
+ CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+ CloudTestUtils.clusterShape(1, 2));
+
+ listenerEvents.clear();
+ finished = new CountDownLatch(1);
+ started = new CountDownLatch(1);
+
+ // resume trigger
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // there should be only coldNode ops now, and no coldReplica ops since searchable RF == collection RF
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+ await = started.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ await = finished.await(60, TimeUnit.SECONDS);
+ assertTrue("The trigger did not finish processing", await);
+
+ // suspend trigger
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(5000);
+
+ events = listenerEvents.get("srt");
+ assertEquals(events.toString(), 3, events.size());
+
+ ev = events.get(0);
+ assertEquals(ev.toString(), "compute", ev.actionName);
+ ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
+ assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
+ assertEquals(ops.toString(), 1, ops.size());
+ assertEquals(ops.toString(), CollectionParams.CollectionAction.NONE, ops.get(0).getAction());
+
+ listenerEvents.clear();
+ finished = new CountDownLatch(1);
+ started = new CountDownLatch(1);
+
+ // now allow single replicas
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'search_rate_trigger2'," +
+ "'event' : 'searchRate'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'collections' : '" + COLL1 + "'," +
+ "'aboveRate' : 1.0," +
+ "'belowRate' : 0.1," +
+ "'minReplicas' : 1," +
+ "'belowNodeOp' : 'none'," +
+ "'actions' : [" +
+ "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+ await = started.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ await = finished.await(60, TimeUnit.SECONDS);
+ assertTrue("The trigger did not finish processing", await);
+
+ // suspend trigger
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(5000);
+
+ events = listenerEvents.get("srt");
+ assertEquals(events.toString(), 3, events.size());
+
+ ev = events.get(0);
+ assertEquals(ev.toString(), "compute", ev.actionName);
+ ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
+ assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
+ assertEquals(ops.toString(), 2, ops.size());
+ AtomicInteger coldNodes2 = new AtomicInteger();
+ AtomicInteger coldReplicas2 = new AtomicInteger();
+ ops.forEach(op -> {
+ if (op.getAction().equals(CollectionParams.CollectionAction.NONE)) {
+ coldNodes2.incrementAndGet();
+ } else if (op.getAction().equals(CollectionParams.CollectionAction.DELETEREPLICA)) {
+ coldReplicas2.incrementAndGet();
+ } else {
+ fail("unexpected op: " + op);
+ }
+ });
+
+ assertEquals("coldNodes", 1, coldNodes2.get());
+ assertEquals("coldReplicas", 1, coldReplicas2.get());
+
+ // now the collection should be at RF == 1, with one additional PULL replica
+ CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+ CloudTestUtils.clusterShape(1, 1));
+ }
+
+ @Test
+ public void testDeleteNode() throws Exception {
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ String COLL1 = "deleteNode_collection";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
+ "conf", 1, 2);
+
+ create.process(solrClient);
+ CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+ CloudTestUtils.clusterShape(1, 2));
+
+ // add a couple of spare replicas above RF. Use different types to verify that only
+ // searchable replicas are considered
+ // these additional replicas will be placed on other nodes in the cluster
+ solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.NRT));
+ solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.TLOG));
+ solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.PULL));
+
+ CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+ CloudTestUtils.clusterShape(1, 5));
+
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'search_rate_trigger3'," +
+ "'event' : 'searchRate'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : false," +
+ "'collections' : '" + COLL1 + "'," +
+ "'aboveRate' : 1.0," +
+ "'belowRate' : 0.1," +
+ // allow deleting all spare replicas
+ "'minReplicas' : 1," +
+ // allow requesting all deletions in one event
+ "'maxOps' : 10," +
+ // delete underutilised nodes
+ "'belowNodeOp' : 'DELETENODE'," +
+ "'actions' : [" +
+ "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ String setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'started'," +
+ "'trigger' : 'search_rate_trigger3'," +
+ "'stage' : ['STARTED']," +
+ "'class' : '" + StartedProcessingListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'srt'," +
+ "'trigger' : 'search_rate_trigger3'," +
+ "'stage' : ['FAILED','SUCCEEDED']," +
+ "'afterAction': ['compute', 'execute']," +
+ "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'finished'," +
+ "'trigger' : 'search_rate_trigger3'," +
+ "'stage' : ['SUCCEEDED']," +
+ "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+ // enable the trigger
+ String resumeTriggerCommand = "{" +
+ "'resume-trigger' : {" +
+ "'name' : 'search_rate_trigger3'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+ boolean await = started.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ await = finished.await(90, TimeUnit.SECONDS);
+ assertTrue("The trigger did not finish processing", await);
+
+ // suspend the trigger
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {" +
+ "'name' : 'search_rate_trigger3'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ timeSource.sleep(5000);
+
+ List<CapturedEvent> events = listenerEvents.get("srt");
+ assertEquals(events.toString(), 3, events.size());
+
+ CapturedEvent ev = events.get(0);
+ assertEquals(ev.toString(), "compute", ev.actionName);
+ List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
+ assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
+ // 4 DELETEREPLICA, 4 DELETENODE
+ assertEquals(ops.toString(), 8, ops.size());
+ AtomicInteger replicas = new AtomicInteger();
+ AtomicInteger nodes = new AtomicInteger();
+ ops.forEach(op -> {
+ if (op.getAction().equals(CollectionParams.CollectionAction.DELETEREPLICA)) {
+ replicas.incrementAndGet();
+ } else if (op.getAction().equals(CollectionParams.CollectionAction.DELETENODE)) {
+ nodes.incrementAndGet();
+ } else {
+ fail("unexpected op: " + op);
+ }
+ });
+ assertEquals(ops.toString(), 4, replicas.get());
+ assertEquals(ops.toString(), 4, nodes.get());
+ // check status
+ ev = events.get(1);
+ assertEquals(ev.toString(), "execute", ev.actionName);
+ List<NamedList<Object>> responses = (List<NamedList<Object>>)ev.context.get("properties.responses");
+ assertNotNull(ev.toString(), responses);
+ assertEquals(responses.toString(), 8, responses.size());
+ replicas.set(0);
+ nodes.set(0);
+ responses.forEach(m -> {
+ if (m.get("success") != null) {
+ replicas.incrementAndGet();
+ } else if (m.get("status") != null) {
+ NamedList<Object> status = (NamedList<Object>)m.get("status");
+ if ("completed".equals(status.get("state"))) {
+ nodes.incrementAndGet();
+ } else {
+ fail("unexpected DELETENODE status: " + m);
}
- triggerFiredLatch.countDown();
- } catch (Throwable t) {
- log.debug("--throwable", t);
- throw t;
+ } else {
+ fail("unexpected status: " + m);
}
- }
+ });
+
+ assertEquals(responses.toString(), 4, replicas.get());
+ assertEquals(responses.toString(), 4, nodes.get());
+
+ // we are left with one searchable replica
+ CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+ CloudTestUtils.clusterShape(1, 1));
}
- public static class TestTriggerListener extends TriggerListenerBase {
+ public static class CapturingTriggerListener extends TriggerListenerBase {
@Override
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) throws TriggerValidationException {
super.configure(loader, cloudManager, config);
@@ -212,7 +691,26 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
- lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
+ CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
+ log.info("=======> " + ev);
+ lst.add(ev);
+ }
+ }
+
+ public static class StartedProcessingListener extends TriggerListenerBase {
+
+ @Override
+ public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
+ started.countDown();
}
}
+
+ public static class FinishedProcessingListener extends TriggerListenerBase {
+
+ @Override
+ public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
+ finished.countDown();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d969ab8/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
index 1c72649..7969869 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
@@ -18,25 +18,37 @@ package org.apache.solr.cloud.autoscaling;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
+import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.ZkDistributedQueueFactory;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.util.TimeOut;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -59,6 +71,23 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
configureCluster(4)
.addConfig("conf", configset("cloud-minimal"))
.configure();
+ }
+
+ @Before
+ public void removeCollections() throws Exception {
+ cluster.deleteAllCollections();
+ if (cluster.getJettySolrRunners().size() < 4) {
+ cluster.startJettySolrRunner();
+ }
+ }
+
+ @Test
+ public void testTrigger() throws Exception {
+ SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
+ SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
+ CoreContainer container = cluster.getJettySolrRunner(0).getCoreContainer();
+ SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
+
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
"conf", 2, 2);
CloudSolrClient solrClient = cluster.getSolrClient();
@@ -68,20 +97,15 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
"conf", 2, 2);
create.setMaxShardsPerNode(1);
create.process(solrClient);
- }
- @Test
- public void testTrigger() throws Exception {
+ CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS, clusterShape(2, 2));
+ CloudTestUtils.waitForState(cloudManager, COLL2, 60, TimeUnit.SECONDS, clusterShape(2, 2));
+
double rate = 1.0;
- SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
- SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
- CoreContainer container = cluster.getJettySolrRunner(0).getCoreContainer();
- SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
URL baseUrl = cluster.getJettySolrRunners().get(1).getBaseUrl();
long waitForSeconds = 5 + random().nextInt(5);
- Map<String, Object> props = createTriggerProps(waitForSeconds, rate);
+ Map<String, Object> props = createTriggerProps(Arrays.asList(COLL1, COLL2), waitForSeconds, rate, -1);
final List<TriggerEvent> events = new ArrayList<>();
- CloudSolrClient solrClient = cluster.getSolrClient();
try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger")) {
trigger.configure(loader, cloudManager, props);
@@ -95,19 +119,21 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
String url = baseUrl.toString() + "/" + coreName;
try (HttpSolrClient simpleClient = new HttpSolrClient.Builder(url).build()) {
SolrParams query = params(CommonParams.Q, "*:*", CommonParams.DISTRIB, "false");
- for (int i = 0; i < 200; i++) {
+ for (int i = 0; i < 500; i++) {
simpleClient.query(query);
}
trigger.run();
// waitFor delay
assertEquals(0, events.size());
- Thread.sleep(waitForSeconds * 1000 + 2000);
+ Thread.sleep(waitForSeconds * 1000);
+ trigger.run();
+ Thread.sleep(waitForSeconds * 1000);
// should generate replica event
trigger.run();
assertEquals(1, events.size());
TriggerEvent event = events.get(0);
assertEquals(TriggerEventType.SEARCHRATE, event.eventType);
- List<ReplicaInfo> infos = (List<ReplicaInfo>)event.getProperty(AutoScalingParams.REPLICA);
+ List<ReplicaInfo> infos = (List<ReplicaInfo>)event.getProperty(SearchRateTrigger.HOT_REPLICAS);
assertEquals(1, infos.size());
ReplicaInfo info = infos.get(0);
assertEquals(coreName, info.getCore());
@@ -120,12 +146,12 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
for (int i = 0; i < 500; i++) {
solrClient.query(COLL1, query);
}
- Thread.sleep(waitForSeconds * 1000 + 2000);
+ Thread.sleep(waitForSeconds * 1000);
trigger.run();
// should generate collection event
assertEquals(1, events.size());
TriggerEvent event = events.get(0);
- Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
+ Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
assertEquals(1, hotCollections.size());
Double Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
@@ -134,21 +160,20 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
for (int i = 0; i < 1000; i++) {
solrClient.query(COLL2, query);
+ solrClient.query(COLL1, query);
}
- Thread.sleep(waitForSeconds * 1000 + 2000);
+ Thread.sleep(waitForSeconds * 1000);
trigger.run();
// should generate node and collection event but not for COLL2 because of waitFor
assertEquals(1, events.size());
event = events.get(0);
- Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
+ Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_NODES);
assertEquals(3, hotNodes.size());
hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
- hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
- assertEquals(2, hotCollections.size());
+ hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
+ assertEquals(1, hotCollections.size());
Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
- Rate = hotCollections.get(COLL2);
- assertNotNull(Rate);
events.clear();
// assert that waitFor prevents new events from being generated
@@ -156,28 +181,154 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
// should not generate any events
assertEquals(0, events.size());
- Thread.sleep(waitForSeconds * 1000 + 2000);
+ Thread.sleep(waitForSeconds * 1000 * 2);
trigger.run();
// should generate node and collection event
assertEquals(1, events.size());
- hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
+ event = events.get(0);
+ hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
assertEquals(2, hotCollections.size());
Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
Rate = hotCollections.get(COLL2);
assertNotNull(Rate);
- hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
+ hotNodes = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_NODES);
assertEquals(3, hotNodes.size());
hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
}
}
- private Map<String, Object> createTriggerProps(long waitForSeconds, double rate) {
+ private static final AtomicDouble mockRate = new AtomicDouble();
+
+ @Test
+ public void testWaitForElapsed() throws Exception {
+ SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ SolrZkClient zkClient = solrClient.getZkStateReader().getZkClient();
+ SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), solrClient) {
+ @Override
+ public NodeStateProvider getNodeStateProvider() {
+ return new SolrClientNodeStateProvider(solrClient) {
+ @Override
+ public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
+ Map<String, Object> values = super.getNodeValues(node, tags);
+ values.keySet().forEach(k -> {
+ values.replace(k, mockRate.get());
+ });
+ return values;
+ }
+ };
+ }
+ };
+ TimeSource timeSource = cloudManager.getTimeSource();
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
+ "conf", 2, 2);
+ create.setMaxShardsPerNode(1);
+ create.process(solrClient);
+ CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS, clusterShape(2, 2));
+
+ long waitForSeconds = 5 + random().nextInt(5);
+ Map<String, Object> props = createTriggerProps(Arrays.asList(COLL1, COLL2), waitForSeconds, 1.0, 0.1);
+ final List<TriggerEvent> events = new ArrayList<>();
+
+ try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger1")) {
+ trigger.configure(loader, cloudManager, props);
+ trigger.init();
+ trigger.setProcessor(noFirstRunProcessor);
+ trigger.run();
+ trigger.setProcessor(event -> events.add(event));
+
+ // set mock rates
+ mockRate.set(2.0);
+ TimeOut timeOut = new TimeOut(waitForSeconds + 2, TimeUnit.SECONDS, timeSource);
+ // simulate ScheduledTriggers
+ while (!timeOut.hasTimedOut()) {
+ trigger.run();
+ timeSource.sleep(1000);
+ }
+ // violation persisted longer than waitFor - there should be events
+ assertTrue(events.toString(), events.size() > 0);
+ TriggerEvent event = events.get(0);
+ assertEquals(event.toString(), TriggerEventType.SEARCHRATE, event.eventType);
+ Map<String, Object> hotNodes, hotCollections, hotShards;
+ List<ReplicaInfo> hotReplicas;
+ hotNodes = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_NODES);
+ hotCollections = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_COLLECTIONS);
+ hotShards = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_SHARDS);
+ hotReplicas = (List<ReplicaInfo>)event.properties.get(SearchRateTrigger.HOT_REPLICAS);
+ assertFalse("no hot nodes?", hotNodes.isEmpty());
+ assertFalse("no hot collections?", hotCollections.isEmpty());
+ assertFalse("no hot shards?", hotShards.isEmpty());
+ assertFalse("no hot replicas?", hotReplicas.isEmpty());
+ }
+
+ mockRate.set(0.0);
+ events.clear();
+
+ try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger2")) {
+ trigger.configure(loader, cloudManager, props);
+ trigger.init();
+ trigger.setProcessor(noFirstRunProcessor);
+ trigger.run();
+ trigger.setProcessor(event -> events.add(event));
+
+ mockRate.set(2.0);
+ trigger.run();
+ // waitFor not elapsed
+ assertTrue(events.toString(), events.isEmpty());
+ Thread.sleep(1000);
+ trigger.run();
+ assertTrue(events.toString(), events.isEmpty());
+ Thread.sleep(1000);
+ mockRate.set(0.0);
+ trigger.run();
+ Thread.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds - 2, TimeUnit.SECONDS));
+ trigger.run();
+
+ // violations persisted shorter than waitFor - there should be no events
+ assertTrue(events.toString(), events.isEmpty());
+
+ }
+ }
+
+ @Test
+ public void testDefaultsAndBackcompat() throws Exception {
Map<String, Object> props = new HashMap<>();
- props.put("rate", rate);
+ props.put("rate", 1.0);
+ props.put("collection", "test");
+ SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
+ SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
+ SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
+ try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger2")) {
+ trigger.configure(loader, cloudManager, props);
+ Map<String, Object> config = trigger.getConfig();
+ Set<String> collections = (Set<String>)config.get(SearchRateTrigger.COLLECTIONS_PROP);
+ assertEquals(collections.toString(), 1, collections.size());
+ assertEquals("test", collections.iterator().next());
+ assertEquals("#ANY", config.get(AutoScalingParams.SHARD));
+ assertEquals("#ANY", config.get(AutoScalingParams.NODE));
+ assertEquals(1.0, config.get(SearchRateTrigger.ABOVE_RATE_PROP));
+ assertEquals(-1.0, config.get(SearchRateTrigger.BELOW_RATE_PROP));
+ assertEquals(SearchRateTrigger.DEFAULT_METRIC, config.get(SearchRateTrigger.METRIC_PROP));
+ assertEquals(SearchRateTrigger.DEFAULT_MAX_OPS, config.get(SearchRateTrigger.MAX_OPS_PROP));
+ assertNull(config.get(SearchRateTrigger.MIN_REPLICAS_PROP));
+ assertEquals(CollectionParams.CollectionAction.ADDREPLICA, config.get(SearchRateTrigger.ABOVE_OP_PROP));
+ assertEquals(CollectionParams.CollectionAction.MOVEREPLICA, config.get(SearchRateTrigger.ABOVE_NODE_OP_PROP));
+ assertEquals(CollectionParams.CollectionAction.DELETEREPLICA, config.get(SearchRateTrigger.BELOW_OP_PROP));
+ assertNull(config.get(SearchRateTrigger.BELOW_NODE_OP_PROP));
+ }
+ }
+
+ private Map<String, Object> createTriggerProps(List<String> collections, long waitForSeconds, double aboveRate, double belowRate) {
+ Map<String, Object> props = new HashMap<>();
+ props.put("aboveRate", aboveRate);
+ props.put("belowRate", belowRate);
props.put("event", "searchRate");
props.put("waitFor", waitForSeconds);
props.put("enabled", true);
+ if (collections != null && !collections.isEmpty()) {
+ props.put("collections", String.join(",", collections));
+ }
List<Map<String, String>> actions = new ArrayList<>(3);
Map<String, String> map = new HashMap<>(2);
map.put("name", "compute_plan");