You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/11 18:52:40 UTC

[21/21] lucene-solr:jira/solr-11285-sim: SOLR-11403: Keep track of /live_nodes entries and ephemeral nodeAdded / nodeLost nodes. Port over NodeAdded and NodeLost trigger tests.

SOLR-11403: Keep track of /live_nodes entries and ephemeral nodeAdded / nodeLost nodes.
Port over NodeAdded and NodeLost trigger tests.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9e1c2490
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9e1c2490
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9e1c2490

Branch: refs/heads/jira/solr-11285-sim
Commit: 9e1c2490f33b3d215fd90d2ee1c6262380180ac1
Parents: 403812c
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Mon Dec 11 19:51:28 2017 +0100
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Mon Dec 11 19:51:28 2017 +0100

----------------------------------------------------------------------
 .../cloud/autoscaling/ExecutePlanAction.java    |   8 +-
 .../cloud/autoscaling/sim/LiveNodesSet.java     |  99 ++++++
 .../cloud/autoscaling/sim/SimCloudManager.java  |  98 +++---
 .../sim/SimClusterStateProvider.java            |  70 +++-
 .../autoscaling/sim/SimDistribStateManager.java |  17 +
 .../autoscaling/sim/SimNodeStateProvider.java   |   8 +-
 .../sim/TestClusterStateProvider.java           |  25 +-
 .../autoscaling/sim/TestNodeAddedTrigger.java   | 306 +++++++++++++++++
 .../autoscaling/sim/TestNodeLostTrigger.java    | 334 +++++++++++++++++++
 .../autoscaling/sim/TestTriggerIntegration.java | 146 ++++----
 10 files changed, 949 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
index 841856e..1e1b0ac 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
@@ -108,15 +108,13 @@ public class ExecutePlanAction extends TriggerActionBase {
         } catch (IOException e) {
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
               "Unexpected exception executing operation: " + operation.getParams(), e);
-//        } catch (InterruptedException e) {
-//          Thread.currentThread().interrupt();
-//          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ExecutePlanAction was interrupted", e);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ExecutePlanAction was interrupted", e);
         } catch (Exception e) {
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
               "Unexpected exception executing operation: " + operation.getParams(), e);
         }
