You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/04/24 09:54:39 UTC

[2/2] lucene-solr:branch_7x: SOLR-11833: Allow searchRate trigger to delete replicas.

SOLR-11833: Allow searchRate trigger to delete replicas.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9541d7e3
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9541d7e3
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9541d7e3

Branch: refs/heads/branch_7x
Commit: 9541d7e38b4c6b7ac779d76e2728a7bb3a301d36
Parents: 9e36c51
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Mon Apr 23 22:19:01 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue Apr 24 10:58:29 2018 +0200

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   4 +-
 .../cloud/autoscaling/IndexSizeTrigger.java     |   4 +-
 .../cloud/autoscaling/SearchRateTrigger.java    | 487 +++++++++++++--
 .../org/apache/solr/cloud/CloudTestUtils.java   |   8 +-
 .../autoscaling/AutoScalingHandlerTest.java     |   4 +-
 .../SearchRateTriggerIntegrationTest.java       | 592 +++++++++++++++++--
 .../autoscaling/SearchRateTriggerTest.java      | 201 ++++++-
 .../cloud/autoscaling/sim/TestLargeCluster.java |   5 +-
 .../autoscaling/sim/TestTriggerIntegration.java |  11 +-
 .../src/solrcloud-autoscaling-triggers.adoc     |  95 ++-
 .../cloud/autoscaling/DeleteNodeSuggester.java  |  46 ++
 .../autoscaling/DeleteReplicaSuggester.java     |  74 +++
 .../client/solrj/cloud/autoscaling/Policy.java  |   4 +-
 .../solrj/cloud/autoscaling/ReplicaInfo.java    |  18 +
 .../cloud/autoscaling/SplitShardSuggester.java  |   6 +
 .../solrj/cloud/autoscaling/Suggester.java      |   8 +-
 .../solrj/impl/SolrClientCloudManager.java      |   2 +-
 .../solrj/request/CollectionAdminRequest.java   |  11 +
 .../org/apache/solr/common/cloud/Replica.java   |   2 +
 19 files changed, 1410 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9541d7e3/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 5513a5f..b03e5c7 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -162,9 +162,11 @@ Bug Fixes
 
 * SOLR-12250: NegativeArraySizeException on TransactionLog if previous document more than 1.9GB (Cao Manh Dat)
 
-
 * SOLR-12253: Remove optimize button from the core admin page (Erick Erickson)
 
+* SOLR-11833: Allow searchRate trigger to delete replicas. Improve configurability of the trigger by specifying
+  upper / lower thresholds and respective actions (ab)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9541d7e3/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
index 756f88f..9978362 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/IndexSizeTrigger.java
@@ -143,11 +143,11 @@ public class IndexSizeTrigger extends TriggerBase {
     String belowOpStr = String.valueOf(properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.MERGESHARDS.toLower()));
     aboveOp = CollectionParams.CollectionAction.get(aboveOpStr);
     if (aboveOp == null) {
-      throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value of " + ABOVE_OP_PROP + ": '" + aboveOpStr + "'");
+      throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value of: '" + aboveOpStr + "'");
     }
     belowOp = CollectionParams.CollectionAction.get(belowOpStr);
     if (belowOp == null) {
-      throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value of " + BELOW_OP_PROP + ": '" + belowOpStr + "'");
+      throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value of: '" + belowOpStr + "'");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9541d7e3/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
index 02a2d0c..1824f7f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
@@ -16,17 +16,21 @@
  */
 package org.apache.solr.cloud.autoscaling;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
@@ -34,9 +38,13 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.AutoScalingParams;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.metrics.SolrCoreMetricManager;
@@ -49,11 +57,43 @@ import org.slf4j.LoggerFactory;
 public class SearchRateTrigger extends TriggerBase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private String handler;
-  private String collection;
+  public static final String COLLECTIONS_PROP = "collections";
+  public static final String METRIC_PROP = "metric";
+  public static final String MAX_OPS_PROP = "maxOps";
+  public static final String MIN_REPLICAS_PROP = "minReplicas";
+  public static final String ABOVE_RATE_PROP = "aboveRate";
+  public static final String BELOW_RATE_PROP = "belowRate";
+  public static final String ABOVE_OP_PROP = "aboveOp";
+  public static final String BELOW_OP_PROP = "belowOp";
+  public static final String ABOVE_NODE_OP_PROP = "aboveNodeOp";
+  public static final String BELOW_NODE_OP_PROP = "belowNodeOp";
+
+  // back-compat
+  public static final String BC_COLLECTION_PROP = "collection";
+  public static final String BC_RATE_PROP = "rate";
+
+
+  public static final String HOT_NODES = "hotNodes";
+  public static final String HOT_COLLECTIONS = "hotCollections";
+  public static final String HOT_SHARDS = "hotShards";
+  public static final String HOT_REPLICAS = "hotReplicas";
+  public static final String COLD_NODES = "coldNodes";
+  public static final String COLD_COLLECTIONS = "coldCollections";
+  public static final String COLD_SHARDS = "coldShards";
+  public static final String COLD_REPLICAS = "coldReplicas";
+
+  public static final int DEFAULT_MAX_OPS = 3;
+  public static final String DEFAULT_METRIC = "QUERY./select.requestTimes:1minRate";
+
+  private String metric;
+  private int maxOps;
+  private Integer minReplicas = null;
+  private final Set<String> collections = new HashSet<>();
   private String shard;
   private String node;
-  private double rate;
+  private double aboveRate;
+  private double belowRate;
+  private CollectionParams.CollectionAction aboveOp, belowOp, aboveNodeOp, belowNodeOp;
   private final Map<String, Long> lastCollectionEvent = new ConcurrentHashMap<>();
   private final Map<String, Long> lastNodeEvent = new ConcurrentHashMap<>();
   private final Map<String, Long> lastShardEvent = new ConcurrentHashMap<>();
@@ -66,29 +106,138 @@ public class SearchRateTrigger extends TriggerBase {
     this.state.put("lastNodeEvent", lastNodeEvent);
     this.state.put("lastShardEvent", lastShardEvent);
     this.state.put("lastReplicaEvent", lastReplicaEvent);
-    TriggerUtils.requiredProperties(requiredProperties, validProperties, "rate");
+    TriggerUtils.validProperties(validProperties,
+        COLLECTIONS_PROP, AutoScalingParams.SHARD, AutoScalingParams.NODE,
+        METRIC_PROP,
+        MAX_OPS_PROP,
+        MIN_REPLICAS_PROP,
+        ABOVE_OP_PROP,
+        BELOW_OP_PROP,
+        ABOVE_NODE_OP_PROP,
+        BELOW_NODE_OP_PROP,
+        ABOVE_RATE_PROP,
+        BELOW_RATE_PROP,
+        // back-compat props
+        BC_COLLECTION_PROP,
+        BC_RATE_PROP);
   }
 
   @Override
   public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
     super.configure(loader, cloudManager, properties);
     // parse config options
-    collection = (String)properties.getOrDefault(AutoScalingParams.COLLECTION, Policy.ANY);
+    String collectionsStr = (String)properties.get(COLLECTIONS_PROP);
+    if (collectionsStr != null) {
+      collections.addAll(StrUtils.splitSmart(collectionsStr, ','));
+    }
+    // check back-compat collection prop
+    collectionsStr = (String)properties.get(BC_COLLECTION_PROP);
+    if (collectionsStr != null) {
+      if (!collectionsStr.equals(Policy.ANY)) {
+        collections.add(collectionsStr);
+      }
+    }
     shard = (String)properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY);
-    if (collection.equals(Policy.ANY) && !shard.equals(Policy.ANY)) {
-      throw new TriggerValidationException("shard", "When 'shard' is other than #ANY then collection name must be also other than #ANY");
+    if (!shard.equals(Policy.ANY) && (collections.isEmpty() || collections.size() > 1)) {
+      throw new TriggerValidationException(name, AutoScalingParams.SHARD, "When 'shard' is other than #ANY then exactly one collection name must be set");
     }
     node = (String)properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY);
