You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/14 21:31:12 UTC

[02/11] lucene-solr:branch_7x: SOLR-11285: Simulation framework for autoscaling.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
new file mode 100644
index 0000000..a05eb78
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
@@ -0,0 +1,1217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
+import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
+import org.apache.solr.cloud.autoscaling.TriggerActionBase;
+import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.cloud.autoscaling.TriggerEventQueue;
+import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
+import org.apache.solr.cloud.autoscaling.CapturedEvent;
+import org.apache.solr.common.cloud.LiveNodesListener;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TimeOut;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
+
+/**
+ * An end-to-end integration test for triggers
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
+public class TestTriggerIntegration extends SimSolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // Factor used to build the "simTime:" time source and to scale down the real-time waits in the tests below.
+  public static final int SPEED = 50;
+
+  // State shared between the test thread and the trigger actions; re-created for every test in setupTest().
+  // Counted down by the constructors of TestTriggerAction (and its subclasses) and TestEventMarkerAction.
+  private static CountDownLatch actionConstructorCalled;
+  // Counted down by the init() methods of the test actions.
+  private static CountDownLatch actionInitCalled;
+  // Counted down by an action's process() when it accepts an event.
+  private static CountDownLatch triggerFiredLatch;
+  // The waitFor period (seconds) that TestTriggerAction.process() checks event timing against.
+  private static int waitForSeconds = 1;
+  // Counted down by TestEventQueueAction.process() when it starts, is interrupted, or completes, respectively.
+  private static CountDownLatch actionStarted;
+  private static CountDownLatch actionInterrupted;
+  private static CountDownLatch actionCompleted;
+  // Set to true by the first action invocation that processes an event.
+  private static AtomicBoolean triggerFired;
+  // Events captured by the test actions.
+  private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+
+  // Tolerance subtracted from the waitFor period when TestTriggerAction checks how early an event was fired.
+  private static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(5);
+
+  /**
+   * Configures the cluster used by every test in this class: two nodes and a
+   * time source obtained from the spec {@code "simTime:" + SPEED}.
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    final TimeSource simulatedTime = TimeSource.get("simTime:" + SPEED);
+    configureCluster(2, simulatedTime);
+  }
+
+  /** Returns the current "trigger fired" latch; tests may replace it, so it is re-read on every call. */
+  private static CountDownLatch getTriggerFiredLatch() {
+    return TestTriggerIntegration.triggerFiredLatch;
+  }
+
+  /** Returns the current "action started" latch; tests may replace it, so it is re-read on every call. */
+  private static CountDownLatch getActionStarted() {
+    return TestTriggerIntegration.actionStarted;
+  }
+
+  /** Returns the current "action interrupted" latch. */
+  private static CountDownLatch getActionInterrupted() {
+    return TestTriggerIntegration.actionInterrupted;
+  }
+
+  /** Returns the current "action completed" latch. */
+  private static CountDownLatch getActionCompleted() {
+    return TestTriggerIntegration.actionCompleted;
+  }
+
+  @Before
+  public void setupTest() throws Exception {
+
+    waitForSeconds = 1 + random().nextInt(3);
+    actionConstructorCalled = new CountDownLatch(1);
+    actionInitCalled = new CountDownLatch(1);
+    triggerFiredLatch = new CountDownLatch(1);
+    triggerFired = new AtomicBoolean(false);
+    actionStarted = new CountDownLatch(1);
+    actionInterrupted = new CountDownLatch(1);
+    actionCompleted = new CountDownLatch(1);
+    events.clear();
+    listenerEvents.clear();
+    while (cluster.getClusterStateProvider().getLiveNodes().size() < 2) {
+      // perhaps a test stopped a node but didn't start it back
+      // lets start a node
+      cluster.simAddNode();
+    }
+  }
+
+  /**
+   * Registers two nodeAdded triggers, adds a node and waits for both triggers to fire; then
+   * registers two nodeLost triggers, removes that node and again waits for both to fire.
+   * All triggers use {@link ThrottlingTesterAction}, which checks the time elapsed since the
+   * previous action execution against the action throttle period.
+   */
+  @Test
+  public void testTriggerThrottling() throws Exception  {
+    // for this test we want to create two triggers so we must assert that the actions were created twice
+    actionInitCalled = new CountDownLatch(2);
+    // similarly we want both triggers to fire
+    triggerFiredLatch = new CountDownLatch(2);
+
+    SolrClient solrClient = cluster.simGetSolrClient();
+
+    // first trigger
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger1'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '0s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    // expected value goes first so that a failure reports expected/actual the right way round
+    assertEquals("success", response.get("result").toString());
+
+    // second trigger
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger2'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '0s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // wait until the two instances of action are created
+    if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS))  {
+      fail("Two TriggerAction instances should have been created by now");
+    }
+
+    String newNode = cluster.simAddNode();
+
+    if (!triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("Both triggers should have fired by now");
+    }
+
+    // reset shared state
+    lastActionExecutedAt.set(0);
+    TestTriggerIntegration.actionInitCalled = new CountDownLatch(2);
+    triggerFiredLatch = new CountDownLatch(2);
+
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger1'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '0s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger2'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '0s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // wait until the two instances of action are created
+    if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS))  {
+      fail("Two TriggerAction instances should have been created by now");
+    }
+
+    // stop the node we had started earlier
+    cluster.simRemoveNode(newNode, false);
+
+    if (!triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("Both triggers should have fired by now");
+    }
+  }
+
+  // Cluster time-source reading taken when a ThrottlingTesterAction last ran; 0 means "no action has run yet".
+  static AtomicLong lastActionExecutedAt = new AtomicLong(0);
+  // Acquired with tryLock() by the test actions to detect actions being executed concurrently.
+  static ReentrantLock lock = new ReentrantLock();
+  /**
+   * Test action that checks two things: that an action does not start earlier than the action
+   * throttle period after the previous one, and that a single instance is never invoked twice.
+   */
+  public static class ThrottlingTesterAction extends TestTriggerAction {
+    // tolerance in milliseconds for comparing the nanosecond-based clock with the throttle period
+    private static final long DELTA_MS = 2;
+
+    // flipped to true by the first invocation so a repeated call on this instance is detected
+    private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
+
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+      if (!lock.tryLock())  {
+        log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
+        return;
+      }
+      try {
+        // a zero timestamp means no action ran before, so there is nothing to compare against
+        if (lastActionExecutedAt.get() != 0)  {
+          log.info("last action at " + lastActionExecutedAt.get() + " time = " + cluster.getTimeSource().getTime());
+          long elapsedMs = TimeUnit.NANOSECONDS.toMillis(cluster.getTimeSource().getTime() - lastActionExecutedAt.get());
+          long minimumMs = TimeUnit.SECONDS.toMillis(ScheduledTriggers.DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS) - DELTA_MS;
+          if (elapsedMs < minimumMs) {
+            log.info("action executed again before minimum wait time from {}", event.getSource());
+            fail("TriggerListener was fired before the throttling period");
+          }
+        }
+        if (!onlyOnce.compareAndSet(false, true)) {
+          log.info("action executed more than once from {}", event.getSource());
+          fail("Trigger should not have fired more than once!");
+        } else  {
+          log.info("action executed from {}", event.getSource());
+          lastActionExecutedAt.set(cluster.getTimeSource().getTime());
+          getTriggerFiredLatch().countDown();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Creates a nodeLost trigger with a 5s waitFor, removes a node, and then replaces the trigger
+   * with a 0s waitFor version of itself. The replacement is expected to fire for the node that
+   * was removed while the first trigger instance was active.
+   */
+  @Test
+  public void testNodeLostTriggerRestoreState() throws Exception {
+    // for this test we want to update the trigger so we must assert that the actions were created twice
+    TestTriggerIntegration.actionInitCalled = new CountDownLatch(2);
+
+    // start a new node
+    String nodeName = cluster.simAddNode();
+
+    SolrClient solrClient = cluster.simGetSolrClient();
+    waitForSeconds = 5;
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_restore_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '5s'," + // should be enough for us to update the trigger
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // The latch starts at 2, so the first action instance has been initialized once the count
+    // drops below 2. (Waiting on "count == 0" here would never wait at all.)
+    TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cluster.getTimeSource());
+    while (actionInitCalled.getCount() > 1 && !timeOut.hasTimedOut()) {
+      timeOut.sleep(200);
+    }
+    assertTrue("The action specified in node_lost_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() < 2);
+
+    cluster.simRemoveNode(nodeName, false);
+
+    // ensure that the old trigger sees the stopped node, todo find a better way to do this
+    timeOut.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
+
+    waitForSeconds = 0;
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_restore_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // wait until the second instance of action is created
+    if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS))  {
+      fail("Two TriggerAction instances should have been created by now");
+    }
+
+    boolean await = triggerFiredLatch.await(5000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
+    assertNotNull(nodeLostEvent);
+    List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.contains(nodeName));
+  }
+
+  /**
+   * Creates a nodeAdded trigger with a 5s waitFor, adds a node, and then replaces the trigger
+   * with a 0s waitFor version of itself. The replacement is expected to fire for the node that
+   * was added while the first trigger instance was active.
+   */
+  @Test
+  public void testNodeAddedTriggerRestoreState() throws Exception {
+    // for this test we want to update the trigger so we must assert that the actions were created twice
+    TestTriggerIntegration.actionInitCalled = new CountDownLatch(2);
+
+    SolrClient solrClient = cluster.simGetSolrClient();
+    waitForSeconds = 5;
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_restore_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '5s'," + // should be enough for us to update the trigger
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // The latch starts at 2, so the first action instance has been initialized once the count
+    // drops below 2. (Waiting on "count == 0" here would never wait at all.)
+    TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cluster.getTimeSource());
+    while (actionInitCalled.getCount() > 1 && !timeOut.hasTimedOut()) {
+      timeOut.sleep(200);
+    }
+    assertTrue("The action specified in node_added_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() < 2);
+
+    // start a new node
+    String newNode = cluster.simAddNode();
+
+    // ensure that the old trigger sees the new node, todo find a better way to do this
+    cluster.getTimeSource().sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
+
+    waitForSeconds = 0;
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_restore_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // wait until the second instance of action is created
+    if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS))  {
+      fail("Two TriggerAction instances should have been created by now");
+    }
+
+    boolean await = triggerFiredLatch.await(5000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    TriggerEvent nodeAddedEvent = events.iterator().next();
+    assertNotNull(nodeAddedEvent);
+    List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.toString(), nodeNames.contains(newNode));
+  }
+
+  /**
+   * Registers a nodeAdded trigger, adds a node and verifies that the trigger fires with that
+   * node's name. Then re-submits the identical trigger definition and verifies that a new
+   * action is constructed but its init() is not called.
+   */
+  @Test
+  public void testNodeAddedTrigger() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    // expected value goes first so that a failure reports expected/actual the right way round
+    assertEquals("success", response.get("result").toString());
+
+    if (!actionInitCalled.await(5000 / SPEED, TimeUnit.MILLISECONDS))  {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    String newNode = cluster.simAddNode();
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    TriggerEvent nodeAddedEvent = events.iterator().next();
+    assertNotNull(nodeAddedEvent);
+    List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeAddedEvent.toString(), nodeNames.contains(newNode));
+
+    // reset
+    actionConstructorCalled = new CountDownLatch(1);
+    actionInitCalled = new CountDownLatch(1);
+
+    // update the trigger with exactly the same data
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // this should be a no-op so the action should have been created but init should not be called
+    if (!actionConstructorCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS))  {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    assertFalse(actionInitCalled.await(2000 / SPEED, TimeUnit.MILLISECONDS));
+  }
+
+  /**
+   * Registers a nodeLost trigger, removes a randomly chosen node and verifies that the trigger
+   * fires with that node's name. Then re-submits the identical trigger definition and verifies
+   * that a new action is constructed but its init() is not called.
+   */
+  @Test
+  public void testNodeLostTrigger() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    // expected value goes first so that a failure reports expected/actual the right way round
+    assertEquals("success", response.get("result").toString());
+
+    if (!actionInitCalled.await(5000 / SPEED, TimeUnit.MILLISECONDS))  {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    String lostNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+    cluster.simRemoveNode(lostNodeName, false);
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    TriggerEvent nodeLostEvent = events.iterator().next();
+    assertNotNull(nodeLostEvent);
+    List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.contains(lostNodeName));
+
+    // reset
+    actionConstructorCalled = new CountDownLatch(1);
+    actionInitCalled = new CountDownLatch(1);
+
+    // update the trigger with exactly the same data
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // this should be a no-op so the action should have been created but init should not be called
+    if (!actionConstructorCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS))  {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    assertFalse(actionInitCalled.await(2000 / SPEED, TimeUnit.MILLISECONDS));
+  }
+
+  // simulator doesn't support overseer functionality yet
+  /*
+  @Test
+  public void testContinueTriggersOnOverseerRestart() throws Exception  {
+    CollectionAdminRequest.OverseerStatus status = new CollectionAdminRequest.OverseerStatus();
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    CollectionAdminResponse adminResponse = status.process(solrClient);
+    NamedList<Object> response = adminResponse.getResponse();
+    String leader = (String) response.get("leader");
+    JettySolrRunner overseerNode = null;
+    int index = -1;
+    List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+    for (int i = 0; i < jettySolrRunners.size(); i++) {
+      JettySolrRunner runner = jettySolrRunners.get(i);
+      if (runner.getNodeName().equals(leader)) {
+        overseerNode = runner;
+        index = i;
+        break;
+      }
+    }
+    assertNotNull(overseerNode);
+
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+    if (!actionInitCalled.await(3, TimeUnit.SECONDS))  {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    // stop the overseer, somebody else will take over as the overseer
+    cluster.stopJettySolrRunner(index);
+    Thread.sleep(10000);
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+    boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
+    assertNotNull(nodeAddedEvent);
+    List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.contains(newNode.getNodeName()));
+  }
+
+*/
+
+  /**
+   * Test action that records the first event it receives, checks that the event was not
+   * delivered before the configured waitFor period had elapsed, and fails when it is
+   * invoked again after {@code triggerFired} has already been set.
+   */
+  public static class TestTriggerAction extends TriggerActionBase {
+
+    public TestTriggerAction() {
+      // lets tests observe that an action instance was constructed
+      actionConstructorCalled.countDown();
+    }
+
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+      try {
+        if (!triggerFired.compareAndSet(false, true))  {
+          fail(event.getSource() + " was fired more than once!");
+        } else  {
+          events.add(event);
+          long nowNanos = cluster.getTimeSource().getTime();
+          long elapsedNanos = nowNanos - event.getEventTime();
+          // allow a small tolerance below the configured waitFor
+          long minimumNanos = TimeUnit.SECONDS.toNanos(waitForSeconds) - WAIT_FOR_DELTA_NANOS;
+          if (elapsedNanos <= minimumNanos) {
+            fail(event.getSource() + " was fired before the configured waitFor period");
+          }
+          getTriggerFiredLatch().countDown();
+        }
+      } catch (Throwable failure) {
+        // log the failure here as well, then let it propagate unchanged
+        log.debug("--throwable", failure);
+        throw failure;
+      }
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      log.info("TestTriggerAction init");
+      actionInitCalled.countDown();
+      super.init(args);
+    }
+  }
+
+  /**
+   * Test action that records the event, signals {@code actionStarted}, and then sleeps for
+   * {@code eventQueueActionWait} milliseconds. If the sleep finishes it sets {@code triggerFired}
+   * and signals {@code actionCompleted}; if it is interrupted it signals {@code actionInterrupted}.
+   */
+  public static class TestEventQueueAction extends TriggerActionBase {
+
+    public TestEventQueueAction() {
+      log.info("TestEventQueueAction instantiated");
+    }
+
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+      log.info("-- event: {}", event);
+      events.add(event);
+      getActionStarted().countDown();
+      try {
+        Thread.sleep(eventQueueActionWait);
+        triggerFired.compareAndSet(false, true);
+        getActionCompleted().countDown();
+      } catch (InterruptedException e) {
+        // the interruption is only reported through the latch; the interrupt flag is
+        // deliberately not restored here so behavior after this method returns is unchanged
+        getActionInterrupted().countDown();
+      }
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      // log this class's own name (the message used to say "TestTriggerAction init")
+      log.debug("TestEventQueueAction init");
+      actionInitCalled.countDown();
+      super.init(args);
+    }
+  }
+
+  // How long, in milliseconds, TestEventQueueAction.process() sleeps; testEventQueue lowers it to 1 before the replay.
+  public static long eventQueueActionWait = 5000;
+
+  /**
+   * Registers a nodeAdded trigger whose action sleeps, adds a node, and restarts the overseer
+   * while the action is still running. Verifies that the running action is interrupted and that
+   * the event is delivered again afterwards, carrying enqueue and dequeue time properties.
+   */
+  @Test
+  public void testEventQueue() throws Exception {
+    waitForSeconds = 1;
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger1'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" +
+        "}}";
+
+    String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    // expected value goes first so that a failure reports expected/actual the right way round
+    assertEquals("success", response.get("result").toString());
+
+    if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS))  {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    // add node to generate the event
+    cluster.simAddNode();
+    boolean await = actionStarted.await(60000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("action did not start", await);
+    // event should be there
+    TriggerEvent nodeAddedEvent = events.iterator().next();
+    assertNotNull(nodeAddedEvent);
+    // but action did not complete yet so the event is still enqueued
+    assertFalse(triggerFired.get());
+    events.clear();
+    actionStarted = new CountDownLatch(1);
+    eventQueueActionWait = 1;
+    // kill overseer
+    cluster.simRestartOverseer(overseerLeader);
+    cluster.getTimeSource().sleep(5000);
+    // new overseer leader should be elected and run triggers
+    await = actionInterrupted.await(3000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("action wasn't interrupted", await);
+    // it should fire again from enqueued event
+    await = actionStarted.await(60000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("action wasn't started", await);
+    TriggerEvent replayedEvent = events.iterator().next();
+    assertNotNull(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME));
+    assertNotNull(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME));
+    await = actionCompleted.await(10000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("action wasn't completed", await);
+    assertTrue(triggerFired.get());
+  }
+
+  /**
+   * Registers a nodeAdded trigger with a 10s waitFor, lets it fire once for a first node, then
+   * adds a second node, restarts the overseer and expects the trigger to fire again.
+   */
+  @Test
+  public void testEventFromRestoredState() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '10s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    // expected value goes first so that a failure reports expected/actual the right way round
+    assertEquals("success", response.get("result").toString());
+
+    if (!actionInitCalled.await(10000 / SPEED, TimeUnit.MILLISECONDS))  {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    events.clear();
+
+    String newNode = cluster.simAddNode();
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    // reset
+    triggerFired.set(false);
+    triggerFiredLatch = new CountDownLatch(1);
+    TriggerEvent nodeAddedEvent = events.iterator().next();
+    assertNotNull(nodeAddedEvent);
+    List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.contains(newNode));
+    // add a second node - state of the trigger will change but it won't fire for waitFor sec.
+    cluster.simAddNode();
+    cluster.getTimeSource().sleep(10000);
+    // kill overseer
+    cluster.simRestartOverseer(null);
+    await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+  }
+
+  /**
+   * Live-nodes listener that accumulates the names of nodes that disappeared or appeared
+   * between two live-node snapshots and signals {@code onChangeLatch} on every change.
+   */
+  private static class TestLiveNodesListener implements LiveNodesListener {
+    Set<String> lostNodes = new HashSet<>();
+    Set<String> addedNodes = new HashSet<>();
+    // replaced by reset() on the test thread and read in onChange(), hence volatile
+    volatile CountDownLatch onChangeLatch = new CountDownLatch(1);
+
+    /** Clears the accumulated node names and installs a fresh latch. */
+    public void reset() {
+      lostNodes.clear();
+      addedNodes.clear();
+      onChangeLatch = new CountDownLatch(1);
+    }
+
+    /**
+     * Records the differences between the two snapshots.
+     *
+     * @param oldLiveNodes live nodes before the change; not modified
+     * @param newLiveNodes live nodes after the change; not modified
+     */
+    @Override
+    public void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
+      // work on copies: the sets belong to the caller and must not be altered here
+      Set<String> lost = new HashSet<>(oldLiveNodes);
+      lost.removeAll(newLiveNodes);
+      lostNodes.addAll(lost);
+
+      Set<String> added = new HashSet<>(newLiveNodes);
+      added.removeAll(oldLiveNodes);
+      addedNodes.addAll(added);
+
+      // signal last, so a thread released by the latch sees the sets already filled in
+      onChangeLatch.countDown();
+    }
+  }
+
+  private TestLiveNodesListener registerLiveNodesListener() {
+    TestLiveNodesListener listener = new TestLiveNodesListener();
+    cluster.getLiveNodesSet().registerLiveNodesListener(listener);
+    return listener;
+  }
+
+  /**
+   * Test action that records every event it receives and counts down the trigger-fired latch,
+   * while holding the shared lock so that concurrent executions are detected.
+   */
+  public static class TestEventMarkerAction extends TriggerActionBase {
+
+    public TestEventMarkerAction() {
+      // lets tests observe that an action instance was constructed
+      actionConstructorCalled.countDown();
+    }
+
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+      if (!lock.tryLock())  {
+        log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
+        return;
+      }
+      try {
+        events.add(event);
+        getTriggerFiredLatch().countDown();
+      } catch (Throwable failure) {
+        // log the failure here as well, then let it propagate unchanged
+        log.debug("--throwable", failure);
+        throw failure;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      log.info("TestEventMarkerAction init");
+      actionInitCalled.countDown();
+      super.init(args);
+    }
+  }
+
+  /**
+   * Checks the nodeAdded / nodeLost marker paths kept in the distributed state manager:
+   * with no triggers configured no marker data is found after adding a node or restarting
+   * the overseer node; once nodeAdded and nodeLost triggers are configured, a nodeAdded
+   * marker is created and later removed, and restarting the overseer node results in a
+   * single NODELOST event that names the restarted node.
+   */
+  @Test
+  public void testNodeMarkersRegistration() throws Exception {
+    // for this test we want to create two triggers so we must assert that the actions were created twice
+    actionInitCalled = new CountDownLatch(2);
+    // similarly we want both triggers to fire
+    triggerFiredLatch = new CountDownLatch(2);
+    TestLiveNodesListener listener = registerLiveNodesListener();
+
+    SolrClient solrClient = cluster.simGetSolrClient();
+
+    // pick overseer node
+    String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+
+    // add a node
+    String node = cluster.simAddNode();
+    if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("onChange listener didn't execute on cluster change");
+    }
+    assertEquals(1, listener.addedNodes.size());
+    assertEquals(node, listener.addedNodes.iterator().next());
+    // verify that a znode doesn't exist (no trigger)
+    String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node;
+    assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers",
+        cluster.getDistribStateManager().hasData(pathAdded));
+    listener.reset();
+    // stop overseer
+    log.info("====== KILL OVERSEER 1");
+    cluster.simRestartOverseer(overseerLeader);
+    if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("onChange listener didn't execute on cluster change");
+    }
+    assertEquals(1, listener.lostNodes.size());
+    assertEquals(overseerLeader, listener.lostNodes.iterator().next());
+    assertEquals(0, listener.addedNodes.size());
+    // wait until the new overseer is up
+    cluster.getTimeSource().sleep(5000);
+    // verify that a znode does NOT exist - there's no nodeLost trigger,
+    // so the new overseer cleaned up existing nodeLost markers
+    String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
+    assertFalse("Path " + pathLost + " exists", cluster.getDistribStateManager().hasData(pathLost));
+
+    listener.reset();
+
+    // set up triggers
+
+    log.info("====== ADD TRIGGERS");
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '1s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '1s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    // pick the node that will be passed to the second simRestartOverseer call below
+    overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+
+    // create another node
+    log.info("====== ADD NODE 1");
+    String node1 = cluster.simAddNode();
+    if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("onChange listener didn't execute on cluster change");
+    }
+    assertEquals(1, listener.addedNodes.size());
+    assertEquals(node1, listener.addedNodes.iterator().next());
+    // verify that a znode exists
+    pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1;
+    assertTrue("Path " + pathAdded + " wasn't created", cluster.getDistribStateManager().hasData(pathAdded));
+
+    cluster.getTimeSource().sleep(5000);
+    // nodeAdded marker should be consumed now by nodeAdded trigger
+    assertFalse("Path " + pathAdded + " should have been deleted",
+        cluster.getDistribStateManager().hasData(pathAdded));
+
+    // start from a clean slate: exactly one event is expected after the restart below
+    listener.reset();
+    events.clear();
+    triggerFiredLatch = new CountDownLatch(1);
+    // kill overseer again
+    log.info("====== KILL OVERSEER 2");
+    cluster.simRestartOverseer(overseerLeader);
+    if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("onChange listener didn't execute on cluster change");
+    }
+
+
+    if (!triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("Trigger should have fired by now");
+    }
+    assertEquals(1, events.size());
+    TriggerEvent ev = events.iterator().next();
+    List<String> nodeNames = (List<String>)ev.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.contains(overseerLeader));
+    assertEquals(TriggerEventType.NODELOST, ev.getEventType());
+  }
+
+  // events captured by TestTriggerListener instances, keyed by the listener's configured name
+  static Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
+  // counted down each time a TestTriggerListener is initialized
+  static CountDownLatch listenerCreated = new CountDownLatch(1);
+  // when true, TestDummyAction.process throws a RuntimeException
+  static boolean failDummyAction = false;
+
+  public static class TestTriggerListener extends TriggerListenerBase {
+    @Override
+    public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+      super.init(cloudManager, config);
+      listenerCreated.countDown();
+    }
+
+    @Override
+    public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+                                     ActionContext context, Throwable error, String message) {
+      List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
+      lst.add(new CapturedEvent(cluster.getTimeSource().getTime(), context, config, stage, actionName, event, message));
+    }
+  }
+
+  /**
+   * Trigger action that does nothing, unless {@code failDummyAction} is set, in which case
+   * processing throws a {@link RuntimeException}.
+   */
+  public static class TestDummyAction extends TriggerActionBase {
+
+    @Override
+    public void process(TriggerEvent event, ActionContext context) {
+      if (!failDummyAction) {
+        return;
+      }
+      throw new RuntimeException("failure");
+    }
+  }
+
+  /**
+   * Registers two listeners ('foo' and 'bar') on a nodeAdded trigger that has two actions
+   * ('test' and 'test1') and checks the exact sequence of stages each listener captures:
+   * first with both actions succeeding, then with {@code failDummyAction} set so that the
+   * 'test1' action throws.
+   */
+  @Test
+  public void testListeners() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [" +
+        "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}," +
+        "{'name':'test1','class':'" + TestDummyAction.class.getName() + "'}," +
+        "]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS))  {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    // listener 'foo': four stages, before 'test', after 'test' and 'test1'
+    String setListenerCommand = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'foo'," +
+        "'trigger' : 'node_added_trigger'," +
+        "'stage' : ['STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
+        "'beforeAction' : 'test'," +
+        "'afterAction' : ['test', 'test1']," +
+        "'class' : '" + TestTriggerListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    // listener 'bar': two stages, before 'test' and 'test1', after 'test'
+    String setListenerCommand1 = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'bar'," +
+        "'trigger' : 'node_added_trigger'," +
+        "'stage' : ['FAILED','SUCCEEDED']," +
+        "'beforeAction' : ['test', 'test1']," +
+        "'afterAction' : 'test'," +
+        "'class' : '" + TestTriggerListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    listenerEvents.clear();
+    failDummyAction = false;
+
+    String newNode = cluster.simAddNode();
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+
+    // NOTE(review): this size check runs before the sleep below - confirm that both
+    // listeners are guaranteed to have captured an event by the time the latch is released
+    assertEquals("both listeners should have fired", 2, listenerEvents.size());
+
+    cluster.getTimeSource().sleep(2000);
+
+    // check foo events
+    List<CapturedEvent> testEvents = listenerEvents.get("foo");
+    assertNotNull("foo events: " + testEvents, testEvents);
+    assertEquals("foo events: " + testEvents, 5, testEvents.size());
+
+    assertEquals(TriggerEventProcessorStage.STARTED, testEvents.get(0).stage);
+
+    assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(1).stage);
+    assertEquals("test", testEvents.get(1).actionName);
+
+    assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(2).stage);
+    assertEquals("test", testEvents.get(2).actionName);
+
+    assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(3).stage);
+    assertEquals("test1", testEvents.get(3).actionName);
+
+    assertEquals(TriggerEventProcessorStage.SUCCEEDED, testEvents.get(4).stage);
+
+    // check bar events
+    testEvents = listenerEvents.get("bar");
+    assertNotNull("bar events", testEvents);
+    assertEquals("bar events", 4, testEvents.size());
+
+    assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(0).stage);
+    assertEquals("test", testEvents.get(0).actionName);
+
+    assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(1).stage);
+    assertEquals("test", testEvents.get(1).actionName);
+
+    assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(2).stage);
+    assertEquals("test1", testEvents.get(2).actionName);
+
+    assertEquals(TriggerEventProcessorStage.SUCCEEDED, testEvents.get(3).stage);
+
+    // reset
+    triggerFired.set(false);
+    triggerFiredLatch = new CountDownLatch(1);
+    listenerEvents.clear();
+    // second round: make the 'test1' action (TestDummyAction) throw
+    failDummyAction = true;
+
+    newNode = cluster.simAddNode();
+    await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+
+    cluster.getTimeSource().sleep(2000);
+
+    // check foo events
+    testEvents = listenerEvents.get("foo");
+    assertNotNull("foo events: " + testEvents, testEvents);
+    assertEquals("foo events: " + testEvents, 4, testEvents.size());
+
+    assertEquals(TriggerEventProcessorStage.STARTED, testEvents.get(0).stage);
+
+    assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(1).stage);
+    assertEquals("test", testEvents.get(1).actionName);
+
+    assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(2).stage);
+    assertEquals("test", testEvents.get(2).actionName);
+
+    assertEquals(TriggerEventProcessorStage.FAILED, testEvents.get(3).stage);
+    assertEquals("test1", testEvents.get(3).actionName);
+
+    // check bar events
+    testEvents = listenerEvents.get("bar");
+    assertNotNull("bar events", testEvents);
+    assertEquals("bar events", 4, testEvents.size());
+
+    assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(0).stage);
+    assertEquals("test", testEvents.get(0).actionName);
+
+    assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(1).stage);
+    assertEquals("test", testEvents.get(1).actionName);
+
+    assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(2).stage);
+    assertEquals("test1", testEvents.get(2).actionName);
+
+    assertEquals(TriggerEventProcessorStage.FAILED, testEvents.get(3).stage);
+    assertEquals("test1", testEvents.get(3).actionName);
+  }
+
+  /**
+   * Adds two nodes one after the other and checks, through the 'bar' listener, that the
+   * second round captures only IGNORED events mentioning "cooldown" followed by a final
+   * SUCCEEDED event, and that this SUCCEEDED event is more than
+   * {@code ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS} later than the last event
+   * captured in the first round.
+   */
+  @Test
+  public void testCooldown() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    failDummyAction = false;
+    waitForSeconds = 1;
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_cooldown_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [" +
+        "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
+        "]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    String setListenerCommand1 = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'bar'," +
+        "'trigger' : 'node_added_cooldown_trigger'," +
+        "'stage' : ['FAILED','SUCCEEDED', 'IGNORED']," +
+        "'class' : '" + TestTriggerListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    // NOTE(review): this latch is re-created after the set-listener request and is not
+    // awaited anywhere in this method
+    listenerCreated = new CountDownLatch(1);
+    listenerEvents.clear();
+
+    String newNode = cluster.simAddNode();
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    // wait for listener to capture the SUCCEEDED stage
+    cluster.getTimeSource().sleep(1000);
+
+    // NOTE(review): get("bar") returns null if the listener captured nothing, which would
+    // surface below as a NullPointerException rather than as an assertion failure
+    List<CapturedEvent> capturedEvents = listenerEvents.get("bar");
+    // we may get a few IGNORED events if other tests caused events within cooldown period
+    assertTrue(capturedEvents.toString(), capturedEvents.size() > 0);
+    // timestamp of the last event captured in the first round
+    long prevTimestamp = capturedEvents.get(capturedEvents.size() - 1).timestamp;
+
+    // reset the trigger and captured events
+    listenerEvents.clear();
+    triggerFiredLatch = new CountDownLatch(1);
+    triggerFired.compareAndSet(true, false);
+
+    String newNode2 = cluster.simAddNode();
+    await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    // wait for listener to capture the SUCCEEDED stage
+    cluster.getTimeSource().sleep(2000);
+
+    // there must be at least one IGNORED event due to cooldown, and one SUCCEEDED event
+    capturedEvents = listenerEvents.get("bar");
+    assertTrue(capturedEvents.toString(), capturedEvents.size() > 1);
+    // every event except the last one must be an IGNORED event that mentions cooldown
+    for (int i = 0; i < capturedEvents.size() - 1; i++) {
+      CapturedEvent ev = capturedEvents.get(i);
+      assertEquals(ev.toString(), TriggerEventProcessorStage.IGNORED, ev.stage);
+      assertTrue(ev.toString(), ev.message.contains("cooldown"));
+    }
+    CapturedEvent ev = capturedEvents.get(capturedEvents.size() - 1);
+    assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
+    // the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
+    // must be larger than cooldown period
+    assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS));
+  }
+
+  /**
+   * Trigger action used by the search rate test: records the event, fails when the event
+   * is processed before the configured waitFor period (minus {@code WAIT_FOR_DELTA_NANOS})
+   * has elapsed, and otherwise counts down the trigger-fired latch.
+   */
+  public static class TestSearchRateAction extends TriggerActionBase {
+
+    @Override
+    public void process(TriggerEvent event, ActionContext context) throws Exception {
+      try {
+        events.add(event);
+        long nowNanos = cluster.getTimeSource().getTime();
+        long elapsedNanos = nowNanos - event.getEventTime();
+        // smallest acceptable delay between the event time and the action being run
+        long minElapsedNanos = TimeUnit.SECONDS.toNanos(waitForSeconds) - WAIT_FOR_DELTA_NANOS;
+        if (elapsedNanos <= minElapsedNanos) {
+          fail(event.getSource() + " was fired before the configured waitFor period");
+        }
+        getTriggerFiredLatch().countDown();
+      } catch (Throwable problem) {
+        log.debug("--throwable", problem);
+        throw problem;
+      }
+    }
+  }
+
+  /**
+   * Creates a one-shard, two-replica collection, configures a searchRate trigger with a
+   * listener, sets a simulated 1-minute request rate of 500 on the collection and then
+   * checks the single captured event: it must respect the waitFor period and carry
+   * per-node, per-replica, per-shard and per-collection rates that agree with each other.
+   */
+  @Test
+  public void testSearchRate() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String COLL1 = "collection1";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
+        "conf", 1, 2);
+    create.process(solrClient);
+    waitForState(COLL1, 10, TimeUnit.SECONDS, clusterShape(1, 2));
+
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'search_rate_trigger'," +
+        "'event' : 'searchRate'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'rate' : 1.0," +
+        "'actions' : [" +
+        "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
+        "]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    String setListenerCommand1 = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'srt'," +
+        "'trigger' : 'search_rate_trigger'," +
+        "'stage' : ['FAILED','SUCCEEDED']," +
+        "'class' : '" + TestTriggerListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+//    SolrParams query = params(CommonParams.Q, "*:*");
+//    for (int i = 0; i < 500; i++) {
+//      solrClient.query(COLL1, query);
+//    }
+
+    cluster.getSimClusterStateProvider().simSetCollectionValue(COLL1, "QUERY./select.requestTimes:1minRate", 500, true);
+
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    // wait for listener to capture the SUCCEEDED stage
+    cluster.getTimeSource().sleep(2000);
+    assertEquals(listenerEvents.toString(), 1, listenerEvents.get("srt").size());
+    CapturedEvent ev = listenerEvents.get("srt").get(0);
+    long now = cluster.getTimeSource().getTime();
+    // verify waitFor: convert the waitFor seconds to nanoseconds (the unit of the time
+    // source and of the event time) so that the comparison is not trivially true
+    assertTrue(TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
+    Map<String, Double> nodeRates = (Map<String, Double>)ev.event.getProperties().get("node");
+    assertNotNull("nodeRates", nodeRates);
+    assertTrue(nodeRates.toString(), nodeRates.size() > 0);
+    AtomicDouble totalNodeRate = new AtomicDouble();
+    nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
+    List<ReplicaInfo> replicaRates = (List<ReplicaInfo>)ev.event.getProperties().get("replica");
+    assertNotNull("replicaRates", replicaRates);
+    assertTrue(replicaRates.toString(), replicaRates.size() > 0);
+    AtomicDouble totalReplicaRate = new AtomicDouble();
+    replicaRates.forEach(r -> {
+      assertTrue(r.toString(), r.getVariable("rate") != null);
+      totalReplicaRate.addAndGet((Double)r.getVariable("rate"));
+    });
+    // the "shard" property is nested: collection name -> (shard name -> rate)
+    Map<String, Object> shardRates = (Map<String, Object>)ev.event.getProperties().get("shard");
+    assertNotNull("shardRates", shardRates);
+    assertEquals(shardRates.toString(), 1, shardRates.size());
+    shardRates = (Map<String, Object>)shardRates.get(COLL1);
+    assertNotNull("shardRates", shardRates);
+    assertEquals(shardRates.toString(), 1, shardRates.size());
+    AtomicDouble totalShardRate = new AtomicDouble();
+    shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double)r));
+    Map<String, Double> collectionRates = (Map<String, Double>)ev.event.getProperties().get("collection");
+    assertNotNull("collectionRates", collectionRates);
+    assertEquals(collectionRates.toString(), 1, collectionRates.size());
+    Double collectionRate = collectionRates.get(COLL1);
+    assertNotNull(collectionRate);
+    assertTrue(collectionRate > 5.0);
+    // the totals at every level must match the collection rate within a tolerance of 5.0
+    assertEquals(collectionRate, totalNodeRate.get(), 5.0);
+    assertEquals(collectionRate, totalShardRate.get(), 5.0);
+    assertEquals(collectionRate, totalReplicaRate.get(), 5.0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/package-info.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/package-info.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/package-info.java
new file mode 100644
index 0000000..0b412cb
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Simulated environment for autoscaling tests.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/test/org/apache/solr/cloud/cdcr/BaseCdcrDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/cdcr/BaseCdcrDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/cdcr/BaseCdcrDistributedZkTest.java
index 42af083..c242809 100644
--- a/solr/core/src/test/org/apache/solr/cloud/cdcr/BaseCdcrDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/cdcr/BaseCdcrDistributedZkTest.java
@@ -59,6 +59,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
@@ -794,7 +795,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
 
   protected void waitForBootstrapToComplete(String collectionName, String shardId) throws Exception {
     NamedList rsp;// we need to wait until bootstrap is complete otherwise the replicator thread will never start
-    TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS);
+    TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
     while (!timeOut.hasTimedOut())  {
       rsp = invokeCdcrAction(shardToLeaderJetty.get(collectionName).get(shardId), CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
       if (rsp.get(RESPONSE_STATUS).toString().equals(COMPLETED))  {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/test/org/apache/solr/cloud/hdfs/StressHdfsTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/StressHdfsTest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/StressHdfsTest.java
index 329de79..500655d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/hdfs/StressHdfsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/StressHdfsTest.java
@@ -39,6 +39,7 @@ import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.util.BadHdfsThreadsFilter;
 import org.apache.solr.util.TimeOut;
 import org.apache.zookeeper.KeeperException;
@@ -221,7 +222,7 @@ public class StressHdfsTest extends BasicDistributedZkTest {
     request.setPath("/admin/collections");
     cloudClient.request(request);
 
-    final TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS);
+    final TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
     while (cloudClient.getZkStateReader().getClusterState().hasCollection(DELETE_DATA_DIR_COLLECTION)) {
       if (timeout.hasTimedOut()) {
         throw new AssertionError("Timeout waiting to see removed collection leave clusterstate");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 906e27b..0639479 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -33,6 +33,7 @@ import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.TimeOut;
 
@@ -213,7 +214,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
       writer.writePendingUpdates();
 
       boolean found = false;
-      TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS);
+      TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
       while (!timeOut.hasTimedOut())  {
         DocCollection c1 = reader.getClusterState().getCollection("c1");
         if ("y".equals(c1.getStr("x"))) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
index 76c5c0f..626374c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
@@ -201,6 +201,9 @@ public class RuleEngineTest extends SolrTestCaseJ4{
       public NodeStateProvider getNodeStateProvider() {
         return new NodeStateProvider() {
           @Override
+          public void close() throws IOException { }
+
+          @Override
           public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
             return (Map<String, Object>) MockSnitch.nodeVsTags.get(node);
           }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java b/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java
index 0bdf90c..f85b293 100644
--- a/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java
+++ b/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java
@@ -26,6 +26,7 @@ import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.util.TimeOut;
 import org.junit.After;
 import org.junit.Before;
@@ -318,8 +319,8 @@ class Indexer {
   ArrayList<OneIndexer> _threads = new ArrayList<>();
 
   public Indexer(OpenCloseCoreStressTest OCCST, String url, List<HttpSolrClient> clients, int numThreads, int secondsToRun, Random random) {
-    stopTimeout = new TimeOut(secondsToRun, TimeUnit.SECONDS);
-    nextTimeout = new TimeOut(60, TimeUnit.SECONDS);
+    stopTimeout = new TimeOut(secondsToRun, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    nextTimeout = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
     docsThisCycle.set(0);
     qTimesAccum.set(0);
     updateCounts.set(0);
@@ -353,7 +354,7 @@ class Indexer {
       log.info(String.format(Locale.ROOT, " s indexed: [run %,8d] [cycle %,8d] [last minute %,8d] Last core updated: %s. Seconds left in cycle %,4d",
           myId, docsThisCycle.get(), myId - lastCount, core, stopTimeout.timeLeft(TimeUnit.SECONDS)));
       lastCount = myId;
-      nextTimeout = new TimeOut(60, TimeUnit.SECONDS);
+      nextTimeout = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java
index e439d03..01f9199 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java
@@ -520,6 +520,15 @@ public class AutoScalingConfig implements MapWriter {
     return withTriggerListenerConfigs(configs);
   }
 
+  /**
+   * Returns a new {@code AutoScalingConfig}: built from {@code jsonMap} when it is
+   * non-null, otherwise from this instance's policy, trigger configs, trigger listener
+   * configs, properties and {@code zkVersion}.
+   */
+  @Override
+  public Object clone() {
+    if (jsonMap != null) {
+      return new AutoScalingConfig(jsonMap);
+    } else {
+      return new AutoScalingConfig(getPolicy(), getTriggerConfigs(), getTriggerListenerConfigs(), getProperties(), zkVersion);
+    }
+  }
+
   /**
    * Return the znode version that was used to create this configuration.
    */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
index 09b6193..17c48d5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
@@ -24,6 +24,7 @@ import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.common.util.ObjectCache;
+import org.apache.solr.common.util.TimeSource;
 
 /**
  * Base class for overriding some behavior of {@link SolrCloudManager}.
@@ -31,6 +32,7 @@ import org.apache.solr.common.util.ObjectCache;
 public class DelegatingCloudManager implements SolrCloudManager {
   private final SolrCloudManager delegate;
   private ObjectCache objectCache = new ObjectCache();
+  private TimeSource timeSource = TimeSource.NANO_TIME;
 
   public DelegatingCloudManager(SolrCloudManager delegate) {
     this.delegate = delegate;
@@ -62,6 +64,16 @@ public class DelegatingCloudManager implements SolrCloudManager {
   }
 
   @Override
+  public boolean isClosed() {
+    return delegate.isClosed();
+  }
+
+  @Override
+  public TimeSource getTimeSource() {
+    return delegate == null ? timeSource : delegate.getTimeSource();
+  }
+
+  @Override
   public SolrResponse request(SolrRequest req) throws IOException {
     return delegate.request(req);
   }
@@ -70,4 +82,9 @@ public class DelegatingCloudManager implements SolrCloudManager {
   public byte[] httpRequest(String url, SolrRequest.METHOD method, Map<String, String> headers, String payload, int timeout, boolean followRedirects) throws IOException {
     return delegate.httpRequest(url, method, headers, payload, timeout, followRedirects);
   }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
index b47d1c8..2fea23b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
@@ -45,6 +45,11 @@ public class DelegatingDistribStateManager implements DistribStateManager {
   }
 
   @Override
+  public List<String> listData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+    return delegate.listData(path, watcher);
+  }
+
+  @Override
   public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
     return delegate.getData(path, watcher);
   }
@@ -60,12 +65,17 @@ public class DelegatingDistribStateManager implements DistribStateManager {
   }
 
   @Override
+  public void makePath(String path, byte[] data, CreateMode createMode, boolean failOnExists) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
+    delegate.makePath(path, data, createMode, failOnExists);
+  }
+
+  @Override
   public String createData(String path, byte[] data, CreateMode mode) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
     return delegate.createData(path, data, mode);
   }
 
   @Override
-  public void removeData(String path, int version) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+  public void removeData(String path, int version) throws NoSuchElementException, IOException, BadVersionException, KeeperException, InterruptedException {
     delegate.removeData(path, version);
   }
 
@@ -88,4 +98,9 @@ public class DelegatingDistribStateManager implements DistribStateManager {
   public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
     return delegate.getAutoScalingConfig();
   }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java
index 8b717f8..9ffde0f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.solr.client.solrj.cloud.autoscaling;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -40,4 +41,14 @@ public class DelegatingNodeStateProvider implements NodeStateProvider {
   public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
     return delegate.getReplicaInfo(node, keys);
   }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
+
+  @Override
+  public boolean isClosed() {
+    return delegate.isClosed();
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java
index 4318418..26aaead 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java
@@ -16,11 +16,11 @@
  */
 package org.apache.solr.client.solrj.cloud.autoscaling;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
 
+import org.apache.solr.common.SolrCloseable;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
@@ -30,7 +30,7 @@ import org.apache.zookeeper.Watcher;
 /**
  * This interface represents a distributed state repository.
  */
-public interface DistribStateManager extends Closeable {
+public interface DistribStateManager extends SolrCloseable {
 
   // state accessors
 
@@ -38,6 +38,8 @@ public interface DistribStateManager extends Closeable {
 
   List<String> listData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException;
 
+  List<String> listData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException;
+
   VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException;
 
   default VersionedData getData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
@@ -48,9 +50,19 @@ public interface DistribStateManager extends Closeable {
 
   void makePath(String path) throws AlreadyExistsException, IOException, KeeperException, InterruptedException;
 
+  void makePath(String path, byte[] data, CreateMode createMode, boolean failOnExists) throws AlreadyExistsException, IOException, KeeperException, InterruptedException;
+
+  /**
+   * Create data (leaf) node at specified path.
+   * @param path base path name of the node.
+   * @param data data to be stored.
+   * @param mode creation mode.
+   * @return actual path of the node - in case of sequential nodes this will differ from the base path because
+   * of the appended sequence number.
+   */
   String createData(String path, byte[] data, CreateMode mode) throws AlreadyExistsException, IOException, KeeperException, InterruptedException;
 
-  void removeData(String path, int version) throws NoSuchElementException, IOException, KeeperException, InterruptedException;
+  void removeData(String path, int version) throws NoSuchElementException, IOException, KeeperException, InterruptedException, BadVersionException;
 
   void setData(String path, byte[] data, int version) throws BadVersionException, NoSuchElementException, IOException, KeeperException, InterruptedException;
 
@@ -61,9 +73,4 @@ public interface DistribStateManager extends Closeable {
   default AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
     return getAutoScalingConfig(null);
   }
-
-  @Override
-  default void close() throws IOException {
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java
index dbf6836..68dfa39 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java
@@ -20,10 +20,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.solr.common.SolrCloseable;
+
 /**
  * This interface models the access to node and replica information.
  */
-public interface NodeStateProvider {
+public interface NodeStateProvider extends SolrCloseable {
   /**
    * Get the value of each tag for a given node
    *

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index d73ae6c..f11121d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -18,6 +18,7 @@
 package org.apache.solr.client.solrj.cloud.autoscaling;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -37,9 +38,12 @@ import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.common.IteratorWriter;
 import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
@@ -56,6 +60,8 @@ import static java.util.stream.Collectors.toList;
  *
  */
 public class Policy implements MapWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   public static final String POLICY = "policy";
   public static final String EACH = "#EACH";
   public static final String ANY = "#ANY";
@@ -211,16 +217,26 @@ public class Policy implements MapWriter {
     Set<String> collections = new HashSet<>();
     List<Clause> expandedClauses;
     List<Violation> violations = new ArrayList<>();
+    final int znodeVersion;
 
     private Session(List<String> nodes, SolrCloudManager cloudManager,
-                    List<Row> matrix, List<Clause> expandedClauses) {
+                    List<Row> matrix, List<Clause> expandedClauses, int znodeVersion) {
       this.nodes = nodes;
       this.cloudManager = cloudManager;
       this.matrix = matrix;
       this.expandedClauses = expandedClauses;
+      this.znodeVersion = znodeVersion;
     }
 
     Session(SolrCloudManager cloudManager) {
+      ClusterState state = null;
+      try {
+        state = cloudManager.getClusterStateProvider().getClusterState();
+        LOG.trace("-- session created with cluster state: {}", state);
+      } catch (Exception e) {
+        LOG.trace("-- session created, can't obtain cluster state", e);
+      }
+      this.znodeVersion = state != null ? state.getZNodeVersion() : -1;
       this.nodes = new ArrayList<>(cloudManager.getClusterStateProvider().getLiveNodes());
       this.cloudManager = cloudManager;
       for (String node : nodes) {
@@ -256,7 +272,7 @@ public class Policy implements MapWriter {
     }
 
     Session copy() {
-      return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses);
+      return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, znodeVersion);
     }
 
     List<Row> getMatrixCopy() {
@@ -297,6 +313,7 @@ public class Policy implements MapWriter {
 
     @Override
     public void writeMap(EntryWriter ew) throws IOException {
+      ew.put("znodeVersion", znodeVersion);
       for (int i = 0; i < matrix.size(); i++) {
         Row row = matrix.get(i);
         ew.put(row.node, row);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index 1112127..d091e34 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -30,12 +30,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
-import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,7 +138,7 @@ public class PolicyHelper {
 
   public static final int SESSION_EXPIRY = 180;//180 seconds (3 minutes)
 
-  public static MapWriter getDiagnostics(Policy policy, SolrClientCloudManager cloudManager) {
+  public static MapWriter getDiagnostics(Policy policy, SolrCloudManager cloudManager) {
     Policy.Session session = policy.createSession(cloudManager);
     List<Row> sorted = session.getSorted();
     List<Violation> violations = session.getViolations();
@@ -233,9 +233,10 @@ public class PolicyHelper {
      *
      */
     private void returnSession(SessionWrapper sessionWrapper) {
+      TimeSource timeSource = sessionWrapper.session.cloudManager.getTimeSource();
       synchronized (lockObj) {
         sessionWrapper.status = Status.EXECUTING;
-        log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(MILLISECONDS),
+        log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(timeSource, MILLISECONDS),
             sessionWrapper.createTime,
             this.sessionWrapper.createTime);
         if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
@@ -255,13 +256,14 @@ public class PolicyHelper {
 
 
     public SessionWrapper get(SolrCloudManager cloudManager) throws IOException, InterruptedException {
+      TimeSource timeSource = cloudManager.getTimeSource();
       synchronized (lockObj) {
         if (sessionWrapper.status == Status.NULL ||
-            TimeUnit.SECONDS.convert(System.nanoTime() - sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
+            TimeUnit.SECONDS.convert(timeSource.getTime() - sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
           //no session available or the session is expired
           return createSession(cloudManager);
         } else {
-          long waitStart = time(MILLISECONDS);
+          long waitStart = time(timeSource, MILLISECONDS);
           //the session is not expired
           log.debug("reusing a session {}", this.sessionWrapper.createTime);
           if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
@@ -269,13 +271,13 @@ public class PolicyHelper {
             return sessionWrapper;
           } else {
             //status == COMPUTING: the session is being used for computing, so wait until it is returned
-            log.debug("session being used. waiting... current time {} ", time(MILLISECONDS));
+            log.debug("session being used. waiting... current time {} ", time(timeSource, MILLISECONDS));
             try {
               lockObj.wait(10 * 1000);//wait for a max of 10 seconds
             } catch (InterruptedException e) {
               log.info("interrupted... ");
             }
-            log.debug("out of waiting curr-time:{} time-elapsed {}", time(MILLISECONDS), timeElapsed(waitStart, MILLISECONDS));
+            log.debug("out of waiting curr-time:{} time-elapsed {}", time(timeSource, MILLISECONDS), timeElapsed(timeSource, waitStart, MILLISECONDS));
             // now this thread has woken up because it got timed out after 10 seconds or it is notified after
             //the session was returned from another COMPUTING operation
             if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
@@ -289,8 +291,6 @@ public class PolicyHelper {
           }
         }
       }
-
-
     }
 
     private SessionWrapper createSession(SolrCloudManager cloudManager) throws InterruptedException, IOException {
@@ -360,7 +360,9 @@ public class PolicyHelper {
     }
 
     public SessionWrapper(Policy.Session session, SessionRef ref) {
-      lastUpdateTime = createTime = System.nanoTime();
+      lastUpdateTime = createTime = session != null ?
+          session.cloudManager.getTimeSource().getTime() :
+          TimeSource.NANO_TIME.getTime();
       this.session = session;
       this.status = Status.UNUSED;
       this.ref = ref;
@@ -371,7 +373,9 @@ public class PolicyHelper {
     }
 
     public SessionWrapper update(Policy.Session session) {
-      this.lastUpdateTime = System.nanoTime();
+      this.lastUpdateTime = session != null ?
+          session.cloudManager.getTimeSource().getTime() :
+          TimeSource.NANO_TIME.getTime();
       this.session = session;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
index 5f7281f..930ede8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
@@ -18,6 +18,7 @@
 package org.apache.solr.client.solrj.cloud.autoscaling;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 
@@ -33,21 +34,25 @@ public class ReplicaInfo implements MapWriter {
   private String core, collection, shard;
   private Replica.Type type;
   private String node;
-  private Map<String, Object> variables;
+  private final Map<String, Object> variables = new HashMap<>();
 
-  public ReplicaInfo(String coll,String shard, Replica r, Map<String, Object> vals){
+  public ReplicaInfo(String coll, String shard, Replica r, Map<String, Object> vals) {
     this.name = r.getName();
     this.core = r.getCoreName();
     this.collection = coll;
     this.shard = shard;
     this.type = r.getType();
-    this.variables = vals;
+    if (vals != null) {
+      this.variables.putAll(vals);
+    }
     this.node = r.getNodeName();
   }
 
   public ReplicaInfo(String name, String core, String coll, String shard, Replica.Type type, String node, Map<String, Object> vals) {
     this.name = name;
-    this.variables = vals;
+    if (vals != null) {
+      this.variables.putAll(vals);
+    }
     this.collection = coll;
     this.shard = shard;
     this.type = type;
@@ -58,12 +63,22 @@ public class ReplicaInfo implements MapWriter {
   @Override
   public void writeMap(EntryWriter ew) throws IOException {
     ew.put(name, (MapWriter) ew1 -> {
-      if (variables != null) {
-        for (Map.Entry<String, Object> e : variables.entrySet()) {
-          ew1.put(e.getKey(), e.getValue());
-        }
+      for (Map.Entry<String, Object> e : variables.entrySet()) {
+        ew1.put(e.getKey(), e.getValue());
+      }
+      if (core != null && !variables.containsKey(ZkStateReader.CORE_NAME_PROP)) {
+        ew1.put(ZkStateReader.CORE_NAME_PROP, core);
+      }
+      if (shard != null && !variables.containsKey(ZkStateReader.SHARD_ID_PROP)) {
+        ew1.put(ZkStateReader.SHARD_ID_PROP, shard);
+      }
+      if (collection != null && !variables.containsKey(ZkStateReader.COLLECTION_PROP)) {
+        ew1.put(ZkStateReader.COLLECTION_PROP, collection);
+      }
+      if (node != null && !variables.containsKey(ZkStateReader.NODE_NAME_PROP)) {
+        ew1.put(ZkStateReader.NODE_NAME_PROP, node);
       }
-      if (type != null) ew1.put("type", type.toString());
+      if (type != null) ew1.put(ZkStateReader.REPLICA_TYPE, type.toString());
     });
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SolrCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SolrCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SolrCloudManager.java
index 8a1f8f0..55cdcee 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SolrCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SolrCloudManager.java
@@ -26,6 +26,7 @@ import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.common.SolrCloseable;
 import org.apache.solr.common.util.ObjectCache;
+import org.apache.solr.common.util.TimeSource;
 
 /**
  * This interface abstracts the access to a SolrCloud cluster, including interactions with Zookeeper, Solr
@@ -44,16 +45,11 @@ public interface SolrCloudManager extends SolrCloseable {
 
   ObjectCache getObjectCache();
 
+  TimeSource getTimeSource();
+
   // Solr-like methods
 
   SolrResponse request(SolrRequest req) throws IOException;
 
   byte[] httpRequest(String url, SolrRequest.METHOD method, Map<String, String> headers, String payload, int timeout, boolean followRedirects) throws IOException;
-
-  // distributed queue implementation
-
-  @Override
-  default void close() {
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
index aec5f15..070869a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
@@ -38,7 +38,7 @@ import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.ANY;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
 
 public class Suggestion {
-  static final String coreidxsize = "INDEX.sizeInBytes";
+  public static final String coreidxsize = "INDEX.sizeInBytes";
   static final Map<String, ConditionType> validatetypes = new HashMap<>();
 
   public static ConditionType getTagType(String name) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 2432fb2..dfe15df 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -81,6 +81,7 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -1030,13 +1031,13 @@ public class CloudSolrClient extends SolrClient {
       if (!liveNodes.isEmpty()) {
         List<String> liveNodesList = new ArrayList<>(liveNodes);
         Collections.shuffle(liveNodesList, rand);
-        theUrlList.add(ZkStateReader.getBaseUrlForNodeName(liveNodesList.get(0),
+        theUrlList.add(Utils.getBaseUrlForNodeName(liveNodesList.get(0),
             (String) stateProvider.getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
       }
 
     } else if (ADMIN_PATHS.contains(request.getPath())) {
       for (String liveNode : liveNodes) {
-        theUrlList.add(ZkStateReader.getBaseUrlForNodeName(liveNode,
+        theUrlList.add(Utils.getBaseUrlForNodeName(liveNode,
             (String) stateProvider.getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
       }