Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/14 11:56:50 UTC

[3/9] lucene-solr:master: SOLR-11285: Simulation framework for autoscaling.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
new file mode 100644
index 0000000..9b4e2bc
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.AutoScaling;
+import org.apache.solr.cloud.autoscaling.NodeAddedTrigger;
+import org.apache.solr.cloud.autoscaling.TriggerAction;
+import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.common.util.TimeSource;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for {@link NodeAddedTrigger}
+ */
+public class TestNodeAddedTrigger extends SimSolrCloudTestCase {
+  private static AtomicBoolean actionConstructorCalled = new AtomicBoolean(false);
+  private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
+  private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
+
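+  // Fails the test if the trigger fires an event during its first run, when it
+  // should only be establishing the baseline set of live nodes.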
+  private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
+    fail("Did not expect the listener to fire on first run!");
+    return true;
+  };
+
+  private static final int SPEED = 50;
+
+  // currentTimeMillis is not very precise, so to avoid false positives when comparing fire times we allow a small delta
+  private static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(2);
+
+  private static TimeSource timeSource;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(1, TimeSource.get("simTime:" + SPEED));
+    timeSource = cluster.getTimeSource();
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+    actionConstructorCalled = new AtomicBoolean(false);
+    actionInitCalled = new AtomicBoolean(false);
+    actionCloseCalled = new AtomicBoolean(false);
+  }
+
+  @Test
+  public void testTrigger() throws Exception {
+    long waitForSeconds = 1 + random().nextInt(5);
+    Map<String, Object> props = createTriggerProps(waitForSeconds);
+
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+      trigger.setProcessor(noFirstRunProcessor);
+      trigger.run();
+
+      String newNode1 = cluster.simAddNode();
+      String newNode2 = cluster.simAddNode();
+      AtomicBoolean fired = new AtomicBoolean(false);
+      AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
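+      // Verify the trigger does not fire before the configured waitFor period has
+      // elapsed, as measured by the simulator's time source.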
+      trigger.setProcessor(event -> {
+        if (fired.compareAndSet(false, true)) {
+          eventRef.set(event);
+          long currentTimeNanos = timeSource.getTime();
+          long eventTimeNanos = event.getEventTime();
+          long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+          if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+            fail("NodeAddedListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" +  eventTimeNanos + ",waitForNanos=" + waitForNanos);
+          }
+        } else {
+          fail("NodeAddedTrigger was fired more than once!");
+        }
+        return true;
+      });
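+      // Poll the trigger until it fires. The sleeps use simulated time, so with
+      // SPEED=50 each simulated second takes only ~20ms of wall-clock time.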
+      int counter = 0;
+      do {
+        trigger.run();
+        timeSource.sleep(1000);
+        if (counter++ > 10) {
+          fail("Newly added node was not discovered by trigger even after 10 seconds");
+        }
+      } while (!fired.get());
+
+      TriggerEvent nodeAddedEvent = eventRef.get();
+      assertNotNull(nodeAddedEvent);
+      List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+      assertTrue(nodeNames.contains(newNode1));
+      assertTrue(nodeNames.contains(newNode2));
+    }
+
+    // add a new node but remove it before the waitFor period expires
+    // and assert that the trigger doesn't fire at all
+    final long waitTime = 2;
+    props.put("waitFor", waitTime);
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+      trigger.setProcessor(noFirstRunProcessor);
+      trigger.run();
+
+      String newNode = cluster.simAddNode();
+      AtomicBoolean fired = new AtomicBoolean(false);
+      trigger.setProcessor(event -> {
+        if (fired.compareAndSet(false, true)) {
+          long currentTimeNanos = timeSource.getTime();
+          long eventTimeNanos = event.getEventTime();
+          long waitForNanos = TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+          if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+            fail("NodeAddedListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" +  eventTimeNanos + ",waitForNanos=" + waitForNanos);
+          }
+        } else {
+          fail("NodeAddedTrigger was fired more than once!");
+        }
+        return true;
+      });
+      trigger.run(); // first run should detect the new node
+      cluster.simRemoveNode(newNode, true);
+      int counter = 0;
+      do {
+        trigger.run();
+        timeSource.sleep(1000);
+        if (counter++ > waitTime + 1) { // run it a little more than the wait time
+          break;
+        }
+      } while (true);
+
+      // ensure the event was not fired
+      assertFalse(fired.get());
+    }
+  }
+
+  @Test
+  public void testActionLifecycle() throws Exception {
+    Map<String, Object> props = createTriggerProps(0);
+    List<Map<String, String>> actions = (List<Map<String, String>>) props.get("actions");
+    Map<String, String> action = new HashMap<>(2);
+    action.put("name", "testActionInit");
+    action.put("class", TestNodeAddedTrigger.AssertInitTriggerAction.class.getName());
+    actions.add(action);
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+      assertTrue(actionConstructorCalled.get());
+      assertFalse(actionInitCalled.get());
+      assertFalse(actionCloseCalled.get());
+      trigger.init();
+      assertTrue(actionInitCalled.get());
+      assertFalse(actionCloseCalled.get());
+    }
+    assertTrue(actionCloseCalled.get());
+  }
+
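+  /**
+   * TriggerAction that records its lifecycle callbacks in the static flags above, so
+   * tests can assert exactly when the constructor, init() and close() are invoked.
+   */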
+  public static class AssertInitTriggerAction implements TriggerAction  {
+    public AssertInitTriggerAction() {
+      actionConstructorCalled.set(true);
+    }
+
+    @Override
+    public String getName() {
+      return "";
+    }
+
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+      actionCloseCalled.compareAndSet(false, true);
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      actionInitCalled.compareAndSet(false, true);
+    }
+  }
+
+  @Test
+  public void testListenerAcceptance() throws Exception {
+    Map<String, Object> props = createTriggerProps(0);
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+      trigger.setProcessor(noFirstRunProcessor);
+      trigger.run(); // starts tracking live nodes
+
+      String newNode = cluster.simAddNode();
+      AtomicInteger callCount = new AtomicInteger(0);
+      AtomicBoolean fired = new AtomicBoolean(false);
+
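+      // Returning false from the processor signals that it is not ready to accept
+      // events yet; the trigger must keep the event and offer it again on the next run.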
+      trigger.setProcessor(event -> {
+        if (callCount.incrementAndGet() < 2) {
+          return false;
+        } else  {
+          fired.compareAndSet(false, true);
+          return true;
+        }
+      });
+
+      trigger.run(); // first run should detect the new node and fire immediately but listener isn't ready
+      assertEquals(1, callCount.get());
+      assertFalse(fired.get());
+      trigger.run(); // second run should again fire
+      assertEquals(2, callCount.get());
+      assertTrue(fired.get());
+      trigger.run(); // should not fire
+      assertEquals(2, callCount.get());
+    }
+  }
+
+  @Test
+  public void testRestoreState() throws Exception {
+    long waitForSeconds = 1 + random().nextInt(5);
+    Map<String, Object> props = createTriggerProps(waitForSeconds);
+
+    // add a new node but update the trigger before the waitFor period expires
+    // and assert that the new trigger still fires
+    NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster);
+    trigger.setProcessor(noFirstRunProcessor);
+    trigger.run();
+
+    String newNode = cluster.simAddNode();
+    trigger.run(); // this run should detect the new node
+    trigger.close(); // close the old trigger
+
+    try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("some_different_name", props, cluster.getLoader(), cluster))  {
+      try {
+        newTrigger.restoreState(trigger);
+        fail("Trigger should only be able to restore state from an old trigger of the same name");
+      } catch (AssertionError e) {
+        // expected
+      }
+    }
+
+    try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster))  {
+      AtomicBoolean fired = new AtomicBoolean(false);
+      AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+      newTrigger.setProcessor(event -> {
+        if (fired.compareAndSet(false, true)) {
+          eventRef.set(event);
+          long currentTimeNanos = timeSource.getTime();
+          long eventTimeNanos = event.getEventTime();
+          long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+          if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+            fail("NodeAddedListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" +  eventTimeNanos + ",waitForNanos=" + waitForNanos);
+          }
+        } else {
+          fail("NodeAddedTrigger was fired more than once!");
+        }
+        return true;
+      });
+      newTrigger.restoreState(trigger); // restore state from the old trigger
+      int counter = 0;
+      do {
+        newTrigger.run();
+        timeSource.sleep(1000);
+        if (counter++ > 10) {
+          fail("Newly added node was not discovered by trigger even after 10 seconds");
+        }
+      } while (!fired.get());
+
+      // ensure the event was fired
+      assertTrue(fired.get());
+      TriggerEvent nodeAddedEvent = eventRef.get();
+      assertNotNull(nodeAddedEvent);
+      //TODO assertEquals("", newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
+    }
+  }
+
+  private Map<String, Object> createTriggerProps(long waitForSeconds) {
+    Map<String, Object> props = new HashMap<>();
+    props.put("event", "nodeLost");
+    props.put("waitFor", waitForSeconds);
+    props.put("enabled", true);
+    List<Map<String, String>> actions = new ArrayList<>(3);
+    Map<String, String> map = new HashMap<>(2);
+    map.put("name", "compute_plan");
+    map.put("class", "solr.ComputePlanAction");
+    actions.add(map);
+    map = new HashMap<>(2);
+    map.put("name", "execute_plan");
+    map.put("class", "solr.ExecutePlanAction");
+    actions.add(map);
+    props.put("actions", actions);
+    return props;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
new file mode 100644
index 0000000..109cee3
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.AutoScaling;
+import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
+import org.apache.solr.cloud.autoscaling.TriggerAction;
+import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.common.util.TimeSource;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for {@link NodeLostTrigger}
+ */
+public class TestNodeLostTrigger extends SimSolrCloudTestCase {
+  private static AtomicBoolean actionConstructorCalled = new AtomicBoolean(false);
+  private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
+  private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
+
+  private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
+    fail("Did not expect the listener to fire on first run!");
+    return true;
+  };
+
+  private static final int SPEED = 50;
+  // use the same time source as the trigger
+  private static TimeSource timeSource;
+  // currentTimeMillis is not very precise, so to avoid false positives when comparing fire times we allow a small delta
+  private static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(5);
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(5, TimeSource.get("simTime:" + SPEED));
+    timeSource = cluster.getTimeSource();
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+    actionConstructorCalled = new AtomicBoolean(false);
+    actionInitCalled = new AtomicBoolean(false);
+    actionCloseCalled = new AtomicBoolean(false);
+  }
+
+  @Test
+  public void testTrigger() throws Exception {
+    long waitForSeconds = 1 + random().nextInt(5);
+    Map<String, Object> props = createTriggerProps(waitForSeconds);
+
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) {
+      trigger.setProcessor(noFirstRunProcessor);
+      trigger.run();
+      Iterator<String> it = cluster.getLiveNodesSet().get().iterator();
+      String lostNodeName1 = it.next();
+      String lostNodeName2 = it.next();
+      cluster.simRemoveNode(lostNodeName1, true);
+      cluster.simRemoveNode(lostNodeName2, true);
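+      // give the simulated cluster state time to reflect the node removals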
+      timeSource.sleep(1000);
+
+      AtomicBoolean fired = new AtomicBoolean(false);
+      AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+      trigger.setProcessor(event -> {
+        if (fired.compareAndSet(false, true)) {
+          eventRef.set(event);
+          long currentTimeNanos = timeSource.getTime();
+          long eventTimeNanos = event.getEventTime();
+          long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+          if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+            fail("NodeLostListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" +  eventTimeNanos + ",waitForNanos=" + waitForNanos);
+          }
+        } else {
+          fail("NodeLostListener was fired more than once!");
+        }
+        return true;
+      });
+      int counter = 0;
+      do {
+        trigger.run();
+        timeSource.sleep(1000);
+        if (counter++ > 10) {
+          fail("Lost node was not discovered by trigger even after 10 seconds");
+        }
+      } while (!fired.get());
+
+      TriggerEvent nodeLostEvent = eventRef.get();
+      assertNotNull(nodeLostEvent);
+      List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+      assertTrue(nodeNames + " doesn't contain " + lostNodeName1, nodeNames.contains(lostNodeName1));
+      assertTrue(nodeNames + " doesn't contain " + lostNodeName2, nodeNames.contains(lostNodeName2));
+
+    }
+
+    // remove a node but add it back before the waitFor period expires
+    // and assert that the trigger doesn't fire at all
+    final long waitTime = 2;
+    props.put("waitFor", waitTime);
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) {
+      trigger.setProcessor(noFirstRunProcessor);
+      trigger.run();
+
+      String lostNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+      cluster.simRemoveNode(lostNode, false);
+      AtomicBoolean fired = new AtomicBoolean(false);
+      trigger.setProcessor(event -> {
+        if (fired.compareAndSet(false, true)) {
+          long currentTimeNanos = timeSource.getTime();
+          long eventTimeNanos = event.getEventTime();
+          long waitForNanos = TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+          if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+            fail("NodeLostListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" +  eventTimeNanos + ",waitForNanos=" + waitForNanos);
+          }
+        } else {
+          fail("NodeLostListener was fired more than once!");
+        }
+        return true;
+      });
+      trigger.run(); // first run should detect the lost node
+      int counter = 0;
+      do {
+        if (cluster.getLiveNodesSet().get().size() == 2) {
+          break;
+        }
+        timeSource.sleep(100);
+        if (counter++ > 20) {
+          fail("Live nodes not updated!");
+        }
+      } while (true);
+      counter = 0;
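+      // bring the node back before waitFor expires - the pending event should be discarded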
+      cluster.getSimClusterStateProvider().simRestoreNode(lostNode);
+      do {
+        trigger.run();
+        timeSource.sleep(1000);
+        if (counter++ > waitTime + 1) { // run it a little more than the wait time
+          break;
+        }
+      } while (true);
+
+      // ensure the event was not fired
+      assertFalse(fired.get());
+    }
+  }
+
+  @Test
+  public void testActionLifecycle() throws Exception {
+    Map<String, Object> props = createTriggerProps(0);
+    List<Map<String, String>> actions = (List<Map<String, String>>) props.get("actions");
+    Map<String, String> action = new HashMap<>(2);
+    action.put("name", "testActionInit");
+    action.put("class", AssertInitTriggerAction.class.getName());
+    actions.add(action);
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+      assertTrue(actionConstructorCalled.get());
+      assertFalse(actionInitCalled.get());
+      assertFalse(actionCloseCalled.get());
+      trigger.init();
+      assertTrue(actionInitCalled.get());
+      assertFalse(actionCloseCalled.get());
+    }
+    assertTrue(actionCloseCalled.get());
+  }
+
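+  /** Records its lifecycle callbacks in the static flags above; asserted in testActionLifecycle. */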
+  public static class AssertInitTriggerAction implements TriggerAction  {
+    public AssertInitTriggerAction() {
+      actionConstructorCalled.set(true);
+    }
+
+    @Override
+    public String getName() {
+      return "";
+    }
+
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+      actionCloseCalled.compareAndSet(false, true);
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      actionInitCalled.compareAndSet(false, true);
+    }
+  }
+
+  @Test
+  public void testListenerAcceptance() throws Exception {
+    Map<String, Object> props = createTriggerProps(0);
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+      trigger.setProcessor(noFirstRunProcessor);
+
+      String newNode = cluster.simAddNode();
+
+      trigger.run(); // starts tracking live nodes
+
+      // stop the newly created node
+      cluster.simRemoveNode(newNode, true);
+
+      AtomicInteger callCount = new AtomicInteger(0);
+      AtomicBoolean fired = new AtomicBoolean(false);
+
+      trigger.setProcessor(event -> {
+        if (callCount.incrementAndGet() < 2) {
+          return false;
+        } else  {
+          fired.compareAndSet(false, true);
+          return true;
+        }
+      });
+
+      trigger.run(); // first run should detect the lost node and fire immediately but listener isn't ready
+      assertEquals(1, callCount.get());
+      assertFalse(fired.get());
+      trigger.run(); // second run should again fire
+      assertEquals(2, callCount.get());
+      assertTrue(fired.get());
+      trigger.run(); // should not fire
+      assertEquals(2, callCount.get());
+    }
+  }
+
+  @Test
+  public void testRestoreState() throws Exception {
+    long waitForSeconds = 1 + random().nextInt(5);
+    Map<String, Object> props = createTriggerProps(waitForSeconds);
+
+    String newNode = cluster.simAddNode();
+
+    // remove a node but update the trigger before the waitFor period expires
+    // and assert that the new trigger still fires
+
+    NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster);
+    trigger.setProcessor(noFirstRunProcessor);
+    trigger.run();
+
+    // stop the newly created node
+    cluster.simRemoveNode(newNode, true);
+
+    trigger.run(); // this run should detect the lost node
+    trigger.close(); // close the old trigger
+
+    try (NodeLostTrigger newTrigger = new NodeLostTrigger("some_different_name", props, cluster.getLoader(), cluster))  {
+      try {
+        newTrigger.restoreState(trigger);
+        fail("Trigger should only be able to restore state from an old trigger of the same name");
+      } catch (AssertionError e) {
+        // expected
+      }
+    }
+
+    try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) {
+      AtomicBoolean fired = new AtomicBoolean(false);
+      AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+      newTrigger.setProcessor(event -> {
+        if (fired.compareAndSet(false, true)) {
+          eventRef.set(event);
+          long currentTimeNanos = timeSource.getTime();
+          long eventTimeNanos = event.getEventTime();
+          long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+          if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+            fail("NodeLostListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
+          }
+        } else {
+          fail("NodeLostListener was fired more than once!");
+        }
+        return true;
+      });
+      newTrigger.restoreState(trigger); // restore state from the old trigger
+      int counter = 0;
+      do {
+        newTrigger.run();
+        timeSource.sleep(1000);
+        if (counter++ > 10) {
+          fail("Lost node was not discovered by trigger even after 10 seconds");
+        }
+      } while (!fired.get());
+
+      TriggerEvent nodeLostEvent = eventRef.get();
+      assertNotNull(nodeLostEvent);
+      List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+      assertTrue(nodeNames.contains(newNode));
+    }
+  }
+
+  private Map<String, Object> createTriggerProps(long waitForSeconds) {
+    Map<String, Object> props = new HashMap<>();
+    props.put("event", "nodeLost");
+    props.put("waitFor", waitForSeconds);
+    props.put("enabled", true);
+    List<Map<String, String>> actions = new ArrayList<>(3);
+    Map<String, String> map = new HashMap<>(2);
+    map.put("name", "compute_plan");
+    map.put("class", "solr.ComputePlanAction");
+    actions.add(map);
+    map = new HashMap<>(2);
+    map.put("name", "execute_plan");
+    map.put("class", "solr.ExecutePlanAction");
+    actions.add(map);
+    props.put("actions", actions);
+    return props;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestPolicyCloud.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestPolicyCloud.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestPolicyCloud.java
new file mode 100644
index 0000000..22736db
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestPolicyCloud.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+
+import org.apache.lucene.util.Constants;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.client.solrj.cloud.autoscaling.Row;
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.rule.ImplicitSnitch;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.KeeperException;
+import org.junit.BeforeClass;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+
+public class TestPolicyCloud extends SimSolrCloudTestCase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  @org.junit.Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(5, TimeSource.get("simTime:50"));
+  }
+
+  public void testDataProviderPerReplicaDetails() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    CollectionAdminRequest.createCollection("perReplicaDataColl", "conf", 1, 5)
+        .process(solrClient);
+
+    waitForState("Timeout waiting for collection to become active", "perReplicaDataColl", clusterShape(1, 5));
+    DocCollection coll = getCollectionState("perReplicaDataColl");
+    String autoScaleJson = "{" +
+        "  'cluster-preferences': [" +
+        "    { maximize : freedisk , precision: 50}," +
+        "    { minimize : cores, precision: 2}" +
+        "  ]," +
+        "  'cluster-policy': [" +
+        "    { replica : '0' , 'nodeRole': 'overseer'}," +
+        "    { 'replica': '<2', 'shard': '#ANY', 'node': '#ANY'" +
+        "    }" +
+        "  ]," +
+        "  'policies': {" +
+        "    'policy1': [" +
+        "      { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
+        "      { 'replica': '<2', 'shard': '#EACH', 'sysprop.rack': 'rack1'}" +
+        "    ]" +
+        "  }" +
+        "}";
+    AutoScalingConfig config = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScaleJson));
+    Policy.Session session = config.getPolicy().createSession(cluster);
+
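+    // Walk the rows sorted by the policy session and count replicas that expose the
+    // core index size variable, proving that per-replica details were collected.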
+    AtomicInteger count = new AtomicInteger(0);
+    for (Row row : session.getSorted()) {
+      row.collectionVsShardVsReplicas.forEach((c, shardVsReplicas) -> shardVsReplicas.forEach((s, replicaInfos) -> {
+        for (ReplicaInfo replicaInfo : replicaInfos) {
+          if (replicaInfo.getVariables().containsKey(Suggestion.ConditionType.CORE_IDX.tagName)) count.incrementAndGet();
+        }
+      }));
+    }
+    assertTrue(count.get() > 0);
+
+    CollectionAdminRequest.deleteCollection("perReplicaDataColl").process(solrClient);
+
+  }
+
+  public void testCreateCollectionAddReplica() throws Exception  {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String nodeId = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+
+    int port = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.PORT);
+
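+    // In policy rules a '!' prefix negates the value: allowing zero replicas on every
+    // port except this one forces all of c1's replicas onto the chosen node.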
+    String commands =  "{set-policy :{c1 : [{replica:0 , shard:'#EACH', port: '!" + port + "'}]}}";
+    solrClient.request(AutoScalingHandlerTest.createAutoScalingRequest(SolrRequest.METHOD.POST, commands));
+
+    String collectionName = "testCreateCollectionAddReplica";
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1)
+        .setPolicy("c1")
+        .process(solrClient);
+    waitForState("Timeout waiting for collection to become active", collectionName, clusterShape(1, 1));
+
+    getCollectionState(collectionName).forEachReplica((s, replica) -> assertEquals(nodeId, replica.getNodeName()));
+
+    CollectionAdminRequest.addReplicaToShard(collectionName, "shard1").process(solrClient);
+    waitForState("Timed out waiting to see 2 replicas for collection: " + collectionName,
+        collectionName, (liveNodes, collectionState) -> collectionState.getReplicas().size() == 2);
+
+    getCollectionState(collectionName).forEachReplica((s, replica) -> assertEquals(nodeId, replica.getNodeName()));
+  }
+
+  public void testCreateCollectionSplitShard() throws Exception  {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String firstNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+    int firstNodePort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(firstNode, ImplicitSnitch.PORT);
+
+    String secondNode;
+    int secondNodePort;
+    while (true)  {
+      secondNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+      secondNodePort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(secondNode, ImplicitSnitch.PORT);
+      if (secondNodePort != firstNodePort)  break;
+    }
+
+    String commands =  "{set-policy :{c1 : [{replica:1 , shard:'#EACH', port: '" + firstNodePort + "'}, {replica:1, shard:'#EACH', port:'" + secondNodePort + "'}]}}";
+    NamedList<Object> response = solrClient.request(AutoScalingHandlerTest.createAutoScalingRequest(SolrRequest.METHOD.POST, commands));
+    assertEquals("success", response.get("result"));
+
+    String collectionName = "testCreateCollectionSplitShard";
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
+        .setPolicy("c1")
+        .process(solrClient);
+    waitForState("Timeout waiting for collection to become active", collectionName, clusterShape(1, 2));
+
+    DocCollection docCollection = getCollectionState(collectionName);
+    List<Replica> list = docCollection.getReplicas(firstNode);
+    int replicasOnNode1 = list != null ? list.size() : 0;
+    list = docCollection.getReplicas(secondNode);
+    int replicasOnNode2 = list != null ? list.size() : 0;
+
+    assertEquals("Expected exactly one replica of collection on node with port: " + firstNodePort, 1, replicasOnNode1);
+    assertEquals("Expected exactly one replica of collection on node with port: " + secondNodePort, 1, replicasOnNode2);
+
+    CollectionAdminRequest.splitShard(collectionName).setShardName("shard1").process(solrClient);
+
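+    // SPLITSHARD creates two sub-shards, each with the parent's replication factor of 2;
+    // the parent's replicas remain registered, so six replicas are expected in total.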
+    waitForState("Timed out waiting to see 6 replicas for collection: " + collectionName,
+        collectionName, (liveNodes, collectionState) -> collectionState.getReplicas().size() == 6);
+
+    docCollection = getCollectionState(collectionName);
+    list = docCollection.getReplicas(firstNode);
+    replicasOnNode1 = list != null ? list.size() : 0;
+    list = docCollection.getReplicas(secondNode);
+    replicasOnNode2 = list != null ? list.size() : 0;
+
+    assertEquals("Expected exactly three replica of collection on node with port: " + firstNodePort, 3, replicasOnNode1);
+    assertEquals("Expected exactly three replica of collection on node with port: " + secondNodePort, 3, replicasOnNode2);
+    CollectionAdminRequest.deleteCollection(collectionName).process(solrClient);
+  }
+
+  public void testMetricsTag() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setClusterPolicyCommand = "{" +
+        " 'set-cluster-policy': [" +
+        "      {'cores':'<10', 'node':'#ANY'}," +
+        "      {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
+        "      {'metrics:abc':'overseer', 'replica':0}" +
+        "    ]" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
+    try {
+      solrClient.request(req);
+      fail("expected exception");
+    } catch (Exception e) {
+      // expected
+      assertTrue(e.toString().contains("Invalid metrics: param in"));
+    }
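+    // A valid metrics tag names a registry and metric key, as in the policy below.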
+    setClusterPolicyCommand = "{" +
+        " 'set-cluster-policy': [" +
+        "      {'cores':'<10', 'node':'#ANY'}," +
+        "      {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
+        "      {'metrics:solr.node:ADMIN./admin/authorization.clientErrors:count':'>58768765', 'replica':0}" +
+        "    ]" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
+    solrClient.request(req);
+
+    //org.eclipse.jetty.server.handler.DefaultHandler.2xx-responses
+    CollectionAdminRequest.createCollection("metricsTest", "conf", 1, 1)
+        .process(solrClient);
+    waitForState("Timeout waiting for collection to become active", "metricsTest", clusterShape(1, 1));
+
+    DocCollection collection = getCollectionState("metricsTest");
+    List<String> tags = Arrays.asList("metrics:solr.node:ADMIN./admin/authorization.clientErrors:count",
+        "metrics:solr.jvm:buffers.direct.Count");
+    Map<String, Object> val = cluster.getNodeStateProvider().getNodeValues(collection.getReplicas().get(0).getNodeName(), tags);
+    for (String tag : tags) {
+      assertNotNull( "missing : "+ tag , val.get(tag));
+    }
+  }
+
+  public void testCreateCollectionAddShardWithReplicaTypeUsingPolicy() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    List<String> nodes = new ArrayList<>(cluster.getClusterStateProvider().getLiveNodes());
+    String nrtNodeName = nodes.get(0);
+    int nrtPort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(nrtNodeName, ImplicitSnitch.PORT);
+
+    String pullNodeName = nodes.get(1);
+    int pullPort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(pullNodeName, ImplicitSnitch.PORT);
+
+    String tlogNodeName = nodes.get(2);
+    int tlogPort = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(tlogNodeName, ImplicitSnitch.PORT);
+    log.info("NRT {}, PULL {}, TLOG {}", nrtNodeName, pullNodeName, tlogNodeName);
+
+    String commands = "{set-cluster-policy :[" +
+        "{replica:0 , shard:'#EACH', type: NRT, port: '!" + nrtPort + "'}" +
+        "{replica:0 , shard:'#EACH', type: PULL, port: '!" + pullPort + "'}" +
+        "{replica:0 , shard:'#EACH', type: TLOG, port: '!" + tlogPort + "'}" +
+        "]}";
+
+    solrClient.request(AutoScalingHandlerTest.createAutoScalingRequest(SolrRequest.METHOD.POST, commands));
+    Map<String, Object> json = Utils.getJson(cluster.getDistribStateManager(), ZkStateReader.SOLR_AUTOSCALING_CONF_PATH);
+    assertEquals("full json:" + Utils.toJSONString(json), "!" + nrtPort,
+        Utils.getObjectByPath(json, true, "cluster-policy[0]/port"));
+    assertEquals("full json:" + Utils.toJSONString(json), "!" + pullPort,
+        Utils.getObjectByPath(json, true, "cluster-policy[1]/port"));
+    assertEquals("full json:" + Utils.toJSONString(json), "!" + tlogPort,
+        Utils.getObjectByPath(json, true, "cluster-policy[2]/port"));
+
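+    // Create a collection with one NRT, one TLOG and one PULL replica; the cluster
+    // policy above should pin each replica type to its designated node.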
+    CollectionAdminRequest.createCollectionWithImplicitRouter("policiesTest", "conf", "s1", 1, 1, 1)
+        .process(solrClient);
+    waitForState("Timeout waiting for collection to become active", "policiesTest", clusterShape(1, 3));
+
+    DocCollection coll = getCollectionState("policiesTest");
+
+    BiConsumer<String, Replica> verifyReplicas = (s, replica) -> {
+      switch (replica.getType()) {
+        case NRT:
+          assertEquals("NRT replica should be on " + nrtNodeName, nrtNodeName, replica.getNodeName());
+          break;
+        case TLOG:
+          assertEquals("TLOG replica should be on " + tlogNodeName, tlogNodeName, replica.getNodeName());
+          break;
+        case PULL:
+          assertEquals("PULL replica should be on " + pullNodeName, pullNodeName, replica.getNodeName());
+          break;
+      }
+    };
+    coll.forEachReplica(verifyReplicas);
+
+    CollectionAdminRequest.createShard("policiesTest", "s3").
+        process(solrClient);
+    coll = getCollectionState("policiesTest");
+    assertEquals(3, coll.getSlice("s3").getReplicas().size());
+    coll.forEachReplica(verifyReplicas);
+  }
+
+  public void testCreateCollectionAddShardUsingPolicy() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String nodeId = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+    int port = (Integer)cluster.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.PORT);
+
+    String commands =  "{set-policy :{c1 : [{replica:1 , shard:'#EACH', port: '" + port + "'}]}}";
+    solrClient.request(AutoScalingHandlerTest.createAutoScalingRequest(SolrRequest.METHOD.POST, commands));
+    Map<String, Object> json = Utils.getJson(cluster.getDistribStateManager(), ZkStateReader.SOLR_AUTOSCALING_CONF_PATH);
+    assertEquals("full json:"+ Utils.toJSONString(json) , "#EACH",
+        Utils.getObjectByPath(json, true, "/policies/c1[0]/shard"));
+    CollectionAdminRequest.createCollectionWithImplicitRouter("policiesTest", "conf", "s1,s2", 1)
+        .setPolicy("c1")
+        .process(solrClient);
+    waitForState("Timeout waiting for collection to become active", "policiesTest", clusterShape(2, 1));
+
+    DocCollection coll = getCollectionState("policiesTest");
+    assertEquals("c1", coll.getPolicyName());
+    assertEquals(2, coll.getReplicas().size());
+    coll.forEachReplica((s, replica) -> assertEquals(nodeId, replica.getNodeName()));
+    CollectionAdminRequest.createShard("policiesTest", "s3").process(solrClient);
+    waitForState("Timeout waiting for collection to become active", "policiesTest", clusterShape(3, 1));
+
+    coll = getCollectionState("policiesTest");
+    assertEquals(1, coll.getSlice("s3").getReplicas().size());
+    coll.getSlice("s3").forEach(replica -> assertEquals(nodeId, replica.getNodeName()));
+  }
+
+  public void testDataProvider() throws IOException, SolrServerException, KeeperException, InterruptedException {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    CollectionAdminRequest.createCollectionWithImplicitRouter("policiesTest", "conf", "shard1", 2)
+        .process(solrClient);
+    waitForState("Timeout waiting for collection to become active", "policiesTest", clusterShape(1, 2));
+    DocCollection rulesCollection = getCollectionState("policiesTest");
+
+    Map<String, Object> val = cluster.getNodeStateProvider().getNodeValues(rulesCollection.getReplicas().get(0).getNodeName(), Arrays.asList(
+        "freedisk",
+        "cores",
+        "heapUsage",
+        "sysLoadAvg"));
+    assertNotNull(val.get("freedisk"));
+    assertNotNull(val.get("heapUsage"));
+    assertNotNull(val.get("sysLoadAvg"));
+    assertTrue(((Number) val.get("cores")).intValue() > 0);
+    assertTrue("freedisk value is " + ((Number) val.get("freedisk")).doubleValue(),  Double.compare(((Number) val.get("freedisk")).doubleValue(), 0.0d) > 0);
+    assertTrue("heapUsage value is " + ((Number) val.get("heapUsage")).doubleValue(), Double.compare(((Number) val.get("heapUsage")).doubleValue(), 0.0d) > 0);
+    if (!Constants.WINDOWS)  {
+      // the system load average metrics is not available on windows platform
+      assertTrue("sysLoadAvg value is " + ((Number) val.get("sysLoadAvg")).doubleValue(), Double.compare(((Number) val.get("sysLoadAvg")).doubleValue(), 0.0d) > 0);
+    }
+    // simulator doesn't have Overseer, so just pick a random node
+    String overseerNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+    solrClient.request(CollectionAdminRequest.addRole(overseerNode, "overseer"));
+    for (int i = 0; i < 10; i++) {
+      Map<String, Object> data = Utils.getJson(cluster.getDistribStateManager(), ZkStateReader.ROLES);
+      if (!data.isEmpty()) {
+        break;
+      }
+      if (i == 9) {
+        throw new RuntimeException("NO overseer node created");
+      }
+      cluster.getTimeSource().sleep(100);
+    }
+    val = cluster.getNodeStateProvider().getNodeValues(overseerNode, Arrays.asList(
+        "nodeRole",
+        "ip_1", "ip_2", "ip_3", "ip_4",
+        "sysprop.java.version",
+        "sysprop.java.vendor"));
+    assertEquals("overseer", val.get("nodeRole"));
+    assertNotNull(val.get("ip_1"));
+    assertNotNull(val.get("ip_2"));
+    assertNotNull(val.get("ip_3"));
+    assertNotNull(val.get("ip_4"));
+    assertNotNull(val.get("sysprop.java.version"));
+    assertNotNull(val.get("sysprop.java.vendor"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimDistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimDistributedQueue.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimDistributedQueue.java
new file mode 100644
index 0000000..1c41795
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSimDistributedQueue.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.nio.charset.Charset;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Tests for the simulated {@link DistributedQueue} implementation, which mimics the
+ * ZooKeeper-backed queue API entirely in memory.
+ */
+public class TestSimDistributedQueue extends SolrTestCaseJ4 {
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+  protected ExecutorService executor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("sdqtest-"));
+
+  @Test
+  public void testDistributedQueue() throws Exception {
+    String dqZNode = "/distqueue/test1";
+    byte[] data = "hello world".getBytes(UTF8);
+
+    DistributedQueue dq = makeDistributedQueue(dqZNode);
+
+    // basic ops
+    assertNull(dq.poll());
+    try {
+      dq.remove();
+      fail("NoSuchElementException expected");
+    } catch (NoSuchElementException expected) {
+      // expected
+    }
+
+    dq.offer(data);
+    assertArrayEquals(dq.peek(500), data);
+    assertArrayEquals(dq.remove(), data);
+    assertNull(dq.poll());
+
+    dq.offer(data);
+    assertArrayEquals(dq.take(), data); // waits for data
+    assertNull(dq.poll());
+
+    dq.offer(data);
+    dq.peek(true); // wait until data is definitely there before calling remove
+    assertArrayEquals(dq.remove(), data);
+    assertNull(dq.poll());
+
+    // should block until the background thread makes the offer
+    (new QueueChangerThread(dq, 1000)).start();
+    assertNotNull(dq.peek(true));
+    assertNotNull(dq.remove());
+    assertNull(dq.poll());
+
+    // timeout scenario ... background thread won't offer until long after the peek times out
+    QueueChangerThread qct = new QueueChangerThread(dq, 1000);
+    qct.start();
+    assertNull(dq.peek(500));
+    qct.join();
+  }
+
+  @Test
+  public void testDistributedQueueBlocking() throws Exception {
+    String dqZNode = "/distqueue/test2";
+    String testData = "hello world";
+
+    DistributedQueue dq = makeDistributedQueue(dqZNode);
+
+    assertNull(dq.peek());
+    Future<String> future = executor.submit(() -> new String(dq.peek(true), UTF8));
+    try {
+      future.get(1000, TimeUnit.MILLISECONDS);
+      fail("TimeoutException expected");
+    } catch (TimeoutException expected) {
+      assertFalse(future.isDone());
+    }
+
+    dq.offer(testData.getBytes(UTF8));
+    assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
+    assertNotNull(dq.poll());
+
+    assertNull(dq.peek(100));
+
+    // Rerun the earlier test to make sure updates are still seen.
+    future = executor.submit(() -> new String(dq.peek(true), UTF8));
+    try {
+      future.get(1000, TimeUnit.MILLISECONDS);
+      fail("TimeoutException expected");
+    } catch (TimeoutException expected) {
+      assertFalse(future.isDone());
+    }
+
+    dq.offer(testData.getBytes(UTF8));
+    assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
+    assertNotNull(dq.poll());
+    assertNull(dq.poll());
+  }
+
+  @Test
+  public void testLocallyOffer() throws Exception {
+    String dqZNode = "/distqueue/test3";
+    DistributedQueue dq = makeDistributedQueue(dqZNode);
+    dq.peekElements(1, 1, s -> true);
+    for (int i = 0; i < 100; i++) {
+      byte[] data = String.valueOf(i).getBytes(UTF8);
+      dq.offer(data);
+      assertNotNull(dq.peek());
+      dq.poll();
+      dq.peekElements(1, 1, s -> true);
+    }
+  }
+
+
+  @Test
+  public void testPeekElements() throws Exception {
+    String dqZNode = "/distqueue/test4";
+    byte[] data = "hello world".getBytes(UTF8);
+
+    DistributedQueue dq = makeDistributedQueue(dqZNode);
+
+    // Populate with data.
+    dq.offer(data);
+    dq.offer(data);
+    dq.offer(data);
+
+    Predicate<String> alwaysTrue = s -> true;
+    Predicate<String> alwaysFalse = s -> false;
+
+    // Should be able to get 0, 1, 2, or 3 instantly
+    for (int i = 0; i <= 3; ++i) {
+      assertEquals(i, dq.peekElements(i, 0, alwaysTrue).size());
+    }
+
+    // Asking for more should return only 3.
+    assertEquals(3, dq.peekElements(4, 0, alwaysTrue).size());
+
+    // If we filter everything out, we should block until the timeout; the assertion below allows slack for slow machines.
+    long start = System.nanoTime();
+    assertEquals(0, dq.peekElements(4, 1000, alwaysFalse).size());
+    assertTrue(System.nanoTime() - start >= TimeUnit.MILLISECONDS.toNanos(500));
+
+    // If someone adds a new matching element while we're waiting, we should return immediately.
+    executor.submit(() -> {
+      try {
+        Thread.sleep(500);
+        dq.offer(data);
+      } catch (Exception e) {
+        // ignore
+      }
+    });
+    start = System.nanoTime();
+    assertEquals(1, dq.peekElements(4, 2000, child -> {
+      // The 4th element in the queue will end with a "3".
+      return child.endsWith("3");
+    }).size());
+    long elapsed = System.nanoTime() - start;
+    assertTrue(elapsed < TimeUnit.MILLISECONDS.toNanos(1000));
+    assertTrue(elapsed >= TimeUnit.MILLISECONDS.toNanos(250));
+  }
+
+
+  protected DistributedQueue makeDistributedQueue(String dqZNode) throws Exception {
+    return new SimDistributedQueueFactory.SimDistributedQueue(dqZNode);
+  }
+
+  private static class QueueChangerThread extends Thread {
+
+    DistributedQueue dq;
+    long waitBeforeOfferMs;
+
+    QueueChangerThread(DistributedQueue dq, long waitBeforeOfferMs) {
+      this.dq = dq;
+      this.waitBeforeOfferMs = waitBeforeOfferMs;
+    }
+
+    public void run() {
+      try {
+        Thread.sleep(waitBeforeOfferMs);
+        dq.offer(getName().getBytes(UTF8));
+      } catch (InterruptedException ie) {
+        // do nothing
+      } catch (Exception exc) {
+        throw new RuntimeException(exc);
+      }
+    }
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    try {
+      super.tearDown();
+    } catch (Exception exc) {
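+      // ignore - failures during cleanup should not fail the test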
+    }
+    executor.shutdown();
+  }
+
+}