-    handler = (String)properties.getOrDefault(AutoScalingParams.HANDLER, "/select");
+    metric = (String)properties.getOrDefault(METRIC_PROP, DEFAULT_METRIC);
+
+    String maxOpsStr = String.valueOf(properties.getOrDefault(MAX_OPS_PROP, DEFAULT_MAX_OPS));
+    try {
+      maxOps = Integer.parseInt(maxOpsStr);
+    } catch (Exception e) {
+      throw new TriggerValidationException(name, MAX_OPS_PROP, "invalid value '" + maxOpsStr + "': " + e.toString());
+    }
+
+    Object o = properties.get(MIN_REPLICAS_PROP);
+    if (o != null) {
+      try {
+        minReplicas = Integer.parseInt(o.toString());
+        if (minReplicas < 1) {
+          throw new Exception("must be at least 1, or not set to use 'replicationFactor'");
+        }
+      } catch (Exception e) {
+        throw new TriggerValidationException(name, MIN_REPLICAS_PROP, "invalid value '" + o + "': " + e.toString());
+      }
+    }
+
+    Object above = properties.get(ABOVE_RATE_PROP);
+    Object below = properties.get(BELOW_RATE_PROP);
+    // back-compat rate prop
+    if (properties.containsKey(BC_RATE_PROP)) {
+      above = properties.get(BC_RATE_PROP);
+    }
+    if (above == null && below == null) {
+      throw new TriggerValidationException(name, ABOVE_RATE_PROP, "at least one of '" +
+      ABOVE_RATE_PROP + "' or '" + BELOW_RATE_PROP + "' must be set");
+    }
+    if (above != null) {
+      try {
+        aboveRate = Double.parseDouble(String.valueOf(above));
+      } catch (Exception e) {
+        throw new TriggerValidationException(name, ABOVE_RATE_PROP, "Invalid configuration value: '" + above + "': " + e.toString());
+      }
+    } else {
+      aboveRate = Double.MAX_VALUE;
+    }
+    if (below != null) {
+      try {
+        belowRate = Double.parseDouble(String.valueOf(below));
+      } catch (Exception e) {
+        throw new TriggerValidationException(name, BELOW_RATE_PROP, "Invalid configuration value: '" + below + "': " + e.toString());
+      }
+    } else {
+      belowRate = -1;
+    }
 
-    String rateString = String.valueOf(properties.get("rate"));
+    String aboveOpStr = String.valueOf(properties.getOrDefault(ABOVE_OP_PROP, CollectionParams.CollectionAction.ADDREPLICA.toLower()));
+    String belowOpStr = String.valueOf(properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.DELETEREPLICA.toLower()));
+    aboveOp = CollectionParams.CollectionAction.get(aboveOpStr);
+    if (aboveOp == null) {
+      throw new TriggerValidationException(getName(), ABOVE_OP_PROP, "unrecognized value: '" + aboveOpStr + "'");
+    }
+    belowOp = CollectionParams.CollectionAction.get(belowOpStr);
+    if (belowOp == null) {
+      throw new TriggerValidationException(getName(), BELOW_OP_PROP, "unrecognized value: '" + belowOpStr + "'");
+    }
+    Object aboveNodeObj = properties.getOrDefault(ABOVE_NODE_OP_PROP, CollectionParams.CollectionAction.MOVEREPLICA.toLower());
+    // do NOT set the default to DELETENODE
+    Object belowNodeObj = properties.get(BELOW_NODE_OP_PROP);
     try {
-      rate = Double.parseDouble(rateString);
+      aboveNodeOp = CollectionParams.CollectionAction.get(String.valueOf(aboveNodeObj));
     } catch (Exception e) {
-      throw new TriggerValidationException(name, "rate", "Invalid 'rate' configuration value: '" + rateString + "': " + e.toString());
+      throw new TriggerValidationException(getName(), ABOVE_NODE_OP_PROP, "unrecognized value: '" + aboveNodeObj + "'");
+    }
+    if (belowNodeObj != null) {
+      try {
+        belowNodeOp = CollectionParams.CollectionAction.get(String.valueOf(belowNodeObj));
+      } catch (Exception e) {
+        throw new TriggerValidationException(getName(), BELOW_NODE_OP_PROP, "unrecognized value: '" + belowNodeObj + "'");
+      }
     }
   }
 