-
-//        counter++;
       }
     } catch (Exception e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
new file mode 100644
index 0000000..45cd66b
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.solr.common.cloud.LiveNodesListener;
+
+/**
+ * This class represents a set of live nodes and allows adding listeners to track their state.
+ */
+public class LiveNodesSet {
+
+  private final Set<String> set = ConcurrentHashMap.newKeySet();
+  private final Set<LiveNodesListener> listeners = ConcurrentHashMap.newKeySet();
+  /** Returns an unmodifiable live view (not a snapshot) of the current node ids. */
+  public Set<String> get() {
+    return Collections.unmodifiableSet(set);
+  }
+  /** Registers a listener that is notified whenever the set of live nodes changes. */
+  public void registerLiveNodesListener(LiveNodesListener listener) {
+    listeners.add(listener);
+  }
+  /** Unregisters a previously registered listener; no-op if it was not registered. */
+  public void removeLiveNodesListener(LiveNodesListener listener) {
+    listeners.remove(listener);
+  }
+  // Only called from synchronized mutators, so notifications are serialized and run under this lock.
+  private void fireListeners(SortedSet<String> oldNodes, SortedSet<String> newNodes) {
+    for (LiveNodesListener listener : listeners) {
+      listener.onChange(oldNodes, newNodes);
+    }
+  }
+  /** Returns true if no node is currently live. */
+  public boolean isEmpty() {
+    return set.isEmpty();
+  }
+  /** Returns true if the given node id is currently live. */
+  public boolean contains(String id) {
+    return set.contains(id);
+  }
+  /** Adds a node id; listeners are notified only if it was not already present. */
+  public synchronized boolean add(String id) {
+    if (set.contains(id)) {
+      return false;
+    }
+    TreeSet<String> oldNodes = new TreeSet<>(set);
+    set.add(id);
+    fireListeners(oldNodes, new TreeSet<>(set));
+    return true;
+  }
+  /** Adds node ids; listeners are notified once, and only if the set actually changed. */
+  public synchronized boolean addAll(Collection<String> nodes) {
+    TreeSet<String> oldNodes = new TreeSet<>(set);
+    boolean changed = set.addAll(nodes);
+    if (changed) {
+      fireListeners(oldNodes, new TreeSet<>(set));
+    }
+    return changed;
+  }
+  /** Removes a node id; listeners are notified only if it was present. */
+  public synchronized boolean remove(String id) {
+    if (!set.contains(id)) {
+      return false;
+    }
+    TreeSet<String> oldNodes = new TreeSet<>(set);
+    set.remove(id);
+    fireListeners(oldNodes, new TreeSet<>(set));
+    return true;
+  }
+  /** Removes all node ids; listeners are notified only if the set was non-empty. */
+  public synchronized void clear() {
+    if (set.isEmpty()) {
+      return; // no change: don't fire, consistent with add/addAll/remove
+    }
+    TreeSet<String> oldNodes = new TreeSet<>(set);
+    set.clear();
+    fireListeners(oldNodes, Collections.emptySortedSet());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index 9c8cc29..92840c8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -91,11 +90,10 @@ public class SimCloudManager implements SolrCloudManager {
   private final SimClusterStateProvider clusterStateProvider;
   private final SimNodeStateProvider nodeStateProvider;
   private final AutoScalingHandler autoScalingHandler;
-  private final Set<String> liveNodes = ConcurrentHashMap.newKeySet();
+  private final LiveNodesSet liveNodesSet = new LiveNodesSet();
   private final DistributedQueueFactory queueFactory;
   private final ObjectCache objectCache = new ObjectCache();
   private TimeSource timeSource;
-  private SolrClient solrClient;
 
   private final List<SolrInputDocument> systemColl = Collections.synchronizedList(new ArrayList<>());
   private final ExecutorService simCloudManagerPool;
@@ -103,11 +101,14 @@ public class SimCloudManager implements SolrCloudManager {
 
 
   private Overseer.OverseerThread triggerThread;
+  private ThreadGroup triggerThreadGroup;
+  private SolrResourceLoader loader;
 
   private static int nodeIdPort = 10000;
 
   public SimCloudManager(TimeSource timeSource) throws Exception {
     this.stateManager = new SimDistribStateManager();
+    this.loader = new SolrResourceLoader();
     // init common paths
     stateManager.makePath(ZkStateReader.CLUSTER_STATE);
     stateManager.makePath(ZkStateReader.CLUSTER_PROPS);
@@ -120,22 +121,18 @@ public class SimCloudManager implements SolrCloudManager {
     stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
 
     this.timeSource = timeSource != null ? timeSource : TimeSource.NANO_TIME;
-    this.clusterStateProvider = new SimClusterStateProvider(liveNodes, this);
-    this.nodeStateProvider = new SimNodeStateProvider(liveNodes, this.stateManager, this.clusterStateProvider, null);
+    this.clusterStateProvider = new SimClusterStateProvider(liveNodesSet, this);
+    this.nodeStateProvider = new SimNodeStateProvider(liveNodesSet, this.stateManager, this.clusterStateProvider, null);
     this.queueFactory = new GenericDistributedQueueFactory(stateManager);
     this.simCloudManagerPool = ExecutorUtil.newMDCAwareFixedThreadPool(200, new DefaultSolrThreadFactory("simCloudManagerPool"));
-    this.autoScalingHandler = new AutoScalingHandler(this, new SolrResourceLoader());
-    ThreadGroup triggerThreadGroup = new ThreadGroup("Simulated Overseer autoscaling triggers");
-    OverseerTriggerThread trigger = new OverseerTriggerThread(new SolrResourceLoader(), this,
+    this.autoScalingHandler = new AutoScalingHandler(this, loader);
+    triggerThreadGroup = new ThreadGroup("Simulated Overseer autoscaling triggers");
+    OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this,
         new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build());
     triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread");
     triggerThread.start();
   }
 
-  public void setSolrClient(SolrClient solrClient) {
-    this.solrClient = solrClient;
-  }
-
   // ---------- simulator setup methods -----------
 
   public static SimCloudManager createCluster(int numNodes, TimeSource timeSource) throws Exception {
@@ -213,6 +210,10 @@ public class SimCloudManager implements SolrCloudManager {
     return values;
   }
 
+  public SolrResourceLoader getLoader() { // same loader given to AutoScalingHandler and every OverseerTriggerThread
+    return this.loader;
+  }
+
   /**
    * Add a new node and initialize its node values (metrics).
    * @return new node id
@@ -250,7 +251,7 @@ public class SimCloudManager implements SolrCloudManager {
    * @param random random
    */
   public void simRemoveRandomNodes(int number, boolean withValues, Random random) throws Exception {
-    List<String> nodes = new ArrayList<>(liveNodes);
+    List<String> nodes = new ArrayList<>(liveNodesSet.get());
     Collections.shuffle(nodes, random);
     int count = Math.min(number, nodes.size());
     for (int i = 0; i < count; i++) {
@@ -279,22 +280,37 @@ public class SimCloudManager implements SolrCloudManager {
    * @return simulated SolrClient.
    */
   public SolrClient simGetSolrClient() {
-    if (solrClient != null) {
-      return solrClient;
-    } else {
-      return new SolrClient() {
-        @Override
-        public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
-          SolrResponse rsp = SimCloudManager.this.request(request);
-          return rsp.getResponse();
-        }
-
-        @Override
-        public void close() throws IOException {
-
-        }
-      };
+    return new SolrClient() {
+      @Override
+      public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
+        SolrResponse rsp = SimCloudManager.this.request(request);
+        return rsp.getResponse();
+      }
+
+      @Override
+      public void close() throws IOException {
+
+      }
+    };
+  }
+
+  /**
+   * Simulate the effect of restarting Overseer leader - in this case this means restarting the
+   * OverseerTriggerThread.
+   * @param killNodeId optional nodeId to kill. If null then don't kill any node, just restart the thread
+   */
+  public void simRestartOverseer(String killNodeId) throws Exception {
+    LOG.info("=== Restarting OverseerTriggerThread...");
+    IOUtils.closeQuietly(triggerThread);
+    triggerThread.interrupt();
+    if (killNodeId != null) {
+      simRemoveNode(killNodeId, true);
     }
+    OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this,
+        new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build());
+    triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread");
+    triggerThread.start();
+
   }
 
   /**
@@ -319,6 +335,10 @@ public class SimCloudManager implements SolrCloudManager {
     return stateManager;
   }
 
+  public LiveNodesSet getLiveNodesSet() { // the instance shared with the cluster- and node-state providers
+    return this.liveNodesSet;
+  }
+
   public Map<String, AtomicLong> simGetOpCounts() {
     return opCounts;
   }
@@ -363,19 +383,11 @@ public class SimCloudManager implements SolrCloudManager {
 
   @Override
   public SolrResponse request(SolrRequest req) throws IOException {
-    if (solrClient != null) {
-      try {
-        return req.process(solrClient);
-      } catch (SolrServerException e) {
-        throw new IOException(e);
-      }
-    } else {
-      try {
-        Future<SolrResponse> res = submit(() -> simHandleSolrRequest(req));
-        return res.get();
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
+    try {
+      Future<SolrResponse> res = submit(() -> simHandleSolrRequest(req));
+      return res.get();
+    } catch (Exception e) {
+      throw new IOException(e);
     }
   }
 
@@ -499,8 +511,8 @@ public class SimCloudManager implements SolrCloudManager {
           if (req.getParams().get(CommonAdminParams.ASYNC) != null) {
             results.add(REQUESTID, req.getParams().get(CommonAdminParams.ASYNC));
           }
-          if (!liveNodes.isEmpty()) {
-            results.add("leader", liveNodes.iterator().next());
+          if (!liveNodesSet.get().isEmpty()) {
+            results.add("leader", liveNodesSet.get().iterator().next());
           }
           results.add("overseer_queue_size", 0);
           results.add("overseer_work_queue_size", 0);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index d51c059..e63e8ba 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -38,10 +38,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
 import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.cloud.ActionThrottle;
@@ -67,6 +70,7 @@ import org.apache.solr.common.params.CommonAdminParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,8 +86,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final Map<String, List<ReplicaInfo>> nodeReplicaMap = new ConcurrentHashMap<>();
-  private final Set<String> liveNodes;
-  private final DistribStateManager stateManager;
+  private final LiveNodesSet liveNodes;
+  private final SimDistribStateManager stateManager;
   private final SimCloudManager cloudManager;
 
   private final Map<String, Object> clusterProperties = new ConcurrentHashMap<>();
@@ -109,10 +113,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
    * The instance needs to be initialized using the <code>sim*</code> methods in order
    * to ensure proper behavior, otherwise it will behave as a cluster with zero replicas.
    */
-  public SimClusterStateProvider(Set<String> liveNodes, SimCloudManager cloudManager) {
+  public SimClusterStateProvider(LiveNodesSet liveNodes, SimCloudManager cloudManager) throws Exception {
     this.liveNodes = liveNodes;
+    for (String nodeId : liveNodes.get()) {
+      createEphemeralLiveNode(nodeId);
+    }
     this.cloudManager = cloudManager;
-    this.stateManager = cloudManager.getDistribStateManager();
+    this.stateManager = cloudManager.getSimDistribStateManager();
     this.leaderThrottle = new ActionThrottle("leader", 5000, cloudManager.getTimeSource());
     // names are CollectionAction names, delays are in ms (simulated time)
     defaultOpDelays.put(CollectionParams.CollectionAction.MOVEREPLICA.name(), 5000L);
@@ -138,7 +145,18 @@ public class SimClusterStateProvider implements ClusterStateProvider {
       sliceProperties.clear();
       nodeReplicaMap.clear();
       liveNodes.clear();
+      for (String nodeId : stateManager.listData(ZkStateReader.LIVE_NODES_ZKNODE)) {
+        if (stateManager.hasData(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId)) {
+          stateManager.removeData(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, -1);
+        }
+        if (stateManager.hasData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId)) {
+          stateManager.removeData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId, -1);
+        }
+      }
       liveNodes.addAll(initialState.getLiveNodes());
+      for (String nodeId : liveNodes.get()) {
+        createEphemeralLiveNode(nodeId);
+      }
       initialState.forEachCollection(dc -> {
         collProperties.computeIfAbsent(dc.getName(), name -> new ConcurrentHashMap<>()).putAll(dc.getProperties());
         opDelays.computeIfAbsent(dc.getName(), c -> new HashMap<>()).putAll(defaultOpDelays);
@@ -147,7 +165,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
               .computeIfAbsent(s.getName(), name -> new HashMap<>()).putAll(s.getProperties());
           s.getReplicas().forEach(r -> {
             ReplicaInfo ri = new ReplicaInfo(r.getName(), r.getCoreName(), dc.getName(), s.getName(), r.getType(), r.getNodeName(), r.getProperties());
-            if (liveNodes.contains(r.getNodeName())) {
+            if (liveNodes.get().contains(r.getNodeName())) {
               nodeReplicaMap.computeIfAbsent(r.getNodeName(), rn -> new ArrayList<>()).add(ri);
             }
           });
@@ -175,11 +193,10 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     if (liveNodes.isEmpty()) {
       return null;
     }
-    List<String> nodes = new ArrayList<>(liveNodes);
+    List<String> nodes = new ArrayList<>(liveNodes.get());
     return nodes.get(random.nextInt(nodes.size()));
   }
 
-  // todo: maybe hook up DistribStateManager /live_nodes ?
   // todo: maybe hook up DistribStateManager /clusterstate.json watchers?
 
   /**
@@ -191,6 +208,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
       throw new Exception("Node " + nodeId + " already exists");
     }
     liveNodes.add(nodeId);
+    createEphemeralLiveNode(nodeId);
     nodeReplicaMap.putIfAbsent(nodeId, new ArrayList<>());
   }
 
@@ -218,7 +236,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     }
   }
 
-  // todo: maybe hook up DistribStateManager /live_nodes ?
   // todo: maybe hook up DistribStateManager /clusterstate.json watchers?
 
   /**
@@ -234,6 +251,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
       // mark every replica on that node as down
       setReplicaStates(nodeId, Replica.State.DOWN, collections);
       boolean res = liveNodes.remove(nodeId);
+      // remove ephemeral nodes
+      stateManager.getRoot().removeEphemeralChildren(nodeId);
+      // create a nodeLost marker if needed
+      AutoScalingConfig cfg = stateManager.getAutoScalingConfig(null);
+      if (cfg.hasTriggerForEvents(TriggerEventType.NODELOST)) {
+        stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId);
+      }
       if (!collections.isEmpty()) {
         cloudManager.submit(new LeaderElection(collections, true));
       }
@@ -254,8 +278,19 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     }
   }
 
+  // Publishes nodeId as live: EPHEMERAL /live_nodes entry, plus a nodeAdded marker if a NODEADDED trigger exists.
+  private void createEphemeralLiveNode(String nodeId) throws Exception { // NOTE(review): meant to run under `lock`, but simRestoreNode calls it before lock.lock()
+    DistribStateManager ownerMgr = stateManager.withEphemeralId(nodeId); // NOTE(review): stateManager is still null when the constructor's live-node loop runs
+    ownerMgr.makePath(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, null, CreateMode.EPHEMERAL, true);
+    boolean wantsNodeAddedMarker = stateManager.getAutoScalingConfig(null).hasTriggerForEvents(TriggerEventType.NODEADDED);
+    if (wantsNodeAddedMarker) {
+      ownerMgr.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId, null, CreateMode.EPHEMERAL, true);
+    }
+  }
+
   public boolean simRestoreNode(String nodeId) throws Exception {
     liveNodes.add(nodeId);
+    createEphemeralLiveNode(nodeId);
     Set<String> collections = new HashSet<>();
     lock.lock();
     try {
@@ -307,8 +342,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     simAddReplica(message.getStr(CoreAdminParams.NODE), ri, true);
   }
 
-  // todo: maybe hook up DistribStateManager /clusterstate.json watchers?
-
   /**
    * Add a replica. Note that all details of the replica must be present here, including
    * node, coreNodeName and SolrCore name.
@@ -376,8 +409,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
     }
   }
 
-  // todo: maybe hook up DistribStateManager /clusterstate.json watchers?
-
   /**
    * Remove replica.
    * @param nodeId node id
@@ -485,7 +516,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
               if (ri.getVariables().remove(ZkStateReader.LEADER_PROP) != null) {
                 stateChanged.set(true);
               }
-              if (r.isActive(liveNodes)) {
+              if (r.isActive(liveNodes.get())) {
                 active.add(ri);
               } else { // if it's on a node that is not live mark it down
                 if (!liveNodes.contains(r.getNodeName())) {
@@ -1055,10 +1086,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
   /**
    * Return all replica infos for a node.
    * @param node node id
-   * @return list of replicas on that node
+   * @return list of replicas on that node, or empty list if none
    */
   public List<ReplicaInfo> simGetReplicaInfos(String node) {
-    return nodeReplicaMap.get(node);
+    List<ReplicaInfo> replicas = nodeReplicaMap.get(node);
+    if (replicas == null) {
+      return Collections.emptyList();
+    } else {
+      return replicas;
+    }
   }
 
   /**
@@ -1091,7 +1127,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
 
   @Override
   public Set<String> getLiveNodes() {
-    return liveNodes;
+    return liveNodes.get();
   }
 
   @Override
@@ -1101,7 +1137,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
 
   @Override
   public ClusterState getClusterState() throws IOException {
-    return new ClusterState(0, liveNodes, getCollectionStates());
+    return new ClusterState(0, liveNodes.get(), getCollectionStates());
   }
 
   private Map<String, DocCollection> getCollectionStates() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
index a570e6d..12b38e7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
@@ -239,6 +239,23 @@ public class SimDistribStateManager implements DistribStateManager {
     this.errorRef.set(actionError);
   }
 
+  private SimDistribStateManager(String id, ExecutorService watchersPool, Node root, ActionThrottle actionThrottle,
+                                 ActionError actionError) { // used by withEphemeralId: same tree and pool, different id
+    this.root = root;
+    this.watchersPool = watchersPool;
+    this.id = id;
+    errorRef.set(actionError);
+    throttleRef.set(actionThrottle);
+  }
+
+  public SimDistribStateManager withEphemeralId(String id) { // NOTE(review): throttle/error are copied by value, so later changes here won't propagate
+    return new SimDistribStateManager(id, this.watchersPool, this.root, this.throttleRef.get(), this.errorRef.get());
+  }
+
+  public Node getRoot() { // exposes the shared, mutable root of the simulated tree
+    return this.root;
+  }
+
   public void clear() {
     root.clear();
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
index 65fb5b9..a9b3b5b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
@@ -47,12 +47,12 @@ public class SimNodeStateProvider implements NodeStateProvider {
   private final Map<String, Map<String, Object>> nodeValues = new ConcurrentHashMap<>();
   private final SimClusterStateProvider clusterStateProvider;
   private final SimDistribStateManager stateManager;
-  private final Set<String> liveNodes;
+  private final LiveNodesSet liveNodesSet;
 
-  public SimNodeStateProvider(Set<String> liveNodes, SimDistribStateManager stateManager,
+  public SimNodeStateProvider(LiveNodesSet liveNodesSet, SimDistribStateManager stateManager,
                               SimClusterStateProvider clusterStateProvider,
                               Map<String, Map<String, Object>> nodeValues) {
-    this.liveNodes = liveNodes;
+    this.liveNodesSet = liveNodesSet;
     this.stateManager = stateManager;
     this.clusterStateProvider = clusterStateProvider;
     if (nodeValues != null) {
@@ -226,7 +226,7 @@ public class SimNodeStateProvider implements NodeStateProvider {
   @Override
   public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
     LOG.trace("-- requested values for " + node + ": " + tags);
-    if (!liveNodes.contains(node)) {
+    if (!liveNodesSet.contains(node)) {
       nodeValues.remove(node);
       return Collections.emptyMap();
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java
index 38dd7dc..cb3bb4c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java
@@ -169,15 +169,30 @@ public class TestClusterStateProvider extends SolrCloudTestCase {
   @Test
   public void testAddRemoveNode() throws Exception {
     Set<String> lastNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
+    List<String> liveNodes = cloudManager.getDistribStateManager().listData(ZkStateReader.LIVE_NODES_ZKNODE);
+    assertEquals(lastNodes.size(), liveNodes.size());
+    liveNodes.removeAll(lastNodes);
+    assertTrue(liveNodes.isEmpty());
+
     String node = addNode();
-    Thread.sleep(2000);
+    cloudManager.getTimeSource().sleep(2000);
     assertFalse(lastNodes.contains(node));
-    assertTrue(cloudManager.getClusterStateProvider().getLiveNodes().contains(node));
+    lastNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
+    assertTrue(lastNodes.contains(node));
+    liveNodes = cloudManager.getDistribStateManager().listData(ZkStateReader.LIVE_NODES_ZKNODE);
+    assertEquals(lastNodes.size(), liveNodes.size());
+    liveNodes.removeAll(lastNodes);
+    assertTrue(liveNodes.isEmpty());
+
     node = deleteNode();
-    Thread.sleep(2000);
+    cloudManager.getTimeSource().sleep(2000);
     assertTrue(lastNodes.contains(node));
-    assertFalse(cloudManager.getClusterStateProvider().getLiveNodes().contains(node));
-  }
+    lastNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
+    assertFalse(lastNodes.contains(node));
+    liveNodes = cloudManager.getDistribStateManager().listData(ZkStateReader.LIVE_NODES_ZKNODE);
+    assertEquals(lastNodes.size(), liveNodes.size());
+    liveNodes.removeAll(lastNodes);
+    assertTrue(liveNodes.isEmpty());  }
 
   @Test
   public void testAutoScalingConfig() throws Exception {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
new file mode 100644
index 0000000..c1f10d0
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.AutoScaling;
+import org.apache.solr.cloud.autoscaling.NodeAddedTrigger;
+import org.apache.solr.cloud.autoscaling.TriggerAction;
+import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.common.util.TimeSource;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for {@link NodeAddedTrigger}, run against a simulated cluster ({@link SimCloudManager})
+ * whose time source is created with the speed factor {@code SPEED}.
+ */
+public class TestNodeAddedTrigger extends SimSolrCloudTestCase {
+  // set by AssertInitTriggerAction to track the action lifecycle; reset before each test
+  private static AtomicBoolean actionConstructorCalled = new AtomicBoolean(false);
+  private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
+  private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
+
+  private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
+    fail("Did not expect the listener to fire on first run!");
+    return true;
+  };
+
+  // speed factor passed to the simulated time source ("simTime:" + SPEED)
+  private static final int SPEED = 50;
+
+  // the time source is not perfectly precise, so to avoid false positives while comparing time of fire, we add some delta
+  private static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(2);
+
+  // use the same time source as the trigger
+  private static TimeSource timeSource;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    cluster = SimCloudManager.createCluster(1, TimeSource.get("simTime:" + SPEED));
+    timeSource = cluster.getTimeSource();
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+    actionConstructorCalled = new AtomicBoolean(false);
+    actionInitCalled = new AtomicBoolean(false);
+    actionCloseCalled = new AtomicBoolean(false);
+  }
+
+  @Test
+  public void testTrigger() throws Exception {
+    long waitForSeconds = 1 + random().nextInt(5);
+    Map<String, Object> props = createTriggerProps(waitForSeconds);
+
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+      trigger.setProcessor(noFirstRunProcessor);
+      trigger.run();
+
+      String newNode1 = cluster.simAddNode();
+      String newNode2 = cluster.simAddNode();
+      AtomicBoolean fired = new AtomicBoolean(false);
+      AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+      trigger.setProcessor(waitForCheckingProcessor(waitForSeconds, fired, eventRef));
+      int counter = 0;
+      do {
+        trigger.run();
+        timeSource.sleep(1000);
+        if (counter++ > 10) {
+          fail("Newly added node was not discovered by trigger even after 10 seconds");
+        }
+      } while (!fired.get());
+
+      TriggerEvent nodeAddedEvent = eventRef.get();
+      assertNotNull(nodeAddedEvent);
+      List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+      assertTrue(nodeNames.contains(newNode1));
+      assertTrue(nodeNames.contains(newNode2));
+    }
+
+    // add a new node but remove it before the waitFor period expires
+    // and assert that the trigger doesn't fire at all. NOTE: waitFor must be set before the
+    // trigger is created - the trigger reads it from props in its constructor.
+    final long waitTime = 2;
+    props.put("waitFor", waitTime);
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+      trigger.setProcessor(noFirstRunProcessor);
+      trigger.run();
+
+      String newNode = cluster.simAddNode();
+      AtomicBoolean fired = new AtomicBoolean(false);
+      trigger.setProcessor(waitForCheckingProcessor(waitTime, fired, new AtomicReference<>()));
+      trigger.run(); // first run should detect the new node
+      cluster.simRemoveNode(newNode, true);
+      int counter = 0;
+      do {
+        trigger.run();
+        timeSource.sleep(1000);
+        if (counter++ > waitTime + 1) { // run it a little more than the wait time
+          break;
+        }
+      } while (true);
+
+      // ensure the event was not fired
+      assertFalse(fired.get());
+    }
+  }
+
+  @Test
+  public void testActionLifecycle() throws Exception {
+    Map<String, Object> props = createTriggerProps(0);
+    List<Map<String, String>> actions = (List<Map<String, String>>) props.get("actions");
+    Map<String, String> action = new HashMap<>(2);
+    action.put("name", "testActionInit");
+    action.put("class", TestNodeAddedTrigger.AssertInitTriggerAction.class.getName());
+    actions.add(action);
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+      // the action is constructed together with the trigger, but initialized only by init()
+      assertTrue(actionConstructorCalled.get());
+      assertFalse(actionInitCalled.get());
+      assertFalse(actionCloseCalled.get());
+      trigger.init();
+      assertTrue(actionInitCalled.get());
+      assertFalse(actionCloseCalled.get());
+    }
+    // closing the trigger must close its actions
+    assertTrue(actionCloseCalled.get());
+  }
+
+  /**
+   * No-op action that records in the test's static flags when it is constructed, initialized and closed.
+   */
+  public static class AssertInitTriggerAction implements TriggerAction  {
+    public AssertInitTriggerAction() {
+      actionConstructorCalled.set(true);
+    }
+
+    @Override
+    public String getName() {
+      return "";
+    }
+
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+      // no-op
+    }
+
+    @Override
+    public void close() throws IOException {
+      actionCloseCalled.compareAndSet(false, true);
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      actionInitCalled.compareAndSet(false, true);
+    }
+  }
+
+  @Test
+  public void testListenerAcceptance() throws Exception {
+    Map<String, Object> props = createTriggerProps(0);
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+      trigger.setProcessor(noFirstRunProcessor);
+      trigger.run(); // starts tracking live nodes
+
+      cluster.simAddNode();
+      AtomicInteger callCount = new AtomicInteger(0);
+      AtomicBoolean fired = new AtomicBoolean(false);
+
+      // reject the first event, accept the second one
+      trigger.setProcessor(event -> {
+        if (callCount.incrementAndGet() < 2) {
+          return false;
+        } else  {
+          fired.compareAndSet(false, true);
+          return true;
+        }
+      });
+
+      trigger.run(); // first run should detect the new node and fire immediately but listener isn't ready
+      assertEquals(1, callCount.get());
+      assertFalse(fired.get());
+      trigger.run(); // second run should again fire
+      assertEquals(2, callCount.get());
+      assertTrue(fired.get());
+      trigger.run(); // should not fire
+      assertEquals(2, callCount.get());
+    }
+  }
+
+  @Test
+  public void testRestoreState() throws Exception {
+    long waitForSeconds = 1 + random().nextInt(5);
+    Map<String, Object> props = createTriggerProps(waitForSeconds);
+
+    // add a new node but update the trigger before the waitFor period expires
+    // and assert that the new trigger still fires
+    NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster);
+    trigger.setProcessor(noFirstRunProcessor);
+    trigger.run();
+
+    String newNode = cluster.simAddNode();
+    trigger.run(); // this run should detect the new node
+    trigger.close(); // close the old trigger
+
+    try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("some_different_name", props, cluster.getLoader(), cluster))  {
+      // restoreState is expected to reject a differently named trigger with an AssertionError; fail() must stay
+      // outside the try block, otherwise its own AssertionError would be caught and the check could never fail
+      AssertionError expected = null;
+      try {
+        newTrigger.restoreState(trigger);
+      } catch (AssertionError e) {
+        expected = e;
+      }
+      assertNotNull("Trigger should only be able to restore state from an old trigger of the same name", expected);
+    }
+
+    try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster))  {
+      AtomicBoolean fired = new AtomicBoolean(false);
+      AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+      newTrigger.setProcessor(waitForCheckingProcessor(waitForSeconds, fired, eventRef));
+      newTrigger.restoreState(trigger); // restore state from the old trigger
+      int counter = 0;
+      do {
+        newTrigger.run();
+        timeSource.sleep(1000);
+        if (counter++ > 10) {
+          fail("Newly added node was not discovered by trigger even after 10 seconds");
+        }
+      } while (!fired.get());
+
+      // ensure the event was fired and reports the node detected by the old trigger
+      assertTrue(fired.get());
+      TriggerEvent nodeAddedEvent = eventRef.get();
+      assertNotNull(nodeAddedEvent);
+      List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+      assertTrue(nodeNames + " doesn't contain " + newNode, nodeNames.contains(newNode));
+    }
+  }
+
+  /**
+   * Creates a processor that records the first event it receives and fails the test if less than
+   * {@code waitForSeconds} (minus {@code WAIT_FOR_DELTA_NANOS}) of simulated time separates the
+   * event's time from the moment it is processed, or if the trigger fires more than once.
+   *
+   * @param waitForSeconds waitFor period the trigger under test was configured with
+   * @param fired set to true when the first event arrives
+   * @param eventRef receives the first event
+   * @return the processor to pass to {@code setProcessor}
+   */
+  private static AutoScaling.TriggerEventProcessor waitForCheckingProcessor(long waitForSeconds, AtomicBoolean fired,
+                                                                            AtomicReference<TriggerEvent> eventRef) {
+    return event -> {
+      if (fired.compareAndSet(false, true)) {
+        eventRef.set(event);
+        long currentTimeNanos = timeSource.getTime();
+        long eventTimeNanos = event.getEventTime();
+        long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+        if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+          fail("NodeAddedListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" +  eventTimeNanos + ",waitForNanos=" + waitForNanos);
+        }
+      } else {
+        fail("NodeAddedTrigger was fired more than once!");
+      }
+      return true;
+    };
+  }
+
+  /**
+   * Builds properties for a nodeAdded trigger with the given waitFor period and the
+   * compute_plan / execute_plan actions. The returned map and its "actions" list are mutable.
+   */
+  private Map<String, Object> createTriggerProps(long waitForSeconds) {
+    Map<String, Object> props = new HashMap<>();
+    props.put("event", "nodeAdded");
+    props.put("waitFor", waitForSeconds);
+    props.put("enabled", true);
+    List<Map<String, String>> actions = new ArrayList<>(3);
+    Map<String, String> map = new HashMap<>(2);
+    map.put("name", "compute_plan");
+    map.put("class", "solr.ComputePlanAction");
+    actions.add(map);
+    map = new HashMap<>(2);
+    map.put("name", "execute_plan");
+    map.put("class", "solr.ExecutePlanAction");
+    actions.add(map);
+    props.put("actions", actions);
+    return props;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
new file mode 100644
index 0000000..18ee355
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.AutoScaling;
+import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
+import org.apache.solr.cloud.autoscaling.TriggerAction;
+import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.core.CoreContainer;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for {@link NodeLostTrigger}, run against a simulated cluster ({@link SimCloudManager})
+ * whose time source is created with the speed factor {@code SPEED}.
+ */
+public class TestNodeLostTrigger extends SimSolrCloudTestCase {
+  // set by AssertInitTriggerAction to track the action lifecycle; reset before each test
+  private static AtomicBoolean actionConstructorCalled = new AtomicBoolean(false);
+  private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
+  private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
+
+  private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
+    fail("Did not expect the listener to fire on first run!");
+    return true;
+  };
+
+  // speed factor passed to the simulated time source ("simTime:" + SPEED)
+  private static final int SPEED = 50;
+  // use the same time source as the trigger
+  private static TimeSource timeSource;
+  // the time source is not perfectly precise, so to avoid false positives while comparing time of fire, we add some delta
+  private static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(5);
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    cluster = SimCloudManager.createCluster(5, TimeSource.get("simTime:" + SPEED));
+    timeSource = cluster.getTimeSource();
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+    actionConstructorCalled = new AtomicBoolean(false);
+    actionInitCalled = new AtomicBoolean(false);
+    actionCloseCalled = new AtomicBoolean(false);
+  }
+
+  @Test
+  public void testTrigger() throws Exception {
+    long waitForSeconds = 1 + random().nextInt(5);
+    Map<String, Object> props = createTriggerProps(waitForSeconds);
+
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) {
+      trigger.setProcessor(noFirstRunProcessor);
+      trigger.run();
+      Iterator<String> it = cluster.getLiveNodesSet().get().iterator();
+      String lostNodeName1 = it.next();
+      String lostNodeName2 = it.next();
+      cluster.simRemoveNode(lostNodeName1, true);
+      cluster.simRemoveNode(lostNodeName2, true);
+      timeSource.sleep(1000);
+
+      AtomicBoolean fired = new AtomicBoolean(false);
+      AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+      trigger.setProcessor(waitForCheckingProcessor(waitForSeconds, fired, eventRef));
+      int counter = 0;
+      do {
+        trigger.run();
+        timeSource.sleep(1000);
+        if (counter++ > 10) {
+          fail("Lost node was not discovered by trigger even after 10 seconds");
+        }
+      } while (!fired.get());
+
+      TriggerEvent nodeLostEvent = eventRef.get();
+      assertNotNull(nodeLostEvent);
+      List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+      assertTrue(nodeNames + " doesn't contain " + lostNodeName1, nodeNames.contains(lostNodeName1));
+      assertTrue(nodeNames + " doesn't contain " + lostNodeName2, nodeNames.contains(lostNodeName2));
+    }
+
+    // remove a node but add it back before the waitFor period expires
+    // and assert that the trigger doesn't fire at all. NOTE: waitFor must be set before the
+    // trigger is created - the trigger reads it from props in its constructor.
+    final long waitTime = 2;
+    props.put("waitFor", waitTime);
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) {
+      trigger.setProcessor(noFirstRunProcessor);
+      trigger.run();
+
+      String lostNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+      int expectedLiveNodes = cluster.getLiveNodesSet().get().size() - 1;
+      cluster.simRemoveNode(lostNode, false);
+      AtomicBoolean fired = new AtomicBoolean(false);
+      trigger.setProcessor(waitForCheckingProcessor(waitTime, fired, new AtomicReference<>()));
+      trigger.run(); // first run should detect the lost node
+      int counter = 0;
+      do {
+        if (cluster.getLiveNodesSet().get().size() == expectedLiveNodes) {
+          break;
+        }
+        timeSource.sleep(100);
+        if (counter++ > 20) {
+          fail("Live nodes not updated!");
+        }
+      } while (true);
+      counter = 0;
+      cluster.getSimClusterStateProvider().simRestoreNode(lostNode);
+      do {
+        trigger.run();
+        timeSource.sleep(1000);
+        if (counter++ > waitTime + 1) { // run it a little more than the wait time
+          break;
+        }
+      } while (true);
+
+      // ensure the event was not fired
+      assertFalse(fired.get());
+    }
+  }
+
+  @Test
+  public void testActionLifecycle() throws Exception {
+    Map<String, Object> props = createTriggerProps(0);
+    List<Map<String, String>> actions = (List<Map<String, String>>) props.get("actions");
+    Map<String, String> action = new HashMap<>(2);
+    action.put("name", "testActionInit");
+    action.put("class", AssertInitTriggerAction.class.getName());
+    actions.add(action);
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) {
+      // the action is constructed together with the trigger, but initialized only by init()
+      assertTrue(actionConstructorCalled.get());
+      assertFalse(actionInitCalled.get());
+      assertFalse(actionCloseCalled.get());
+      trigger.init();
+      assertTrue(actionInitCalled.get());
+      assertFalse(actionCloseCalled.get());
+    }
+    // closing the trigger must close its actions
+    assertTrue(actionCloseCalled.get());
+  }
+
+  /**
+   * No-op action that records in the test's static flags when it is constructed, initialized and closed.
+   */
+  public static class AssertInitTriggerAction implements TriggerAction  {
+    public AssertInitTriggerAction() {
+      actionConstructorCalled.set(true);
+    }
+
+    @Override
+    public String getName() {
+      return "";
+    }
+
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+      // no-op
+    }
+
+    @Override
+    public void close() throws IOException {
+      actionCloseCalled.compareAndSet(false, true);
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      actionInitCalled.compareAndSet(false, true);
+    }
+  }
+
+  @Test
+  public void testListenerAcceptance() throws Exception {
+    Map<String, Object> props = createTriggerProps(0);
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) {
+      trigger.setProcessor(noFirstRunProcessor);
+
+      String newNode = cluster.simAddNode();
+
+      trigger.run(); // starts tracking live nodes
+
+      // stop the newly created node
+      cluster.simRemoveNode(newNode, true);
+
+      AtomicInteger callCount = new AtomicInteger(0);
+      AtomicBoolean fired = new AtomicBoolean(false);
+
+      // reject the first event, accept the second one
+      trigger.setProcessor(event -> {
+        if (callCount.incrementAndGet() < 2) {
+          return false;
+        } else  {
+          fired.compareAndSet(false, true);
+          return true;
+        }
+      });
+
+      trigger.run(); // first run should detect the lost node and fire immediately but listener isn't ready
+      assertEquals(1, callCount.get());
+      assertFalse(fired.get());
+      trigger.run(); // second run should again fire
+      assertEquals(2, callCount.get());
+      assertTrue(fired.get());
+      trigger.run(); // should not fire
+      assertEquals(2, callCount.get());
+    }
+  }
+
+  @Test
+  public void testRestoreState() throws Exception {
+    long waitForSeconds = 1 + random().nextInt(5);
+    Map<String, Object> props = createTriggerProps(waitForSeconds);
+
+    String newNode = cluster.simAddNode();
+
+    // remove a node but update the trigger before the waitFor period expires
+    // and assert that the new trigger still fires
+
+    NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster);
+    trigger.setProcessor(noFirstRunProcessor);
+    trigger.run();
+
+    // stop the newly created node
+    cluster.simRemoveNode(newNode, true);
+
+    trigger.run(); // this run should detect the lost node
+    trigger.close(); // close the old trigger
+
+    try (NodeLostTrigger newTrigger = new NodeLostTrigger("some_different_name", props, cluster.getLoader(), cluster))  {
+      // restoreState is expected to reject a differently named trigger with an AssertionError; fail() must stay
+      // outside the try block, otherwise its own AssertionError would be caught and the check could never fail
+      AssertionError expected = null;
+      try {
+        newTrigger.restoreState(trigger);
+      } catch (AssertionError e) {
+        expected = e;
+      }
+      assertNotNull("Trigger should only be able to restore state from an old trigger of the same name", expected);
+    }
+
+    try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) {
+      AtomicBoolean fired = new AtomicBoolean(false);
+      AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+      newTrigger.setProcessor(waitForCheckingProcessor(waitForSeconds, fired, eventRef));
+      newTrigger.restoreState(trigger); // restore state from the old trigger
+      int counter = 0;
+      do {
+        newTrigger.run();
+        timeSource.sleep(1000);
+        if (counter++ > 10) {
+          fail("Lost node was not discovered by trigger even after 10 seconds");
+        }
+      } while (!fired.get());
+
+      TriggerEvent nodeLostEvent = eventRef.get();
+      assertNotNull(nodeLostEvent);
+      List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+      assertTrue(nodeNames.contains(newNode));
+    }
+  }
+
+  /**
+   * Creates a processor that records the first event it receives and fails the test if less than
+   * {@code waitForSeconds} (minus {@code WAIT_FOR_DELTA_NANOS}) of simulated time separates the
+   * event's time from the moment it is processed, or if the trigger fires more than once.
+   *
+   * @param waitForSeconds waitFor period the trigger under test was configured with
+   * @param fired set to true when the first event arrives
+   * @param eventRef receives the first event
+   * @return the processor to pass to {@code setProcessor}
+   */
+  private static AutoScaling.TriggerEventProcessor waitForCheckingProcessor(long waitForSeconds, AtomicBoolean fired,
+                                                                            AtomicReference<TriggerEvent> eventRef) {
+    return event -> {
+      if (fired.compareAndSet(false, true)) {
+        eventRef.set(event);
+        long currentTimeNanos = timeSource.getTime();
+        long eventTimeNanos = event.getEventTime();
+        long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+        if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+          fail("NodeLostListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" +  eventTimeNanos + ",waitForNanos=" + waitForNanos);
+        }
+      } else {
+        fail("NodeLostListener was fired more than once!");
+      }
+      return true;
+    };
+  }
+
+  /**
+   * Builds properties for a nodeLost trigger with the given waitFor period and the
+   * compute_plan / execute_plan actions. The returned map and its "actions" list are mutable.
+   */
+  private Map<String, Object> createTriggerProps(long waitForSeconds) {
+    Map<String, Object> props = new HashMap<>();
+    props.put("event", "nodeLost");
+    props.put("waitFor", waitForSeconds);
+    props.put("enabled", true);
+    List<Map<String, String>> actions = new ArrayList<>(3);
+    Map<String, String> map = new HashMap<>(2);
+    map.put("name", "compute_plan");
+    map.put("class", "solr.ComputePlanAction");
+    actions.add(map);
+    map = new HashMap<>(2);
+    map.put("name", "execute_plan");
+    map.put("class", "solr.ExecutePlanAction");
+    actions.add(map);
+    props.put("actions", actions);
+    return props;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
index 179af76..e05b789 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
@@ -20,9 +20,11 @@ package org.apache.solr.cloud.autoscaling.sim;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -37,14 +39,17 @@ import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
 import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.autoscaling.ActionContext;
 import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
 import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
 import org.apache.solr.cloud.autoscaling.TriggerActionBase;
 import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.cloud.autoscaling.TriggerEventQueue;
 import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
 import org.apache.solr.cloud.autoscaling.CapturedEvent;
+import org.apache.solr.common.cloud.LiveNodesListener;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.TimeSource;
@@ -602,12 +607,10 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
 
   public static long eventQueueActionWait = 5000;
 
-  // simulation framework doesn't support overseer
-  /*
   @Test
   public void testEventQueue() throws Exception {
     waitForSeconds = 1;
-    CloudSolrClient solrClient = cluster.getSolrClient();
+    SolrClient solrClient = cluster.simGetSolrClient();
     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
         "'name' : 'node_added_trigger1'," +
@@ -616,56 +619,49 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
         "'enabled' : true," +
         "'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" +
         "}}";
-    NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
-    String overseerLeader = (String) overSeerStatus.get("leader");
-    int overseerLeaderIndex = 0;
-    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
-      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
-      if (jetty.getNodeName().equals(overseerLeader)) {
-        overseerLeaderIndex = i;
-        break;
-      }
-    }
+    // pick a random simulated node to act as the overseer leader that gets restarted below
+    String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+
     SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
     NamedList<Object> response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
 
-    if (!actionInitCalled.await(3, TimeUnit.SECONDS))  {
+    if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS))  {
       fail("The TriggerAction should have been created by now");
     }
 
     // add node to generate the event
-    JettySolrRunner newNode = cluster.startJettySolrRunner();
-    boolean await = actionStarted.await(60, TimeUnit.SECONDS);
+    String newNode = cluster.simAddNode();
+    boolean await = actionStarted.await(60000 / SPEED, TimeUnit.MILLISECONDS);
     assertTrue("action did not start", await);
     // event should be there
-    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
+    TriggerEvent nodeAddedEvent = events.iterator().next();
     assertNotNull(nodeAddedEvent);
     // but action did not complete yet so the event is still enqueued
     assertFalse(triggerFired.get());
     events.clear();
     actionStarted = new CountDownLatch(1);
     eventQueueActionWait = 1;
-    // kill overseer leader
-    cluster.stopJettySolrRunner(overseerLeaderIndex);
-    Thread.sleep(5000);
+    // restart the overseer: the running action should be interrupted and the still-enqueued event replayed
+    cluster.simRestartOverseer(overseerLeader);
+    cluster.getTimeSource().sleep(5000);
     // new overseer leader should be elected and run triggers
-    await = actionInterrupted.await(3, TimeUnit.SECONDS);
+    await = actionInterrupted.await(3000 / SPEED, TimeUnit.MILLISECONDS);
     assertTrue("action wasn't interrupted", await);
     // it should fire again from enqueued event
-    await = actionStarted.await(60, TimeUnit.SECONDS);
+    await = actionStarted.await(60000 / SPEED, TimeUnit.MILLISECONDS);
     assertTrue("action wasn't started", await);
     TriggerEvent replayedEvent = events.iterator().next();
     assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null);
     assertTrue(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
-    await = actionCompleted.await(10, TimeUnit.SECONDS);
+    await = actionCompleted.await(10000 / SPEED, TimeUnit.MILLISECONDS);
     assertTrue("action wasn't completed", await);
     assertTrue(triggerFired.get());
   }
 
   @Test
   public void testEventFromRestoredState() throws Exception {
-    CloudSolrClient solrClient = cluster.getSolrClient();
+    SolrClient solrClient = cluster.simGetSolrClient();
     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
         "'name' : 'node_added_trigger'," +
@@ -678,45 +674,33 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
     NamedList<Object> response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
 
-    if (!actionInitCalled.await(10, TimeUnit.SECONDS))  {
+    if (!actionInitCalled.await(10000 / SPEED, TimeUnit.MILLISECONDS))  {
       fail("The TriggerAction should have been created by now");
     }
 
-    NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
-    String overseerLeader = (String) overSeerStatus.get("leader");
-    int overseerLeaderIndex = 0;
-    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
-      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
-      if (jetty.getNodeName().equals(overseerLeader)) {
-        overseerLeaderIndex = i;
-        break;
-      }
-    }
-
     events.clear();
 
-    JettySolrRunner newNode = cluster.startJettySolrRunner();
-    boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+    String newNode = cluster.simAddNode();
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
     assertTrue("The trigger did not fire at all", await);
     assertTrue(triggerFired.get());
     // reset
     triggerFired.set(false);
     triggerFiredLatch = new CountDownLatch(1);
-    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
+    TriggerEvent nodeAddedEvent = events.iterator().next();
     assertNotNull(nodeAddedEvent);
     List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
-    assertTrue(nodeNames.contains(newNode.getNodeName()));
+    assertTrue(nodeNames.contains(newNode));
     // add a second node - state of the trigger will change but it won't fire for waitFor sec.
-    JettySolrRunner newNode2 = cluster.startJettySolrRunner();
-    Thread.sleep(10000);
-    // kill overseer leader
-    cluster.stopJettySolrRunner(overseerLeaderIndex);
-    await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+    String newNode2 = cluster.simAddNode();
+    cluster.getTimeSource().sleep(10000);
+    // kill overseer
+    cluster.simRestartOverseer(null);
+    await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
     assertTrue("The trigger did not fire at all", await);
     assertTrue(triggerFired.get());
   }
 
-
   private static class TestLiveNodesListener implements LiveNodesListener {
     Set<String> lostNodes = new HashSet<>();
     Set<String> addedNodes = new HashSet<>();
@@ -745,7 +729,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
 
   private TestLiveNodesListener registerLiveNodesListener() {
     TestLiveNodesListener listener = new TestLiveNodesListener();
-    zkStateReader.registerLiveNodesListener(listener);
+    cluster.getLiveNodesSet().registerLiveNodesListener(listener);
     return listener;
   }
 
@@ -789,47 +773,42 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
     triggerFiredLatch = new CountDownLatch(2);
     TestLiveNodesListener listener = registerLiveNodesListener();
 
-    NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
-    String overseerLeader = (String) overSeerStatus.get("leader");
-    int overseerLeaderIndex = 0;
-    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
-      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
-      if (jetty.getNodeName().equals(overseerLeader)) {
-        overseerLeaderIndex = i;
-        break;
-      }
-    }
+    SolrClient solrClient = cluster.simGetSolrClient();
+
+    // pick overseer node
+    String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+
     // add a node
-    JettySolrRunner node = cluster.startJettySolrRunner();
-    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+    String node = cluster.simAddNode();
+    if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
       fail("onChange listener didn't execute on cluster change");
     }
     assertEquals(1, listener.addedNodes.size());
-    assertEquals(node.getNodeName(), listener.addedNodes.iterator().next());
+    assertEquals(node, listener.addedNodes.iterator().next());
     // verify that a znode doesn't exist (no trigger)
-    String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
-    assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers", zkClient().exists(pathAdded, true));
+    String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node;
+    assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers",
+        cluster.getDistribStateManager().hasData(pathAdded));
     listener.reset();
     // stop overseer
     log.info("====== KILL OVERSEER 1");
-    cluster.stopJettySolrRunner(overseerLeaderIndex);
-    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+    cluster.simRestartOverseer(overseerLeader);
+    if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
       fail("onChange listener didn't execute on cluster change");
     }
     assertEquals(1, listener.lostNodes.size());
     assertEquals(overseerLeader, listener.lostNodes.iterator().next());
     assertEquals(0, listener.addedNodes.size());
     // wait until the new overseer is up
-    Thread.sleep(5000);
+    cluster.getTimeSource().sleep(5000);
     // verify that a znode does NOT exist - there's no nodeLost trigger,
     // so the new overseer cleaned up existing nodeLost markers
     String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
-    assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
+    assertFalse("Path " + pathLost + " exists", cluster.getDistribStateManager().hasData(pathLost));
 
     listener.reset();
 
     // set up triggers
-    CloudSolrClient solrClient = cluster.getSolrClient();
 
     log.info("====== ADD TRIGGERS");
     String setTriggerCommand = "{" +
@@ -856,45 +835,37 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
     response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
 
-    overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
-    overseerLeader = (String) overSeerStatus.get("leader");
-    overseerLeaderIndex = 0;
-    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
-      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
-      if (jetty.getNodeName().equals(overseerLeader)) {
-        overseerLeaderIndex = i;
-        break;
-      }
-    }
+    overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
 
     // create another node
     log.info("====== ADD NODE 1");
-    JettySolrRunner node1 = cluster.startJettySolrRunner();
-    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+    String node1 = cluster.simAddNode();
+    if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
       fail("onChange listener didn't execute on cluster change");
     }
     assertEquals(1, listener.addedNodes.size());
-    assertEquals(node1.getNodeName(), listener.addedNodes.iterator().next());
+    assertEquals(node1, listener.addedNodes.iterator().next());
     // verify that a znode exists
-    pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1.getNodeName();
-    assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
+    pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1;
+    assertTrue("Path " + pathAdded + " wasn't created", cluster.getDistribStateManager().hasData(pathAdded));
 
-    Thread.sleep(5000);
+    cluster.getTimeSource().sleep(5000);
     // nodeAdded marker should be consumed now by nodeAdded trigger
-    assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));
+    assertFalse("Path " + pathAdded + " should have been deleted",
+        cluster.getDistribStateManager().hasData(pathAdded));
 
     listener.reset();
     events.clear();
     triggerFiredLatch = new CountDownLatch(1);
     // kill overseer again
     log.info("====== KILL OVERSEER 2");
-    cluster.stopJettySolrRunner(overseerLeaderIndex);
-    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+    cluster.simRestartOverseer(overseerLeader);
+    if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
       fail("onChange listener didn't execute on cluster change");
     }
 
 
-    if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
+    if (!triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS)) {
       fail("Trigger should have fired by now");
     }
     assertEquals(1, events.size());
@@ -904,7 +875,6 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
     assertEquals(TriggerEventType.NODELOST, ev.getEventType());
   }
 
-*/
   static Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
   static CountDownLatch listenerCreated = new CountDownLatch(1);
   static boolean failDummyAction = false;