+  @VisibleForTesting
+  Map<String, Object> getConfig() {
+    Map<String, Object> config = new HashMap<>();
+    config.put("name", name);
+    config.put(COLLECTIONS_PROP, collections);
+    config.put(AutoScalingParams.SHARD, shard);
+    config.put(AutoScalingParams.NODE, node);
+    config.put(METRIC_PROP, metric);
+    config.put(MAX_OPS_PROP, maxOps);
+    config.put(MIN_REPLICAS_PROP, minReplicas);
+    config.put(ABOVE_RATE_PROP, aboveRate);
+    config.put(BELOW_RATE_PROP, belowRate);
+    config.put(ABOVE_OP_PROP, aboveOp);
+    config.put(ABOVE_NODE_OP_PROP, aboveNodeOp);
+    config.put(BELOW_OP_PROP, belowOp);
+    config.put(BELOW_NODE_OP_PROP, belowNodeOp);
+    return config;
+  }
+
   @Override
   protected Map<String, Object> getState() {
     return state;
@@ -146,26 +295,42 @@ public class SearchRateTrigger extends TriggerBase {
       return;
     }
 
+    // collection, shard, list(replica + rate)
     Map<String, Map<String, List<ReplicaInfo>>> collectionRates = new HashMap<>();
+    // node, rate
     Map<String, AtomicDouble> nodeRates = new HashMap<>();
-    Map<String, Integer> replicationFactors = new HashMap<>();
+    // this replication factor only considers replica types that are searchable
+    // collection, shard, RF
+    Map<String, Map<String, AtomicInteger>> searchableReplicationFactors = new HashMap<>();
 
+    ClusterState clusterState = null;
+    try {
+      clusterState = cloudManager.getClusterStateProvider().getClusterState();
+    } catch (IOException e) {
+      log.warn("Error getting ClusterState", e);
+      return;
+    }
     for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
       Map<String, ReplicaInfo> metricTags = new HashMap<>();
       // coll, shard, replica
       Map<String, Map<String, List<ReplicaInfo>>> infos = cloudManager.getNodeStateProvider().getReplicaInfo(node, Collections.emptyList());
       infos.forEach((coll, shards) -> {
-        replicationFactors.computeIfAbsent(coll, c -> shards.size());
+        Map<String, AtomicInteger> replPerShard = searchableReplicationFactors.computeIfAbsent(coll, c -> new HashMap<>());
         shards.forEach((sh, replicas) -> {
+          AtomicInteger repl = replPerShard.computeIfAbsent(sh, s -> new AtomicInteger());
           replicas.forEach(replica -> {
+            // skip non-active replicas
+            if (replica.getState() != Replica.State.ACTIVE) {
+              return;
+            }
+            repl.incrementAndGet();
             // we have to translate to the metrics registry name, which uses "_replica_nN" as suffix
             String replicaName = Utils.parseMetricsReplicaName(coll, replica.getCore());
             if (replicaName == null) { // should never happen???
               replicaName = replica.getName(); // which is actually coreNode name...
             }
             String registry = SolrCoreMetricManager.createRegistryName(true, coll, sh, replicaName, null);
-            String tag = "metrics:" + registry
-                + ":QUERY." + handler + ".requestTimes:1minRate";
+            String tag = "metrics:" + registry + ":" + metric;
             metricTags.put(tag, replica);
           });
         });
@@ -191,48 +356,100 @@ public class SearchRateTrigger extends TriggerBase {
     }
 
     long now = cloudManager.getTimeSource().getTimeNs();
+    Map<String, Double> hotNodes = new HashMap<>();
+    Map<String, Double> coldNodes = new HashMap<>();
     // check for exceeded rates and filter out those with less than waitFor from previous events
-    Map<String, Double> hotNodes = nodeRates.entrySet().stream()
+    nodeRates.entrySet().stream()
         .filter(entry -> node.equals(Policy.ANY) || node.equals(entry.getKey()))
-        .filter(entry -> waitForElapsed(entry.getKey(), now, lastNodeEvent))
-        .filter(entry -> entry.getValue().get() > rate)
-        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get()));
+        .forEach(entry -> {
+          if (entry.getValue().get() > aboveRate) {
+            if (waitForElapsed(entry.getKey(), now, lastNodeEvent)) {
+              hotNodes.put(entry.getKey(), entry.getValue().get());
+            }
+          } else if (entry.getValue().get() < belowRate) {
+            if (waitForElapsed(entry.getKey(), now, lastNodeEvent)) {
+              coldNodes.put(entry.getKey(), entry.getValue().get());
+            }
+          } else {
+            // no violation - clear waitForElapsed
+            // (violation is only valid if it persists throughout waitFor)
+            lastNodeEvent.remove(entry.getKey());
+          }
+        });
 
     Map<String, Map<String, Double>> hotShards = new HashMap<>();
+    Map<String, Map<String, Double>> coldShards = new HashMap<>();
     List<ReplicaInfo> hotReplicas = new ArrayList<>();
+    List<ReplicaInfo> coldReplicas = new ArrayList<>();
     collectionRates.forEach((coll, shardRates) -> {
       shardRates.forEach((sh, replicaRates) -> {
         double shardRate = replicaRates.stream()
             .map(r -> {
-              if (waitForElapsed(r.getCollection() + "." + r.getCore(), now, lastReplicaEvent) &&
-                  ((Double)r.getVariable(AutoScalingParams.RATE) > rate)) {
-                hotReplicas.add(r);
+              String elapsedKey = r.getCollection() + "." + r.getCore();
+              if ((Double)r.getVariable(AutoScalingParams.RATE) > aboveRate) {
+                if (waitForElapsed(elapsedKey, now, lastReplicaEvent)) {
+                  hotReplicas.add(r);
+                }
+              } else if ((Double)r.getVariable(AutoScalingParams.RATE) < belowRate) {
+                if (waitForElapsed(elapsedKey, now, lastReplicaEvent)) {
+                  coldReplicas.add(r);
+                }
+              } else {
+                // no violation - clear waitForElapsed
+                lastReplicaEvent.remove(elapsedKey);
               }
               return r;
             })
             .mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum();
-        if (waitForElapsed(coll + "." + sh, now, lastShardEvent) &&
-            (shardRate > rate) &&
-            (collection.equals(Policy.ANY) || collection.equals(coll)) &&
+        String elapsedKey = coll + "." + sh;
+        if ((collections.isEmpty() || collections.contains(coll)) &&
             (shard.equals(Policy.ANY) || shard.equals(sh))) {
-          hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+          if (shardRate > aboveRate) {
+            if (waitForElapsed(elapsedKey, now, lastShardEvent)) {
+              hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+            }
+          } else if (shardRate < belowRate) {
+            if (waitForElapsed(elapsedKey, now, lastShardEvent)) {
+              coldShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+            }
+          } else {
+            // no violation - clear waitForElapsed
+            lastShardEvent.remove(elapsedKey);
+          }
         }
       });
     });
 
     Map<String, Double> hotCollections = new HashMap<>();
+    Map<String, Double> coldCollections = new HashMap<>();
     collectionRates.forEach((coll, shardRates) -> {
       double total = shardRates.entrySet().stream()
           .mapToDouble(e -> e.getValue().stream()
               .mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum();
-      if (waitForElapsed(coll, now, lastCollectionEvent) &&
-          (total > rate) &&
-          (collection.equals(Policy.ANY) || collection.equals(coll))) {
-        hotCollections.put(coll, total);
+      if (collections.isEmpty() || collections.contains(coll)) {
+        if (total > aboveRate) {
+          if (waitForElapsed(coll, now, lastCollectionEvent)) {
+            hotCollections.put(coll, total);
+          }
+        } else if (total < belowRate) {
+          if (waitForElapsed(coll, now, lastCollectionEvent)) {
+            coldCollections.put(coll, total);
+          }
+        } else {
+          // no violation - clear waitForElapsed
+          lastCollectionEvent.remove(coll);
+        }
       }
     });
 
-    if (hotCollections.isEmpty() && hotShards.isEmpty() && hotReplicas.isEmpty() && hotNodes.isEmpty()) {
+    if (hotCollections.isEmpty() &&
+        hotShards.isEmpty() &&
+        hotReplicas.isEmpty() &&
+        hotNodes.isEmpty() &&
+        coldCollections.isEmpty() &&
+        coldShards.isEmpty() &&
+        coldReplicas.isEmpty() &&
+        coldNodes.isEmpty()) {
       return;
     }
 
@@ -246,6 +463,12 @@ public class SearchRateTrigger extends TriggerBase {
         eventTime.set(time);
       }
     });
+    coldCollections.forEach((c, r) -> {
+      long time = lastCollectionEvent.get(c);
+      if (eventTime.get() > time) {
+        eventTime.set(time);
+      }
+    });
     hotShards.forEach((c, shards) -> {
       shards.forEach((s, r) -> {
         long time = lastShardEvent.get(c + "." + s);
@@ -254,27 +477,83 @@ public class SearchRateTrigger extends TriggerBase {
         }
       });
     });
+    coldShards.forEach((c, shards) -> {
+      shards.forEach((s, r) -> {
+        long time = lastShardEvent.get(c + "." + s);
+        if (eventTime.get() > time) {
+          eventTime.set(time);
+        }
+      });
+    });
     hotReplicas.forEach(r -> {
       long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore());
       if (eventTime.get() > time) {
         eventTime.set(time);
       }
     });
+    coldReplicas.forEach(r -> {
+      long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore());
+      if (eventTime.get() > time) {
+        eventTime.set(time);
+      }
+    });
     hotNodes.forEach((n, r) -> {
       long time = lastNodeEvent.get(n);
       if (eventTime.get() > time) {
         eventTime.set(time);
       }
     });
+    coldNodes.forEach((n, r) -> {
+      long time = lastNodeEvent.get(n);
+      if (eventTime.get() > time) {
+        eventTime.set(time);
+      }
+    });
+
+    final List<TriggerEvent.Op> ops = new ArrayList<>();
+
+    calculateHotOps(ops, searchableReplicationFactors, hotNodes, hotCollections, hotShards, hotReplicas);
+    calculateColdOps(ops, clusterState, searchableReplicationFactors, coldNodes, coldCollections, coldShards, coldReplicas);
+
+    if (ops.isEmpty()) {
+      return;
+    }
 
+    if (processor.process(new SearchRateEvent(getName(), eventTime.get(), ops,
+        hotNodes, hotCollections, hotShards, hotReplicas,
+        coldNodes, coldCollections, coldShards, coldReplicas))) {
+      // update lastEvent times
+      hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
+      coldNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
+      hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
+      coldCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
+      hotShards.entrySet().forEach(e -> e.getValue()
+          .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
+      coldShards.entrySet().forEach(e -> e.getValue()
+          .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
+      hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
+      coldReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
+    }
+  }
+
+  private void calculateHotOps(List<TriggerEvent.Op> ops,
+                               Map<String, Map<String, AtomicInteger>> searchableReplicationFactors,
+                               Map<String, Double> hotNodes,
+                               Map<String, Double> hotCollections,
+                               Map<String, Map<String, Double>> hotShards,
+                               List<ReplicaInfo> hotReplicas) {
     // calculate the number of replicas to add to each hot shard, based on how much the rate was
     // exceeded - but within limits.
-    final List<TriggerEvent.Op> ops = new ArrayList<>();
-    if (hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
+
+    // first resolve a situation when only a node is hot but no collection / shard / replica is hot
+    // TODO: eventually we may want to commission a new node
+    if (!hotNodes.isEmpty() && hotShards.isEmpty() && hotCollections.isEmpty() && hotReplicas.isEmpty()) {
       // move replicas around
-      hotNodes.forEach((n, r) -> {
-        ops.add(new TriggerEvent.Op(CollectionParams.CollectionAction.MOVEREPLICA, Suggester.Hint.SRC_NODE, n));
-      });
+      if (aboveNodeOp != null) {
+        hotNodes.forEach((n, r) -> {
+          ops.add(new TriggerEvent.Op(aboveNodeOp, Suggester.Hint.SRC_NODE, n));
+        });
+      }
     } else {
       // add replicas
       Map<String, Map<String, List<Pair<String, String>>>> hints = new HashMap<>();
@@ -283,7 +562,7 @@ public class SearchRateTrigger extends TriggerBase {
         List<Pair<String, String>> perShard = hints
             .computeIfAbsent(coll, c -> new HashMap<>())
             .computeIfAbsent(s, sh -> new ArrayList<>());
-        addHints(coll, s, r, replicationFactors.get(coll), perShard);
+        addReplicaHints(coll, s, r, searchableReplicationFactors.get(coll).get(s).get(), perShard);
       }));
       hotReplicas.forEach(ri -> {
         double r = (Double)ri.getVariable(AutoScalingParams.RATE);
@@ -292,38 +571,120 @@ public class SearchRateTrigger extends TriggerBase {
             .computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
             .computeIfAbsent(ri.getShard(), sh -> new ArrayList<>());
         if (perShard.isEmpty()) {
-          addHints(ri.getCollection(), ri.getShard(), r, replicationFactors.get(ri.getCollection()), perShard);
+          addReplicaHints(ri.getCollection(), ri.getShard(), r, searchableReplicationFactors.get(ri.getCollection()).get(ri.getShard()).get(), perShard);
         }
       });
 
       hints.values().forEach(m -> m.values().forEach(lst -> lst.forEach(p -> {
-        ops.add(new TriggerEvent.Op(CollectionParams.CollectionAction.ADDREPLICA, Suggester.Hint.COLL_SHARD, p));
+        ops.add(new TriggerEvent.Op(aboveOp, Suggester.Hint.COLL_SHARD, p));
       })));
     }
 
-    if (processor.process(new SearchRateEvent(getName(), eventTime.get(), ops, hotNodes, hotCollections, hotShards, hotReplicas))) {
-      // update lastEvent times
-      hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
-      hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
-      hotShards.entrySet().forEach(e -> e.getValue()
-          .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
-      hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
-    }
   }
 
-  private void addHints(String collection, String shard, double r, int replicationFactor, List<Pair<String, String>> hints) {
-    int numReplicas = (int)Math.round((r - rate) / (double) replicationFactor);
+  /**
+   * This method implements a primitive form of proportional controller with a limiter.
+   */
+  private void addReplicaHints(String collection, String shard, double r, int replicationFactor, List<Pair<String, String>> hints) {
+    int numReplicas = (int)Math.round((r - aboveRate) / (double) replicationFactor);
+    // in one event add at least 1 replica
     if (numReplicas < 1) {
       numReplicas = 1;
     }
-    if (numReplicas > 3) {
-      numReplicas = 3;
+    // ... and at most maxOps replicas
+    if (numReplicas > maxOps) {
+      numReplicas = maxOps;
     }
     for (int i = 0; i < numReplicas; i++) {
       hints.add(new Pair(collection, shard));
     }
   }
 
+  private void calculateColdOps(List<TriggerEvent.Op> ops,
+                                ClusterState clusterState,
+                                Map<String, Map<String, AtomicInteger>> searchableReplicationFactors,
+                                Map<String, Double> coldNodes,
+                                Map<String, Double> coldCollections,
+                                Map<String, Map<String, Double>> coldShards,
+                                List<ReplicaInfo> coldReplicas) {
+    // COLD COLLECTIONS
+    // Probably can't do anything reasonable about whole cold collections
+    // because they may be needed even if not used.
+
+    // COLD SHARDS:
+    // Cold shards mean that there are too many replicas per shard - but it also
+    // means that all replicas in these shards are cold too, so we can simply
+    // address this by deleting cold replicas
+
+    // COLD REPLICAS:
+    // Remove cold replicas but only when there's at least a minimum number of searchable
+    // replicas still available (additional non-searchable replicas may exist, too)
+    // NOTE: do this before adding ops for DELETENODE because we don't want to attempt
+    // deleting replicas that have been already moved elsewhere
+    Map<String, Map<String, List<ReplicaInfo>>> byCollectionByShard = new HashMap<>();
+    coldReplicas.forEach(ri -> {
+      byCollectionByShard.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
+          .computeIfAbsent(ri.getShard(), s -> new ArrayList<>())
+          .add(ri);
+    });
+    byCollectionByShard.forEach((coll, shards) -> {
+      shards.forEach((shard, replicas) -> {
+        // only delete if there's at least minRF searchable replicas left
+        int rf = searchableReplicationFactors.get(coll).get(shard).get();
+        // we only really need a leader and we may be allowed to remove other replicas
+        int minRF = 1;
+        // but check the official RF and don't go below that
+        Integer RF = clusterState.getCollection(coll).getReplicationFactor();
+        if (RF != null) {
+          minRF = RF;
+        }
+        // unless minReplicas is set explicitly
+        if (minReplicas != null) {
+          minRF = minReplicas;
+        }
+        if (minRF < 1) {
+          minRF = 1;
+        }
+        if (rf > minRF) {
+          // delete at most maxOps replicas at a time
+          AtomicInteger limit = new AtomicInteger(Math.min(maxOps, rf - minRF));
+          replicas.forEach(ri -> {
+            if (limit.get() == 0) {
+              return;
+            }
+            // don't delete a leader
+            if (ri.getBool(ZkStateReader.LEADER_PROP, false)) {
+              return;
+            }
+            TriggerEvent.Op op = new TriggerEvent.Op(belowOp,
+                Suggester.Hint.COLL_SHARD, new Pair<>(ri.getCollection(), ri.getShard()));
+            op.addHint(Suggester.Hint.REPLICA, ri.getName());
+            ops.add(op);
+            limit.decrementAndGet();
+          });
+        }
+      });
+    });
+
+    // COLD NODES:
+    // Unlike the case of hot nodes, if a node is cold then any monitored
+    // collections / shards / replicas located on that node are cold, too.
+    // HOWEVER, we check only non-pull replicas and only from selected collections / shards,
+    // so deleting a cold node is dangerous because it may interfere with these
+    // non-monitored resources - this is the reason the default belowNodeOp is null / ignored.
+    //
+    // Also, note that due to the way activity is measured only nodes that contain any
+    // monitored resources are considered - there may be cold nodes in the cluster that don't
+    // belong to the monitored collections and they will be ignored.
+    if (belowNodeOp != null) {
+      coldNodes.forEach((node, rate) -> {
+        ops.add(new TriggerEvent.Op(belowNodeOp, Suggester.Hint.SRC_NODE, node));
+      });
+    }
+
+
+  }
+
   private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
     Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
     long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
@@ -335,15 +696,25 @@ public class SearchRateTrigger extends TriggerBase {
   }
 
   public static class SearchRateEvent extends TriggerEvent {
-    public SearchRateEvent(String source, long eventTime, List<Op> ops, Map<String, Double> hotNodes,
+    public SearchRateEvent(String source, long eventTime, List<Op> ops,
+                           Map<String, Double> hotNodes,
                            Map<String, Double> hotCollections,
-                           Map<String, Map<String, Double>> hotShards, List<ReplicaInfo> hotReplicas) {
+                           Map<String, Map<String, Double>> hotShards,
+                           List<ReplicaInfo> hotReplicas,
+                           Map<String, Double> coldNodes,
+                           Map<String, Double> coldCollections,
+                           Map<String, Map<String, Double>> coldShards,
+                           List<ReplicaInfo> coldReplicas) {
       super(TriggerEventType.SEARCHRATE, source, eventTime, null);
       properties.put(TriggerEvent.REQUESTED_OPS, ops);
-      properties.put(AutoScalingParams.COLLECTION, hotCollections);
-      properties.put(AutoScalingParams.SHARD, hotShards);
-      properties.put(AutoScalingParams.REPLICA, hotReplicas);
-      properties.put(AutoScalingParams.NODE, hotNodes);
+      properties.put(HOT_NODES, hotNodes);
+      properties.put(HOT_COLLECTIONS, hotCollections);
+      properties.put(HOT_SHARDS, hotShards);
+      properties.put(HOT_REPLICAS, hotReplicas);
+      properties.put(COLD_NODES, coldNodes);
+      properties.put(COLD_COLLECTIONS, coldCollections);
+      properties.put(COLD_SHARDS, coldShards);
+      properties.put(COLD_REPLICAS, coldReplicas);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9541d7e3/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java b/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
index 5590252..768cd91 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
@@ -88,9 +88,11 @@ public class CloudTestUtils {
                                   final CollectionStatePredicate predicate) throws InterruptedException, TimeoutException, IOException {
     TimeOut timeout = new TimeOut(wait, unit, cloudManager.getTimeSource());
     long timeWarn = timeout.timeLeft(TimeUnit.MILLISECONDS) / 4;
+    ClusterState state = null;
+    DocCollection coll = null;
     while (!timeout.hasTimedOut()) {
-      ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
-      DocCollection coll = state.getCollectionOrNull(collection);
+      state = cloudManager.getClusterStateProvider().getClusterState();
+      coll = state.getCollectionOrNull(collection);
       // due to the way we manage collections in SimClusterStateProvider a null here
       // can mean that a collection is still being created but has no replicas
       if (coll == null) { // does not yet exist?
@@ -106,7 +108,7 @@ public class CloudTestUtils {
         log.trace("-- still not matching predicate: {}", state);
       }
     }
-    throw new TimeoutException();
+    throw new TimeoutException("last state: " + coll);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9541d7e3/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
index b98ee70..2aec88e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
@@ -475,7 +475,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
         "'event' : 'searchRate'," +
         "'waitFor' : '10m'," +
         "'enabled' : true," +
-        "'rate': 'foo'," +
+        "'aboveRate': 'foo'," +
         "'actions' : [" +
         "{" +
         "'name' : 'compute_plan'," +
@@ -489,7 +489,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
     } catch (HttpSolrClient.RemoteSolrException e) {
       // expected
       assertTrue(String.valueOf(getObjectByPath(((HttpSolrClient.RemoteExecutionException) e).getMetaData(),
-          false, "error/details[0]/errorMessages[0]")).contains("rate=Invalid 'rate' configuration value: 'foo'"));
+          false, "error/details[0]/errorMessages[0]")).contains("aboveRate=Invalid configuration value: 'foo'"));
     }
 
     // unknown trigger action properties

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9541d7e3/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
index 796670a..098d0a5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
@@ -22,10 +22,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.lucene.util.LuceneTestCase;
@@ -37,12 +36,20 @@ import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudTestUtils;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.util.LogLevel;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -51,20 +58,23 @@ import org.slf4j.LoggerFactory;
 import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
 import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
 import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
+import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
 
 /**
  * Integration test for {@link SearchRateTrigger}
  */
 @LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
-@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028")
+@LuceneTestCase.Slow
 public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
   private static CountDownLatch listenerCreated = new CountDownLatch(1);
-  private static int waitForSeconds = 1;
-  private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
   private static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
+  private static CountDownLatch finished = new CountDownLatch(1);
+  private static CountDownLatch started = new CountDownLatch(1);
+  private static SolrCloudManager cloudManager;
+
+  private int waitForSeconds;
 
   @BeforeClass
   public static void setupCluster() throws Exception {
@@ -79,74 +89,156 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
     SolrClient solrClient = cluster.getSolrClient();
     NamedList<Object> response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
+    cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+    cluster.deleteAllCollections();
+    // clear any persisted auto scaling configuration
+    Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
+    log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
+    timeSource.sleep(5000);
+    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
+
+    finished = new CountDownLatch(1);
+    started = new CountDownLatch(1);
+    listenerEvents = new HashMap<>();
+    waitForSeconds = 3 + random().nextInt(5);
+  }
+
+  private void deleteChildrenRecursively(String path) throws Exception {
+    cloudManager.getDistribStateManager().removeRecursively(path, true, false);
   }
 
   @Test
-  public void testSearchRate() throws Exception {
+  public void testAboveSearchRate() throws Exception {
     CloudSolrClient solrClient = cluster.getSolrClient();
-    String COLL1 = "collection1";
+    String COLL1 = "aboveRate_collection";
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
         "conf", 1, 2);
     create.process(solrClient);
+
+    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(1, 2));
+
+    // the trigger is initially disabled so that we have the time to set up listeners
+    // and generate the traffic
     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
-        "'name' : 'search_rate_trigger'," +
+        "'name' : 'search_rate_trigger1'," +
         "'event' : 'searchRate'," +
         "'waitFor' : '" + waitForSeconds + "s'," +
-        "'enabled' : true," +
-        "'rate' : 1.0," +
+        "'enabled' : false," +
+        "'collections' : '" + COLL1 + "'," +
+        "'aboveRate' : 1.0," +
+        "'belowRate' : 0.1," +
         "'actions' : [" +
         "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
-        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
-        "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
+        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
         "]" +
         "}}";
     SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
     NamedList<Object> response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
 
-    String setListenerCommand1 = "{" +
+    String setListenerCommand = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'started'," +
+        "'trigger' : 'search_rate_trigger1'," +
+        "'stage' : ['STARTED']," +
+        "'class' : '" + StartedProcessingListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    setListenerCommand = "{" +
         "'set-listener' : " +
         "{" +
         "'name' : 'srt'," +
-        "'trigger' : 'search_rate_trigger'," +
+        "'trigger' : 'search_rate_trigger1'," +
         "'stage' : ['FAILED','SUCCEEDED']," +
-        "'afterAction': ['compute', 'execute', 'test']," +
-        "'class' : '" + TestTriggerListener.class.getName() + "'" +
+        "'afterAction': ['compute', 'execute']," +
+        "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    setListenerCommand = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'finished'," +
+        "'trigger' : 'search_rate_trigger1'," +
+        "'stage' : ['SUCCEEDED']," +
+        "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
         "}" +
         "}";
-    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
     response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
+
     SolrParams query = params(CommonParams.Q, "*:*");
     for (int i = 0; i < 500; i++) {
       solrClient.query(COLL1, query);
     }
-    boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+
+    // enable the trigger
+    String resumeTriggerCommand = "{" +
+        "'resume-trigger' : {" +
+        "'name' : 'search_rate_trigger1'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+    boolean await = started.await(20, TimeUnit.SECONDS);
     assertTrue("The trigger did not fire at all", await);
-    // wait for listener to capture the SUCCEEDED stage
-    Thread.sleep(5000);
+
+    await = finished.await(60, TimeUnit.SECONDS);
+    assertTrue("The trigger did not finish processing", await);
+
+    // suspend the trigger
+    String suspendTriggerCommand = "{" +
+        "'suspend-trigger' : {" +
+        "'name' : 'search_rate_trigger1'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    timeSource.sleep(5000);
+
     List<CapturedEvent> events = listenerEvents.get("srt");
-    assertEquals(listenerEvents.toString(), 4, events.size());
+    assertEquals(listenerEvents.toString(), 3, events.size());
     assertEquals("AFTER_ACTION", events.get(0).stage.toString());
     assertEquals("compute", events.get(0).actionName);
     assertEquals("AFTER_ACTION", events.get(1).stage.toString());
     assertEquals("execute", events.get(1).actionName);
-    assertEquals("AFTER_ACTION", events.get(2).stage.toString());
-    assertEquals("test", events.get(2).actionName);
-    assertEquals("SUCCEEDED", events.get(3).stage.toString());
-    assertNull(events.get(3).actionName);
+    assertEquals("SUCCEEDED", events.get(2).stage.toString());
+    assertNull(events.get(2).actionName);
 
     CapturedEvent ev = events.get(0);
     long now = timeSource.getTimeNs();
     // verify waitFor
     assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
-    Map<String, Double> nodeRates = (Map<String, Double>) ev.event.getProperties().get("node");
+    Map<String, Double> nodeRates = (Map<String, Double>) ev.event.getProperties().get(SearchRateTrigger.HOT_NODES);
     assertNotNull("nodeRates", nodeRates);
     assertTrue(nodeRates.toString(), nodeRates.size() > 0);
     AtomicDouble totalNodeRate = new AtomicDouble();
     nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
-    List<ReplicaInfo> replicaRates = (List<ReplicaInfo>) ev.event.getProperties().get("replica");
+    List<ReplicaInfo> replicaRates = (List<ReplicaInfo>) ev.event.getProperties().get(SearchRateTrigger.HOT_REPLICAS);
     assertNotNull("replicaRates", replicaRates);
     assertTrue(replicaRates.toString(), replicaRates.size() > 0);
     AtomicDouble totalReplicaRate = new AtomicDouble();
@@ -154,7 +246,7 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
       assertTrue(r.toString(), r.getVariable("rate") != null);
       totalReplicaRate.addAndGet((Double) r.getVariable("rate"));
     });
-    Map<String, Object> shardRates = (Map<String, Object>) ev.event.getProperties().get("shard");
+    Map<String, Object> shardRates = (Map<String, Object>) ev.event.getProperties().get(SearchRateTrigger.HOT_SHARDS);
     assertNotNull("shardRates", shardRates);
     assertEquals(shardRates.toString(), 1, shardRates.size());
     shardRates = (Map<String, Object>) shardRates.get(COLL1);
@@ -162,7 +254,7 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
     assertEquals(shardRates.toString(), 1, shardRates.size());
     AtomicDouble totalShardRate = new AtomicDouble();
     shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double) r));
-    Map<String, Double> collectionRates = (Map<String, Double>) ev.event.getProperties().get("collection");
+    Map<String, Double> collectionRates = (Map<String, Double>) ev.event.getProperties().get(SearchRateTrigger.HOT_COLLECTIONS);
     assertNotNull("collectionRates", collectionRates);
     assertEquals(collectionRates.toString(), 1, collectionRates.size());
     Double collectionRate = collectionRates.get(COLL1);
@@ -181,27 +273,414 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
     }
   }
 
-  public static class TestSearchRateAction extends TriggerActionBase {
+  @Test
+  public void testBelowSearchRate() throws Exception {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String COLL1 = "belowRate_collection";
+    // replicationFactor == 2
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
+        "conf", 1, 2);
+    create.process(solrClient);
+    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(1, 2));
+
+    // add a couple of spare replicas above RF. Use different types.
+    // these additional replicas will be placed on other nodes in the cluster
+    solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.NRT));
+    solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.TLOG));
+    solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.PULL));
 
-    @Override
-    public void process(TriggerEvent event, ActionContext context) throws Exception {
-      try {
-        events.add(event);
-        long currentTimeNanos = timeSource.getTimeNs();
-        long eventTimeNanos = event.getEventTime();
-        long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
-        if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
-          fail(event.source + " was fired before the configured waitFor period");
+    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(1, 5));
+
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'search_rate_trigger2'," +
+        "'event' : 'searchRate'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : false," +
+        "'collections' : '" + COLL1 + "'," +
+        "'aboveRate' : 1.0," +
+        "'belowRate' : 0.1," +
+        // do nothing but generate an op
+        "'belowNodeOp' : 'none'," +
+        "'actions' : [" +
+        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
+        "]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    String setListenerCommand = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'started'," +
+        "'trigger' : 'search_rate_trigger2'," +
+        "'stage' : ['STARTED']," +
+        "'class' : '" + StartedProcessingListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    setListenerCommand = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'srt'," +
+        "'trigger' : 'search_rate_trigger2'," +
+        "'stage' : ['FAILED','SUCCEEDED']," +
+        "'afterAction': ['compute', 'execute']," +
+        "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    setListenerCommand = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'finished'," +
+        "'trigger' : 'search_rate_trigger2'," +
+        "'stage' : ['SUCCEEDED']," +
+        "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+    // enable the trigger
+    String resumeTriggerCommand = "{" +
+        "'resume-trigger' : {" +
+        "'name' : 'search_rate_trigger2'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+    boolean await = started.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    await = finished.await(60, TimeUnit.SECONDS);
+    assertTrue("The trigger did not finish processing", await);
+
+    // suspend the trigger
+    String suspendTriggerCommand = "{" +
+        "'suspend-trigger' : {" +
+        "'name' : 'search_rate_trigger2'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    timeSource.sleep(5000);
+
+    List<CapturedEvent> events = listenerEvents.get("srt");
+    assertEquals(events.toString(), 3, events.size());
+    CapturedEvent ev = events.get(0);
+    assertEquals(ev.toString(), "compute", ev.actionName);
+    List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
+    assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
+    // 4 cold nodes, 3 cold replicas
+    assertEquals(ops.toString(), 7, ops.size());
+    AtomicInteger coldNodes = new AtomicInteger();
+    AtomicInteger coldReplicas = new AtomicInteger();
+    ops.forEach(op -> {
+      if (op.getAction().equals(CollectionParams.CollectionAction.NONE)) {
+        coldNodes.incrementAndGet();
+      } else if (op.getAction().equals(CollectionParams.CollectionAction.DELETEREPLICA)) {
+        coldReplicas.incrementAndGet();
+      } else {
+        fail("unexpected op: " + op);
+      }
+    });
+    assertEquals("cold nodes", 4, coldNodes.get());
+    assertEquals("cold replicas", 3, coldReplicas.get());
+
+    // now the collection should be down to RF = 2
+    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(1, 2));
+
+    listenerEvents.clear();
+    finished = new CountDownLatch(1);
+    started = new CountDownLatch(1);
+
+    // resume trigger
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    // there should be only coldNode ops now, and no coldReplica ops since searchable RF == collection RF
+    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+    await = started.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    await = finished.await(60, TimeUnit.SECONDS);
+    assertTrue("The trigger did not finish processing", await);
+
+    // suspend trigger
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    timeSource.sleep(5000);
+
+    events = listenerEvents.get("srt");
+    assertEquals(events.toString(), 3, events.size());
+
+    ev = events.get(0);
+    assertEquals(ev.toString(), "compute", ev.actionName);
+    ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
+    assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
+    assertEquals(ops.toString(), 1, ops.size());
+    assertEquals(ops.toString(), CollectionParams.CollectionAction.NONE, ops.get(0).getAction());
+
+    listenerEvents.clear();
+    finished = new CountDownLatch(1);
+    started = new CountDownLatch(1);
+
+    // now allow single replicas
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'search_rate_trigger2'," +
+        "'event' : 'searchRate'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'collections' : '" + COLL1 + "'," +
+        "'aboveRate' : 1.0," +
+        "'belowRate' : 0.1," +
+        "'minReplicas' : 1," +
+        "'belowNodeOp' : 'none'," +
+        "'actions' : [" +
+        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
+        "]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+    await = started.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    await = finished.await(60, TimeUnit.SECONDS);
+    assertTrue("The trigger did not finish processing", await);
+
+    // suspend trigger
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    timeSource.sleep(5000);
+
+    events = listenerEvents.get("srt");
+    assertEquals(events.toString(), 3, events.size());
+
+    ev = events.get(0);
+    assertEquals(ev.toString(), "compute", ev.actionName);
+    ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
+    assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
+    assertEquals(ops.toString(), 2, ops.size());
+    AtomicInteger coldNodes2 = new AtomicInteger();
+    AtomicInteger coldReplicas2 = new AtomicInteger();
+    ops.forEach(op -> {
+      if (op.getAction().equals(CollectionParams.CollectionAction.NONE)) {
+        coldNodes2.incrementAndGet();
+      } else if (op.getAction().equals(CollectionParams.CollectionAction.DELETEREPLICA)) {
+        coldReplicas2.incrementAndGet();
+      } else {
+        fail("unexpected op: " + op);
+      }
+    });
+
+    assertEquals("coldNodes", 1, coldNodes2.get());
+    assertEquals("colReplicas", 1, coldReplicas2.get());
+
+    // now the collection should be at RF == 1, with one additional PULL replica
+    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(1, 1));
+  }
+
+  @Test
+  public void testDeleteNode() throws Exception {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String COLL1 = "deleteNode_collection";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
+        "conf", 1, 2);
+
+    create.process(solrClient);
+    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(1, 2));
+
+    // add a couple of spare replicas above RF. Use different types to verify that only
+    // searchable replicas are considered
+    // these additional replicas will be placed on other nodes in the cluster
+    solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.NRT));
+    solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.TLOG));
+    solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.PULL));
+
+    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(1, 5));
+
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'search_rate_trigger3'," +
+        "'event' : 'searchRate'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : false," +
+        "'collections' : '" + COLL1 + "'," +
+        "'aboveRate' : 1.0," +
+        "'belowRate' : 0.1," +
+        // allow deleting all spare replicas
+        "'minReplicas' : 1," +
+        // allow requesting all deletions in one event
+        "'maxOps' : 10," +
+        // delete underutilised nodes
+        "'belowNodeOp' : 'DELETENODE'," +
+        "'actions' : [" +
+        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
+        "]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    String setListenerCommand = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'started'," +
+        "'trigger' : 'search_rate_trigger3'," +
+        "'stage' : ['STARTED']," +
+        "'class' : '" + StartedProcessingListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    setListenerCommand = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'srt'," +
+        "'trigger' : 'search_rate_trigger3'," +
+        "'stage' : ['FAILED','SUCCEEDED']," +
+        "'afterAction': ['compute', 'execute']," +
+        "'class' : '" + CapturingTriggerListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    setListenerCommand = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'finished'," +
+        "'trigger' : 'search_rate_trigger3'," +
+        "'stage' : ['SUCCEEDED']," +
+        "'class' : '" + FinishedProcessingListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+    // enable the trigger
+    String resumeTriggerCommand = "{" +
+        "'resume-trigger' : {" +
+        "'name' : 'search_rate_trigger3'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, resumeTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    timeSource.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds + 1, TimeUnit.SECONDS));
+
+    boolean await = started.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    await = finished.await(90, TimeUnit.SECONDS);
+    assertTrue("The trigger did not finish processing", await);
+
+    // suspend the trigger
+    String suspendTriggerCommand = "{" +
+        "'suspend-trigger' : {" +
+        "'name' : 'search_rate_trigger3'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    timeSource.sleep(5000);
+
+    List<CapturedEvent> events = listenerEvents.get("srt");
+    assertEquals(events.toString(), 3, events.size());
+
+    CapturedEvent ev = events.get(0);
+    assertEquals(ev.toString(), "compute", ev.actionName);
+    List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
+    assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
+    // 4 DELETEREPLICA, 4 DELETENODE
+    assertEquals(ops.toString(), 8, ops.size());
+    AtomicInteger replicas = new AtomicInteger();
+    AtomicInteger nodes = new AtomicInteger();
+    ops.forEach(op -> {
+      if (op.getAction().equals(CollectionParams.CollectionAction.DELETEREPLICA)) {
+        replicas.incrementAndGet();
+      } else if (op.getAction().equals(CollectionParams.CollectionAction.DELETENODE)) {
+        nodes.incrementAndGet();
+      } else {
+        fail("unexpected op: " + op);
+      }
+    });
+    assertEquals(ops.toString(), 4, replicas.get());
+    assertEquals(ops.toString(), 4, nodes.get());
+    // check status
+    ev = events.get(1);
+    assertEquals(ev.toString(), "execute", ev.actionName);
+    List<NamedList<Object>> responses = (List<NamedList<Object>>)ev.context.get("properties.responses");
+    assertNotNull(ev.toString(), responses);
+    assertEquals(responses.toString(), 8, responses.size());
+    replicas.set(0);
+    nodes.set(0);
+    responses.forEach(m -> {
+      if (m.get("success") != null) {
+        replicas.incrementAndGet();
+      } else if (m.get("status") != null) {
+        NamedList<Object> status = (NamedList<Object>)m.get("status");
+        if ("completed".equals(status.get("state"))) {
+          nodes.incrementAndGet();
+        } else {
+          fail("unexpected DELETENODE status: " + m);
         }
-        triggerFiredLatch.countDown();
-      } catch (Throwable t) {
-        log.debug("--throwable", t);
-        throw t;
+      } else {
+        fail("unexpected status: " + m);
       }
-    }
+    });
+
+    assertEquals(responses.toString(), 4, replicas.get());
+    assertEquals(responses.toString(), 4, nodes.get());
+
+    // we are left with one searchable replica
+    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
+        CloudTestUtils.clusterShape(1, 1));
   }
 
-  public static class TestTriggerListener extends TriggerListenerBase {
+  public static class CapturingTriggerListener extends TriggerListenerBase {
     @Override
     public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) throws TriggerValidationException {
       super.configure(loader, cloudManager, config);
@@ -212,7 +691,26 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
     public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
                                      ActionContext context, Throwable error, String message) {
       List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
-      lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
+      CapturedEvent ev = new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message);
+      log.info("=======> " + ev);
+      lst.add(ev);
+    }
+  }
+
+  public static class StartedProcessingListener extends TriggerListenerBase {
+
+    @Override
+    public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
+      started.countDown();
     }
   }
+
+  public static class FinishedProcessingListener extends TriggerListenerBase {
+
+    @Override
+    public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) throws Exception {
+      finished.countDown();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9541d7e3/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
index 1c72649..7969869 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
@@ -18,25 +18,37 @@ package org.apache.solr.cloud.autoscaling;
 
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.solr.client.solrj.cloud.NodeStateProvider;
 import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
+import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudTestUtils;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.cloud.ZkDistributedQueueFactory;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.util.TimeOut;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -59,6 +71,23 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
     configureCluster(4)
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
+  }
+
+  @Before
+  public void removeCollections() throws Exception {
+    cluster.deleteAllCollections();
+    if (cluster.getJettySolrRunners().size() < 4) {
+      cluster.startJettySolrRunner();
+    }
+  }
+
+  @Test
+  public void testTrigger() throws Exception {
+    SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
+    SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
+    CoreContainer container = cluster.getJettySolrRunner(0).getCoreContainer();
+    SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
+
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
         "conf", 2, 2);
     CloudSolrClient solrClient = cluster.getSolrClient();
@@ -68,20 +97,15 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
         "conf", 2, 2);
     create.setMaxShardsPerNode(1);
     create.process(solrClient);
-  }
 
-  @Test
-  public void testTrigger() throws Exception {
+    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS, clusterShape(2, 2));
+    CloudTestUtils.waitForState(cloudManager, COLL2, 60, TimeUnit.SECONDS, clusterShape(2, 2));
+
     double rate = 1.0;
-    SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
-    SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
-    CoreContainer container = cluster.getJettySolrRunner(0).getCoreContainer();
-    SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
     URL baseUrl = cluster.getJettySolrRunners().get(1).getBaseUrl();
     long waitForSeconds = 5 + random().nextInt(5);
-    Map<String, Object> props = createTriggerProps(waitForSeconds, rate);
+    Map<String, Object> props = createTriggerProps(Arrays.asList(COLL1, COLL2), waitForSeconds, rate, -1);
     final List<TriggerEvent> events = new ArrayList<>();
-    CloudSolrClient solrClient = cluster.getSolrClient();
 
     try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger")) {
       trigger.configure(loader, cloudManager, props);
@@ -95,19 +119,21 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
       String url = baseUrl.toString() + "/" + coreName;
       try (HttpSolrClient simpleClient = new HttpSolrClient.Builder(url).build()) {
         SolrParams query = params(CommonParams.Q, "*:*", CommonParams.DISTRIB, "false");
-        for (int i = 0; i < 200; i++) {
+        for (int i = 0; i < 500; i++) {
           simpleClient.query(query);
         }
         trigger.run();
         // waitFor delay
         assertEquals(0, events.size());
-        Thread.sleep(waitForSeconds * 1000 + 2000);
+        Thread.sleep(waitForSeconds * 1000);
+        trigger.run();
+        Thread.sleep(waitForSeconds * 1000);
         // should generate replica event
         trigger.run();
         assertEquals(1, events.size());
         TriggerEvent event = events.get(0);
         assertEquals(TriggerEventType.SEARCHRATE, event.eventType);
-        List<ReplicaInfo> infos = (List<ReplicaInfo>)event.getProperty(AutoScalingParams.REPLICA);
+        List<ReplicaInfo> infos = (List<ReplicaInfo>)event.getProperty(SearchRateTrigger.HOT_REPLICAS);
         assertEquals(1, infos.size());
         ReplicaInfo info = infos.get(0);
         assertEquals(coreName, info.getCore());
@@ -120,12 +146,12 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
       for (int i = 0; i < 500; i++) {
         solrClient.query(COLL1, query);
       }
-      Thread.sleep(waitForSeconds * 1000 + 2000);
+      Thread.sleep(waitForSeconds * 1000);
       trigger.run();
       // should generate collection event
       assertEquals(1, events.size());
       TriggerEvent event = events.get(0);
-      Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
+      Map<String, Double> hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
       assertEquals(1, hotCollections.size());
       Double Rate = hotCollections.get(COLL1);
       assertNotNull(Rate);
@@ -134,21 +160,20 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
 
       for (int i = 0; i < 1000; i++) {
         solrClient.query(COLL2, query);
+        solrClient.query(COLL1, query);
       }
-      Thread.sleep(waitForSeconds * 1000 + 2000);
+      Thread.sleep(waitForSeconds * 1000);
       trigger.run();
       // should generate node and collection event but not for COLL2 because of waitFor
       assertEquals(1, events.size());
       event = events.get(0);
-      Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
+      Map<String, Double> hotNodes = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_NODES);
       assertEquals(3, hotNodes.size());
       hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
-      hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
-      assertEquals(2, hotCollections.size());
+      hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
+      assertEquals(1, hotCollections.size());
       Rate = hotCollections.get(COLL1);
       assertNotNull(Rate);
-      Rate = hotCollections.get(COLL2);
-      assertNotNull(Rate);
 
       events.clear();
       // assert that waitFor prevents new events from being generated
@@ -156,28 +181,154 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
       // should not generate any events
       assertEquals(0, events.size());
 
-      Thread.sleep(waitForSeconds * 1000 + 2000);
+      Thread.sleep(waitForSeconds * 1000 * 2);
       trigger.run();
       // should generate node and collection event
       assertEquals(1, events.size());
-      hotCollections = (Map<String, Double>)event.getProperty(AutoScalingParams.COLLECTION);
+      event = events.get(0);
+      hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
       assertEquals(2, hotCollections.size());
       Rate = hotCollections.get(COLL1);
       assertNotNull(Rate);
       Rate = hotCollections.get(COLL2);
       assertNotNull(Rate);
-      hotNodes = (Map<String, Double>)event.getProperty(AutoScalingParams.NODE);
+      hotNodes = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_NODES);
       assertEquals(3, hotNodes.size());
       hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
     }
   }
 
-  private Map<String, Object> createTriggerProps(long waitForSeconds, double rate) {
+  private static final AtomicDouble mockRate = new AtomicDouble();
+
+  @Test
+  public void testWaitForElapsed() throws Exception {
+    SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    SolrZkClient zkClient = solrClient.getZkStateReader().getZkClient();
+    SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), solrClient) {
+      @Override
+      public NodeStateProvider getNodeStateProvider() {
+        return new SolrClientNodeStateProvider(solrClient) {
+          @Override
+          public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
+            Map<String, Object> values = super.getNodeValues(node, tags);
+            values.keySet().forEach(k -> {
+              values.replace(k, mockRate.get());
+            });
+            return values;
+          }
+        };
+      }
+    };
+    TimeSource timeSource = cloudManager.getTimeSource();
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
+        "conf", 2, 2);
+    create.setMaxShardsPerNode(1);
+    create.process(solrClient);
+    CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS, clusterShape(2, 2));
+
+    long waitForSeconds = 5 + random().nextInt(5);
+    Map<String, Object> props = createTriggerProps(Arrays.asList(COLL1, COLL2), waitForSeconds, 1.0, 0.1);
+    final List<TriggerEvent> events = new ArrayList<>();
+
+    try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger1")) {
+      trigger.configure(loader, cloudManager, props);
+      trigger.init();
+      trigger.setProcessor(noFirstRunProcessor);
+      trigger.run();
+      trigger.setProcessor(event -> events.add(event));
+
+      // set mock rates
+      mockRate.set(2.0);
+      TimeOut timeOut = new TimeOut(waitForSeconds + 2, TimeUnit.SECONDS, timeSource);
+      // simulate ScheduledTriggers
+      while (!timeOut.hasTimedOut()) {
+        trigger.run();
+        timeSource.sleep(1000);
+      }
+      // violation persisted longer than waitFor - there should be events
+      assertTrue(events.toString(), events.size() > 0);
+      TriggerEvent event = events.get(0);
+      assertEquals(event.toString(), TriggerEventType.SEARCHRATE, event.eventType);
+      Map<String, Object> hotNodes, hotCollections, hotShards;
+      List<ReplicaInfo> hotReplicas;
+      hotNodes = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_NODES);
+      hotCollections = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_COLLECTIONS);
+      hotShards = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_SHARDS);
+      hotReplicas = (List<ReplicaInfo>)event.properties.get(SearchRateTrigger.HOT_REPLICAS);
+      assertFalse("no hot nodes?", hotNodes.isEmpty());
+      assertFalse("no hot collections?", hotCollections.isEmpty());
+      assertFalse("no hot shards?", hotShards.isEmpty());
+      assertFalse("no hot replicas?", hotReplicas.isEmpty());
+    }
+
+    mockRate.set(0.0);
+    events.clear();
+
+    try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger2")) {
+      trigger.configure(loader, cloudManager, props);
+      trigger.init();
+      trigger.setProcessor(noFirstRunProcessor);
+      trigger.run();
+      trigger.setProcessor(event -> events.add(event));
+
+      mockRate.set(2.0);
+      trigger.run();
+      // waitFor not elapsed
+      assertTrue(events.toString(), events.isEmpty());
+      Thread.sleep(1000);
+      trigger.run();
+      assertTrue(events.toString(), events.isEmpty());
+      Thread.sleep(1000);
+      mockRate.set(0.0);
+      trigger.run();
+      Thread.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds - 2, TimeUnit.SECONDS));
+      trigger.run();
+
+      // violations persisted shorter than waitFor - there should be no events
+      assertTrue(events.toString(), events.isEmpty());
+
+    }
+  }
+
+  @Test
+  public void testDefaultsAndBackcompat() throws Exception {
     Map<String, Object> props = new HashMap<>();
-    props.put("rate", rate);
+    props.put("rate", 1.0);
+    props.put("collection", "test");
+    SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
+    SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
+    SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
+    try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger2")) {
+      trigger.configure(loader, cloudManager, props);
+      Map<String, Object> config = trigger.getConfig();
+      Set<String> collections = (Set<String>)config.get(SearchRateTrigger.COLLECTIONS_PROP);
+      assertEquals(collections.toString(), 1, collections.size());
+      assertEquals("test", collections.iterator().next());
+      assertEquals("#ANY", config.get(AutoScalingParams.SHARD));
+      assertEquals("#ANY", config.get(AutoScalingParams.NODE));
+      assertEquals(1.0, config.get(SearchRateTrigger.ABOVE_RATE_PROP));
+      assertEquals(-1.0, config.get(SearchRateTrigger.BELOW_RATE_PROP));
+      assertEquals(SearchRateTrigger.DEFAULT_METRIC, config.get(SearchRateTrigger.METRIC_PROP));
+      assertEquals(SearchRateTrigger.DEFAULT_MAX_OPS, config.get(SearchRateTrigger.MAX_OPS_PROP));
+      assertNull(config.get(SearchRateTrigger.MIN_REPLICAS_PROP));
+      assertEquals(CollectionParams.CollectionAction.ADDREPLICA, config.get(SearchRateTrigger.ABOVE_OP_PROP));
+      assertEquals(CollectionParams.CollectionAction.MOVEREPLICA, config.get(SearchRateTrigger.ABOVE_NODE_OP_PROP));
+      assertEquals(CollectionParams.CollectionAction.DELETEREPLICA, config.get(SearchRateTrigger.BELOW_OP_PROP));
+      assertNull(config.get(SearchRateTrigger.BELOW_NODE_OP_PROP));
+    }
+  }
+
+  private Map<String, Object> createTriggerProps(List<String> collections, long waitForSeconds, double aboveRate, double belowRate) {
+    Map<String, Object> props = new HashMap<>();
+    props.put("aboveRate", aboveRate);
+    props.put("belowRate", belowRate);
     props.put("event", "searchRate");
     props.put("waitFor", waitForSeconds);
     props.put("enabled", true);
+    if (collections != null && !collections.isEmpty()) {
+      props.put("collections", String.join(",", collections));
+    }
     List<Map<String, String>> actions = new ArrayList<>(3);
     Map<String, String> map = new HashMap<>(2);
     map.put("name", "compute_plan");