Posted to commits@lucene.apache.org by da...@apache.org on 2017/12/27 15:03:59 UTC

[06/54] [abbrv] lucene-solr:jira/solr-11702: SOLR-11285: Simulation framework for autoscaling.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueue.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueue.java
new file mode 100644
index 0000000..5d7aa4d
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueue.java
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.cloud.OverseerTaskQueue;
+import org.apache.solr.cloud.Stats;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.util.Pair;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A distributed queue that uses {@link DistribStateManager} as the underlying distributed store.
+ * Implementation based on {@link org.apache.solr.cloud.ZkDistributedQueue}
+ */
+public class GenericDistributedQueue implements DistributedQueue {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  static final String PREFIX = "qn-";
+
+  /**
+   * Theory of operation:
+   * <p>
+   * Under ordinary circumstances we neither watch nor poll for children in ZK.
+   * Instead we keep an in-memory list of known child names.  When the in-memory
+   * list is exhausted, we then fetch from ZK.
+   * <p>
+   * We only bother setting a child watcher when the queue has no children in ZK.
+   */
+  private static final Object _IMPLEMENTATION_NOTES = null; // dummy member that anchors the notes above
+
+  final String dir;
+
+  final DistribStateManager stateManager;
+
+  final Stats stats;
+
+  /**
+   * A lock that guards all of the mutable state that follows.
+   */
+  private final ReentrantLock updateLock = new ReentrantLock();
+
+  /**
+   * Contains the last set of children fetched from ZK. Elements are removed from the head of
+   * this in-memory set as they are consumed from the queue.  Due to the distributed nature
+   * of the queue, elements may appear in this set whose underlying nodes have been consumed in ZK.
+   * Therefore, methods like {@link #peek()} have to double-check actual node existence, and methods
+   * like {@link #poll()} must resolve any races by attempting to delete the underlying node.
+   */
+  private TreeSet<String> knownChildren = new TreeSet<>();
+
+  /**
+   * Used to wait on ZK changes to the child list; you must hold {@link #updateLock} before waiting on this condition.
+   */
+  private final Condition changed = updateLock.newCondition();
+
+  private boolean isDirty = true;
+
+  private int watcherCount = 0;
+
+  private final int maxQueueSize;
+
+  /**
+   * If {@link #maxQueueSize} is set, the number of items we can queue without rechecking the server.
+   */
+  private final AtomicInteger offerPermits = new AtomicInteger(0);
+
+  public GenericDistributedQueue(DistribStateManager stateManager, String dir) {
+    this(stateManager, dir, new Stats());
+  }
+
+  public GenericDistributedQueue(DistribStateManager stateManager, String dir, Stats stats) {
+    this(stateManager, dir, stats, 0);
+  }
+
+  public GenericDistributedQueue(DistribStateManager stateManager, String dir, Stats stats, int maxQueueSize) {
+    this.dir = dir;
+
+    try {
+      if (!stateManager.hasData(dir)) {
+        try {
+          stateManager.makePath(dir);
+        } catch (AlreadyExistsException e) {
+          // ignore
+        }
+      }
+    } catch (IOException | KeeperException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    }
+
+    this.stateManager = stateManager;
+    this.stats = stats;
+    this.maxQueueSize = maxQueueSize;
+  }
+
+  /**
+   * Returns the data at the first element of the queue, or null if the queue is
+   * empty.
+   *
+   * @return data at the first element of the queue, or null.
+   */
+  @Override
+  public byte[] peek() throws Exception {
+    Timer.Context time = stats.time(dir + "_peek");
+    try {
+      return firstElement();
+    } finally {
+      time.stop();
+    }
+  }
+
+  /**
+   * Returns the data at the first element of the queue, or null if the queue is
+   * empty and block is false.
+   *
+   * @param block if true, blocks until an element enters the queue
+   * @return data at the first element of the queue, or null.
+   */
+  @Override
+  public byte[] peek(boolean block) throws Exception {
+    return block ? peek(Long.MAX_VALUE) : peek();
+  }
+
+  /**
+   * Returns the data at the first element of the queue, or null if the queue is
+   * empty after wait ms.
+   *
+   * @param wait max wait time in ms.
+   * @return data at the first element of the queue, or null.
+   */
+  @Override
+  public byte[] peek(long wait) throws Exception {
+    Preconditions.checkArgument(wait > 0);
+    Timer.Context time;
+    if (wait == Long.MAX_VALUE) {
+      time = stats.time(dir + "_peek_wait_forever");
+    } else {
+      time = stats.time(dir + "_peek_wait" + wait);
+    }
+    updateLock.lockInterruptibly();
+    try {
+      long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
+      while (waitNanos > 0) {
+        byte[] result = firstElement();
+        if (result != null) {
+          return result;
+        }
+        waitNanos = changed.awaitNanos(waitNanos);
+      }
+      return null;
+    } finally {
+      updateLock.unlock();
+      time.stop();
+    }
+  }
+
+  /**
+   * Attempts to remove the head of the queue and return it. Returns null if the
+   * queue is empty.
+   *
+   * @return Head of the queue or null.
+   */
+  @Override
+  public byte[] poll() throws Exception {
+    Timer.Context time = stats.time(dir + "_poll");
+    try {
+      return removeFirst();
+    } finally {
+      time.stop();
+    }
+  }
+
+  /**
+   * Attempts to remove the head of the queue and return it.
+   *
+   * @return The former head of the queue
+   */
+  @Override
+  public byte[] remove() throws Exception {
+    Timer.Context time = stats.time(dir + "_remove");
+    try {
+      byte[] result = removeFirst();
+      if (result == null) {
+        throw new NoSuchElementException();
+      }
+      return result;
+    } finally {
+      time.stop();
+    }
+  }
+
+  public void remove(Collection<String> paths) throws Exception {
+    if (paths.isEmpty()) return;
+    List<Op> ops = new ArrayList<>();
+    for (String path : paths) {
+      ops.add(Op.delete(dir + "/" + path, -1));
+    }
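+    // Delete in batches of up to 1000 ops so that each multi() call stays a manageable size.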
+    for (int from = 0; from < ops.size(); from += 1000) {
+      int to = Math.min(from + 1000, ops.size());
+      if (from < to) {
+        try {
+          stateManager.multi(ops.subList(from, to));
+        } catch (NoSuchElementException e) {
+          // we don't know which nodes exist, so try deleting them one by one
+          for (int j = from; j < to; j++) {
+            try {
+              stateManager.removeData(ops.get(j).getPath(), -1);
+            } catch (NoSuchElementException e2) {
+              LOG.debug("Can not remove node which is not exist : " + ops.get(j).getPath());
+            }
+          }
+        }
+      }
+    }
+
+    int cacheSizeBefore = knownChildren.size();
+    knownChildren.removeAll(paths);
+    if (cacheSizeBefore - paths.size() == knownChildren.size() && knownChildren.size() != 0) {
+      stats.setQueueLength(knownChildren.size());
+    } else {
+      // Some deleted elements were not present in the cache,
+      // so the cache is no longer valid
+      knownChildren.clear();
+      isDirty = true;
+    }
+  }
+
+  /**
+   * Removes the head of the queue and returns it, blocks until it succeeds.
+   *
+   * @return The former head of the queue
+   */
+  @Override
+  public byte[] take() throws Exception {
+    // Same as for element. Should refactor this.
+    Timer.Context timer = stats.time(dir + "_take");
+    updateLock.lockInterruptibly();
+    try {
+      while (true) {
+        byte[] result = removeFirst();
+        if (result != null) {
+          return result;
+        }
+        changed.await();
+      }
+    } finally {
+      updateLock.unlock();
+      timer.stop();
+    }
+  }
+
+  /**
+   * Inserts data into queue.  If there are no other queue consumers, the offered element
+   * will be immediately visible when this method returns.
+   */
+  @Override
+  public void offer(byte[] data) throws Exception {
+    Timer.Context time = stats.time(dir + "_offer");
+    try {
+      while (true) {
+        try {
+          if (maxQueueSize > 0) {
+            if (offerPermits.get() <= 0 || offerPermits.getAndDecrement() <= 0) {
+              // If a max queue size is set, check it before creating a new queue item.
+              if (!stateManager.hasData(dir)) {
+                // jump to the code below, which tries to create dir if it doesn't exist
+                throw new NoSuchElementException();
+              }
+              List<String> children = stateManager.listData(dir);
+              int remainingCapacity = maxQueueSize - children.size();
+              if (remainingCapacity <= 0) {
+                throw new IllegalStateException("queue is full");
+              }
+
+              // Allow this client to push up to 1% of the remaining queue capacity without rechecking.
+              offerPermits.set(remainingCapacity / 100);
+            }
+          }
+
+          // Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
+          // This will get set again when the watcher actually fires, but that's ok.
+          stateManager.createData(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL);
+          isDirty = true;
+          return;
+        } catch (NoSuchElementException e) {
+          try {
+            stateManager.createData(dir, new byte[0], CreateMode.PERSISTENT);
+          } catch (AlreadyExistsException ne) {
+            // someone created it
+          }
+        }
+      }
+    } finally {
+      time.stop();
+    }
+  }
+
+  public Stats getZkStats() {
+    return stats;
+  }
+
+  @Override
+  public Map<String, Object> getStats() {
+    if (stats == null) {
+      return Collections.emptyMap();
+    }
+    Map<String, Object> res = new HashMap<>();
+    res.put("queueLength", stats.getQueueLength());
+    final Map<String, Object> statsMap = new HashMap<>();
+    res.put("stats", statsMap);
+    stats.getStats().forEach((op, stat) -> {
+      final Map<String, Object> statMap = new HashMap<>();
+      statMap.put("success", stat.success.get());
+      statMap.put("errors", stat.errors.get());
+      final List<Map<String, Object>> failed = new ArrayList<>(stat.failureDetails.size());
+      statMap.put("failureDetails", failed);
+      stat.failureDetails.forEach(failedOp -> {
+        Map<String, Object> fo = new HashMap<>();
+        fo.put("req", failedOp.req);
+        fo.put("resp", failedOp.resp);
+        failed.add(fo);
+      });
+      statsMap.put(op, statMap);
+    });
+    return res;
+  }
+
+  /**
+   * Returns the name of the first known child node, or {@code null} if the queue is empty.
+   * This is the only place {@link #knownChildren} is ever updated!
+   * The caller must double check that the actual node still exists, since the in-memory
+   * list is inherently stale.
+   */
+  private String firstChild(boolean remove, boolean refetchIfDirty) throws Exception {
+    updateLock.lockInterruptibly();
+    try {
+      // We always return from the cache first; the cache is cleared when the node no longer exists
+      if (!knownChildren.isEmpty() && !(isDirty && refetchIfDirty)) {
+        return remove ? knownChildren.pollFirst() : knownChildren.first();
+      }
+
+      if (!isDirty && knownChildren.isEmpty()) {
+        return null;
+      }
+
+      // Dirty, try to fetch an updated list of children from ZK.
+      // Only set a new watcher if there isn't already a watcher.
+      ChildWatcher newWatcher = (watcherCount == 0) ? new ChildWatcher() : null;
+      knownChildren = fetchZkChildren(newWatcher);
+      if (newWatcher != null) {
+        watcherCount++; // watcher was successfully set
+      }
+      isDirty = false;
+      if (knownChildren.isEmpty()) {
+        return null;
+      }
+      changed.signalAll();
+      return remove ? knownChildren.pollFirst() : knownChildren.first();
+    } finally {
+      updateLock.unlock();
+    }
+  }
+
+  /**
+   * Return the current set of children from ZK; does not change internal state.
+   */
+  TreeSet<String> fetchZkChildren(Watcher watcher) throws Exception {
+    while (true) {
+      try {
+        TreeSet<String> orderedChildren = new TreeSet<>();
+
+        List<String> childNames = stateManager.listData(dir, watcher);
+        stats.setQueueLength(childNames.size());
+        for (String childName : childNames) {
+          // Check format
+          if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
+            LOG.debug("Found child node with improper name: " + childName);
+            continue;
+          }
+          orderedChildren.add(childName);
+        }
+        return orderedChildren;
+      } catch (NoSuchElementException e) {
+        try {
+          stateManager.makePath(dir);
+        } catch (AlreadyExistsException e2) {
+          // ignore
+        }
+        // go back to the loop and try again
+      }
+    }
+  }
+
+  /**
+   * Return the currently-known set of elements, using child names from memory. If no children are found, or no
+   * children pass {@code acceptFilter}, waits up to {@code waitMillis} for at least one child to become available.
+   * <p>
+   * Carried over from {@link org.apache.solr.cloud.ZkDistributedQueue}, where it supports {@link OverseerTaskQueue}.</p>
+   */
+  @Override
+  public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws Exception {
+    List<String> foundChildren = new ArrayList<>();
+    long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
+    boolean first = true;
+    while (true) {
+      // Trigger a refresh, but only force it if this is not the first iteration.
+      firstChild(false, !first);
+
+      updateLock.lockInterruptibly();
+      try {
+        for (String child : knownChildren) {
+          if (acceptFilter.test(child)) {
+            foundChildren.add(child);
+          }
+        }
+        if (!foundChildren.isEmpty()) {
+          break;
+        }
+        if (waitNanos <= 0) {
+          break;
+        }
+
+        // If this is our first time through, force a refresh before waiting.
+        if (first) {
+          first = false;
+          continue;
+        }
+
+        waitNanos = changed.awaitNanos(waitNanos);
+      } finally {
+        updateLock.unlock();
+      }
+
+      if (!foundChildren.isEmpty()) {
+        break;
+      }
+    }
+
+    // Technically we could restart the method if we fail to actually obtain any valid children
+    // from ZK, but this is a super rare case, and the latency of the ZK fetches would require
+    // much more sophisticated waitNanos tracking.
+    List<Pair<String, byte[]>> result = new ArrayList<>();
+    for (String child : foundChildren) {
+      if (result.size() >= max) {
+        break;
+      }
+      try {
+        VersionedData data = stateManager.getData(dir + "/" + child);
+        result.add(new Pair<>(child, data.getData()));
+      } catch (NoSuchElementException e) {
+        // Another client deleted the node first; remove it from the in-memory set and continue.
+        updateLock.lockInterruptibly();
+        try {
+          knownChildren.remove(child);
+        } finally {
+          updateLock.unlock();
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Return the head of the queue without modifying the queue.
+   *
+   * @return the data at the head of the queue.
+   */
+  private byte[] firstElement() throws Exception {
+    while (true) {
+      String firstChild = firstChild(false, false);
+      if (firstChild == null) {
+        return null;
+      }
+      try {
+        VersionedData data = stateManager.getData(dir + "/" + firstChild);
+        return data != null ? data.getData() : null;
+      } catch (NoSuchElementException e) {
+        // Another client deleted the node first; remove it from the in-memory set and retry.
+        updateLock.lockInterruptibly();
+        try {
+          // Efficient only for single-consumer
+          knownChildren.clear();
+          isDirty = true;
+        } finally {
+          updateLock.unlock();
+        }
+      }
+    }
+  }
+
+  private byte[] removeFirst() throws Exception {
+    while (true) {
+      String firstChild = firstChild(true, false);
+      if (firstChild == null) {
+        return null;
+      }
+      try {
+        String path = dir + "/" + firstChild;
+        VersionedData result = stateManager.getData(path);
+        stateManager.removeData(path, -1);
+        stats.setQueueLength(knownChildren.size());
+        return result.getData();
+      } catch (NoSuchElementException e) {
+        // Another client deleted the node first; remove it from the in-memory set and retry.
+        updateLock.lockInterruptibly();
+        try {
+          // Efficient only for single-consumer
+          knownChildren.clear();
+          isDirty = true;
+        } finally {
+          updateLock.unlock();
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting int watcherCount() throws InterruptedException {
+    updateLock.lockInterruptibly();
+    try {
+      return watcherCount;
+    } finally {
+      updateLock.unlock();
+    }
+  }
+
+  @VisibleForTesting boolean isDirty() throws InterruptedException {
+    updateLock.lockInterruptibly();
+    try {
+      return isDirty;
+    } finally {
+      updateLock.unlock();
+    }
+  }
+
+  @VisibleForTesting class ChildWatcher implements Watcher {
+
+    @Override
+    public void process(WatchedEvent event) {
+      // Session events are not change events and do not remove the watcher, except for Expired.
+      if (Event.EventType.None.equals(event.getType()) && !Event.KeeperState.Expired.equals(event.getState())) {
+        return;
+      }
+      updateLock.lock();
+      try {
+        isDirty = true;
+        watcherCount--;
+        // optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
+        changed.signalAll();
+      } finally {
+        updateLock.unlock();
+      }
+    }
+  }
+}
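
For reference, a minimal usage sketch of the queue above (not part of this patch). It
assumes the SimDistribStateManager added elsewhere in this commit as the backing store;
the queue path and payload are arbitrary examples.

    // Sketch only: same package as the classes above so no extra imports are needed for them.
    package org.apache.solr.cloud.autoscaling.sim;

    import java.nio.charset.StandardCharsets;

    public class GenericDistributedQueueSketch {
      public static void main(String[] args) throws Exception {
        // In-memory state manager from this commit; no real ZooKeeper is involved.
        SimDistribStateManager stateManager =
            new SimDistribStateManager(SimDistribStateManager.createNewRootNode());
        GenericDistributedQueue queue =
            new GenericDistributedQueue(stateManager, "/example/queue");

        queue.offer("event-1".getBytes(StandardCharsets.UTF_8)); // creates /example/queue/qn-<seq>
        byte[] head  = queue.peek();  // non-destructive read of the head; null if empty
        byte[] taken = queue.poll();  // removes and returns the head; null if empty
        System.out.println(new String(taken, StandardCharsets.UTF_8)); // prints "event-1"
      }
    }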

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueueFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueueFactory.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueueFactory.java
new file mode 100644
index 0000000..12b4af8
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueueFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+
+/**
+ * Factory for {@link GenericDistributedQueue}.
+ */
+public class GenericDistributedQueueFactory implements DistributedQueueFactory {
+
+  private final DistribStateManager stateManager;
+
+  public GenericDistributedQueueFactory(DistribStateManager stateManager) {
+    this.stateManager = stateManager;
+  }
+
+  @Override
+  public DistributedQueue makeQueue(String path) throws IOException {
+    return new GenericDistributedQueue(stateManager, path);
+  }
+
+  @Override
+  public void removeQueue(String path) throws IOException {
+    // no-op in this simulated implementation
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
new file mode 100644
index 0000000..45cd66b
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.solr.common.cloud.LiveNodesListener;
+
+/**
+ * This class represents a set of live nodes and allows adding listeners to track their state.
+ */
+public class LiveNodesSet {
+
+  private final Set<String> set = ConcurrentHashMap.newKeySet();
+  private final Set<LiveNodesListener> listeners = ConcurrentHashMap.newKeySet();
+
+  public Set<String> get() {
+    return Collections.unmodifiableSet(set);
+  }
+
+  public void registerLiveNodesListener(LiveNodesListener listener) {
+    listeners.add(listener);
+  }
+
+  public void removeLiveNodesListener(LiveNodesListener listener) {
+    listeners.remove(listener);
+  }
+
+  private void fireListeners(SortedSet<String> oldNodes, SortedSet<String> newNodes) {
+    for (LiveNodesListener listener : listeners) {
+      listener.onChange(oldNodes, newNodes);
+    }
+  }
+
+  public boolean isEmpty() {
+    return set.isEmpty();
+  }
+
+  public boolean contains(String id) {
+    return set.contains(id);
+  }
+
+  public synchronized boolean add(String id) {
+    if (set.contains(id)) {
+      return false;
+    }
+    TreeSet<String> oldNodes = new TreeSet<>(set);
+    set.add(id);
+    TreeSet<String> newNodes = new TreeSet<>(set);
+    fireListeners(oldNodes, newNodes);
+    return true;
+  }
+
+  public synchronized boolean addAll(Collection<String> nodes) {
+    TreeSet<String> oldNodes = new TreeSet<>(set);
+    boolean changed = set.addAll(nodes);
+    TreeSet<String> newNodes = new TreeSet<>(set);
+    if (changed) {
+      fireListeners(oldNodes, newNodes);
+    }
+    return changed;
+  }
+
+  public synchronized boolean remove(String id) {
+    if (!set.contains(id)) {
+      return false;
+    }
+    TreeSet<String> oldNodes = new TreeSet<>(set);
+    set.remove(id);
+    TreeSet<String> newNodes = new TreeSet<>(set);
+    fireListeners(oldNodes, newNodes);
+    return true;
+  }
+
+  public synchronized void clear() {
+    TreeSet<String> oldNodes = new TreeSet<>(set);
+    set.clear();
+    fireListeners(oldNodes, Collections.emptySortedSet());
+  }
+}
\ No newline at end of file
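
A short sketch (not part of the patch) of observing simulated node loss through the
listener mechanism above. It assumes LiveNodesListener exposes the single
onChange(oldNodes, newNodes) callback invoked by fireListeners(); the node id is an
arbitrary example.

    package org.apache.solr.cloud.autoscaling.sim; // sketch only

    import java.util.TreeSet;

    public class LiveNodesSketch {
      public static void main(String[] args) {
        LiveNodesSet liveNodes = new LiveNodesSet();
        // The listener receives sorted snapshots of the set before and after each change.
        liveNodes.registerLiveNodesListener((oldNodes, newNodes) -> {
          TreeSet<String> lost = new TreeSet<>(oldNodes);
          lost.removeAll(newNodes);
          if (!lost.isEmpty()) {
            System.out.println("lost nodes: " + lost);
          }
        });
        liveNodes.add("127.0.0.1:10000_solr");    // no output; nothing was lost
        liveNodes.remove("127.0.0.1:10000_solr"); // prints: lost nodes: [127.0.0.1:10000_solr]
      }
    }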

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
new file mode 100644
index 0000000..bde4b41
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.NodeStateProvider;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.client.solrj.response.SolrResponseBase;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
+import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.rule.ImplicitSnitch;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ContentStreamBase;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.ObjectCache;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.core.CloudConfig;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.REQUESTID;
+
+/**
+ * Simulated {@link SolrCloudManager}.
+ */
+public class SimCloudManager implements SolrCloudManager {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final SimDistribStateManager stateManager;
+  private final SimClusterStateProvider clusterStateProvider;
+  private final SimNodeStateProvider nodeStateProvider;
+  private final AutoScalingHandler autoScalingHandler;
+  private final LiveNodesSet liveNodesSet = new LiveNodesSet();
+  private final DistributedQueueFactory queueFactory;
+  private final ObjectCache objectCache = new ObjectCache();
+  private TimeSource timeSource;
+
+  private final List<SolrInputDocument> systemColl = Collections.synchronizedList(new ArrayList<>());
+  private final ExecutorService simCloudManagerPool;
+  private final Map<String, AtomicLong> opCounts = new ConcurrentHashMap<>();
+
+
+  private Overseer.OverseerThread triggerThread;
+  private ThreadGroup triggerThreadGroup;
+  private SolrResourceLoader loader;
+
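+  // Next port number used when auto-generating node ids in createNodeValues().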
+  private static int nodeIdPort = 10000;
+
+  /**
+   * Create a simulated cluster. This cluster uses the following components:
+   * <ul>
+   *   <li>{@link SimDistribStateManager} with non-shared root node.</li>
+   *   <li>{@link SimClusterStateProvider}</li>
+   *   <li>{@link SimNodeStateProvider}, where node values are automatically initialized when using
+   *   the {@link #simAddNode()} method.</li>
+   *   <li>{@link GenericDistributedQueueFactory} that uses {@link SimDistribStateManager} as its storage.</li>
+   *   <li>an instance of {@link AutoScalingHandler} for managing AutoScalingConfig.</li>
+   *   <li>an instance of {@link OverseerTriggerThread} for managing triggers and processing events.</li>
+   * </ul>
+   * @param timeSource time source to use.
+   */
+  public SimCloudManager(TimeSource timeSource) throws Exception {
+    this.stateManager = new SimDistribStateManager(SimDistribStateManager.createNewRootNode());
+    this.loader = new SolrResourceLoader();
+    // init common paths
+    stateManager.makePath(ZkStateReader.CLUSTER_STATE);
+    stateManager.makePath(ZkStateReader.CLUSTER_PROPS);
+    stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH);
+    stateManager.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
+    stateManager.makePath(ZkStateReader.ROLES);
+    stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+    stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+    stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+    stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
+
+    this.timeSource = timeSource != null ? timeSource : TimeSource.NANO_TIME;
+    this.clusterStateProvider = new SimClusterStateProvider(liveNodesSet, this);
+    this.nodeStateProvider = new SimNodeStateProvider(liveNodesSet, this.stateManager, this.clusterStateProvider, null);
+    this.queueFactory = new GenericDistributedQueueFactory(stateManager);
+    this.simCloudManagerPool = ExecutorUtil.newMDCAwareFixedThreadPool(200, new DefaultSolrThreadFactory("simCloudManagerPool"));
+    this.autoScalingHandler = new AutoScalingHandler(this, loader);
+    triggerThreadGroup = new ThreadGroup("Simulated Overseer autoscaling triggers");
+    OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this,
+        new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build());
+    triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread");
+    triggerThread.start();
+  }
+
+  // ---------- simulator setup methods -----------
+
+  /**
+   * Create a cluster with the specified number of nodes. Node metrics are pre-populated.
+   * @param numNodes number of nodes to create
+   * @param timeSource time source
+   * @return instance of simulated cluster
+   */
+  public static SimCloudManager createCluster(int numNodes, TimeSource timeSource) throws Exception {
+    SimCloudManager cloudManager = new SimCloudManager(timeSource);
+    for (int i = 1; i <= numNodes; i++) {
+      Map<String, Object> values = createNodeValues(null);
+//      if (i == 1) { // designated Overseer ?
+        //values.put(ImplicitSnitch.NODEROLE, "overseer");
+//      }
+      String nodeId = (String)values.get(ImplicitSnitch.NODE);
+      cloudManager.getSimClusterStateProvider().simAddNode(nodeId);
+      cloudManager.getSimNodeStateProvider().simSetNodeValues(nodeId, values);
+    }
+    return cloudManager;
+  }
+
+  /**
+   * Create a cluster initialized from the provided cluster state.
+   * @param initialState existing cluster state
+   * @param timeSource time source
+   * @return instance of simulated cluster with the same layout as the provided cluster state.
+   */
+  public static SimCloudManager createCluster(ClusterState initialState, TimeSource timeSource) throws Exception {
+    SimCloudManager cloudManager = new SimCloudManager(timeSource);
+    cloudManager.getSimClusterStateProvider().simSetClusterState(initialState);
+    for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) {
+      cloudManager.getSimNodeStateProvider().simSetNodeValues(node, createNodeValues(node));
+    }
+    return cloudManager;
+  }
+
+  /**
+   * Create simulated node values (metrics) for a node.
+   * @param nodeName node name (e.g. '127.0.0.1:10000_solr'). If null, a new node name is
+   *                 generated using a sequentially increasing port number.
+   * @return node values
+   */
+  public static Map<String, Object> createNodeValues(String nodeName) {
+    Map<String, Object> values = new HashMap<>();
+    String host, nodeId;
+    int port;
+    if (nodeName == null) {
+      host = "127.0.0.1";
+      port = nodeIdPort++;
+      nodeId = host + ":" + port + "_solr";
+      values.put("ip_1", "127");
+      values.put("ip_2", "0");
+      values.put("ip_3", "0");
+      values.put("ip_4", "1");
+    } else {
+      String[] hostPortCtx = nodeName.split(":");
+      if (hostPortCtx.length != 2) {
+        throw new RuntimeException("Invalid nodeName " + nodeName);
+      }
+      host = hostPortCtx[0];
+      String[] portCtx = hostPortCtx[1].split("_");
+      if (portCtx.length != 2) {
+        throw new RuntimeException("Invalid port_context in nodeName " + nodeName);
+      }
+      port = Integer.parseInt(portCtx[0]);
+      nodeId = host + ":" + port + "_" + portCtx[1];
+      String[] ip = host.split("\\.");
+      if (ip.length == 4) {
+        values.put("ip_1", ip[0]);
+        values.put("ip_2", ip[1]);
+        values.put("ip_3", ip[2]);
+        values.put("ip_4", ip[3]);
+      }
+    }
+    values.put(ImplicitSnitch.HOST, host);
+    values.put(ImplicitSnitch.PORT, port);
+    values.put(ImplicitSnitch.NODE, nodeId);
+    values.put(ImplicitSnitch.CORES, 0);
+    values.put(ImplicitSnitch.DISK, 1000);
+    values.put(ImplicitSnitch.SYSLOADAVG, 1.0);
+    values.put(ImplicitSnitch.HEAPUSAGE, 123450000);
+    values.put("sysprop.java.version", System.getProperty("java.version"));
+    values.put("sysprop.java.vendor", System.getProperty("java.vendor"));
+    // fake some metrics expected in tests
+    values.put("metrics:solr.node:ADMIN./admin/authorization.clientErrors:count", 0);
+    values.put("metrics:solr.jvm:buffers.direct.Count", 0);
+    return values;
+  }
+
+  /**
+   * Get the instance of {@link SolrResourceLoader} that is used by the cluster components.
+   */
+  public SolrResourceLoader getLoader() {
+    return loader;
+  }
+
+  /**
+   * Add a new node and initialize its node values (metrics). The
+   * /live_nodes list is updated with the new node id.
+   * @return new node id
+   */
+  public String simAddNode() throws Exception {
+    Map<String, Object> values = createNodeValues(null);
+    String nodeId = (String)values.get(ImplicitSnitch.NODE);
+    clusterStateProvider.simAddNode(nodeId);
+    nodeStateProvider.simSetNodeValues(nodeId, values);
+    LOG.trace("-- added node " + nodeId);
+    return nodeId;
+  }
+
+  /**
+   * Remove a node from the cluster. This simulates a node lost scenario.
+   * Node id is removed from the /live_nodes list.
+   * @param nodeId node id
+   * @param withValues when true, also remove the simulated node values. If false,
+   *                   node values are retained so that a node coming back up
+   *                   can be simulated later
+   */
+  public void simRemoveNode(String nodeId, boolean withValues) throws Exception {
+    clusterStateProvider.simRemoveNode(nodeId);
+    if (withValues) {
+      nodeStateProvider.simRemoveNodeValues(nodeId);
+    }
+    LOG.trace("-- removed node " + nodeId);
+  }
+
+  /**
+   * Remove a number of randomly selected nodes
+   * @param number number of nodes to remove
+   * @param withValues when true, also remove the simulated node values. If false,
+   *                   node values are retained so that a node coming back up
+   *                   can be simulated later
+   * @param random source of randomness
+   */
+  public void simRemoveRandomNodes(int number, boolean withValues, Random random) throws Exception {
+    List<String> nodes = new ArrayList<>(liveNodesSet.get());
+    Collections.shuffle(nodes, random);
+    int count = Math.min(number, nodes.size());
+    for (int i = 0; i < count; i++) {
+      simRemoveNode(nodes.get(i), withValues);
+    }
+  }
+
+  /**
+   * Clear the (simulated) .system collection.
+   */
+  public void simClearSystemCollection() {
+    systemColl.clear();
+  }
+
+  /**
+   * Get the content of the (simulated) .system collection.
+   * @return documents in the collection, in chronological order starting from the oldest.
+   */
+  public List<SolrInputDocument> simGetSystemCollection() {
+    return systemColl;
+  }
+
+  /**
+   * Get a {@link SolrClient} implementation where calls are forwarded to this
+   * instance of the cluster.
+   * @return simulated SolrClient.
+   */
+  public SolrClient simGetSolrClient() {
+    return new SolrClient() {
+      @Override
+      public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
+        SolrResponse rsp = SimCloudManager.this.request(request);
+        return rsp.getResponse();
+      }
+
+      @Override
+      public void close() throws IOException {
+
+      }
+    };
+  }
+
+  /**
+   * Simulate the effect of restarting the Overseer leader: restart the
+   * OverseerTriggerThread and optionally kill a node.
+   * @param killNodeId optional nodeId to kill. If null, no node is killed and only the thread is restarted
+   */
+  public void simRestartOverseer(String killNodeId) throws Exception {
+    LOG.info("=== Restarting OverseerTriggerThread and clearing object cache...");
+    triggerThread.interrupt();
+    IOUtils.closeQuietly(triggerThread);
+    if (killNodeId != null) {
+      simRemoveNode(killNodeId, true);
+    }
+    objectCache.clear();
+    OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this,
+        new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build());
+    triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread");
+    triggerThread.start();
+  }
+
+  /**
+   * Submit a task to execute in a thread pool.
+   * @param callable task to execute
+   * @return future to obtain results
+   */
+  public <T> Future<T> submit(Callable<T> callable) {
+    return simCloudManagerPool.submit(callable);
+  }
+
+  // ---------- type-safe methods to obtain simulator components ----------
+  public SimClusterStateProvider getSimClusterStateProvider() {
+    return clusterStateProvider;
+  }
+
+  public SimNodeStateProvider getSimNodeStateProvider() {
+    return nodeStateProvider;
+  }
+
+  public SimDistribStateManager getSimDistribStateManager() {
+    return stateManager;
+  }
+
+  public LiveNodesSet getLiveNodesSet() {
+    return liveNodesSet;
+  }
+
+  /**
+   * Get the counts of operations processed by this cluster, keyed by operation type.
+   */
+  public Map<String, AtomicLong> simGetOpCounts() {
+    return opCounts;
+  }
+
+  /**
+   * Get the number of processed operations of a specified type.
+   * @param op operation name, e.g. MOVEREPLICA
+   * @return number of operations
+   */
+  public long simGetOpCount(String op) {
+    AtomicLong count = opCounts.get(op);
+    return count != null ? count.get() : 0L;
+  }
+
+  // --------- interface methods -----------
+
+
+  @Override
+  public ObjectCache getObjectCache() {
+    return objectCache;
+  }
+
+  @Override
+  public TimeSource getTimeSource() {
+    return timeSource;
+  }
+
+  @Override
+  public ClusterStateProvider getClusterStateProvider() {
+    return clusterStateProvider;
+  }
+
+  @Override
+  public NodeStateProvider getNodeStateProvider() {
+    return nodeStateProvider;
+  }
+
+  @Override
+  public DistribStateManager getDistribStateManager() {
+    return stateManager;
+  }
+
+  @Override
+  public DistributedQueueFactory getDistributedQueueFactory() {
+    return queueFactory;
+  }
+
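+  /**
+   * Executes the request on the simulator thread pool and waits for the result.
+   */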
+  @Override
+  public SolrResponse request(SolrRequest req) throws IOException {
+    try {
+      Future<SolrResponse> rsp = submit(() -> simHandleSolrRequest(req));
+      return rsp.get();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void incrementCount(String op) {
+    AtomicLong count = opCounts.computeIfAbsent(op, o -> new AtomicLong());
+    count.incrementAndGet();
+  }
+
+  /**
+   * Handler method for simulated requests. NOTE: only a specific subset of requests is
+   * supported: autoscaling requests, updates to the .system collection, and some
+   * collection admin operations.
+   * @param req the request to handle
+   * @return results
+   */
+  public SolrResponse simHandleSolrRequest(SolrRequest req) throws IOException, InterruptedException {
+    // pay the penalty of a remote request: at least 5 ms
+    timeSource.sleep(5);
+
+    LOG.trace("--- got SolrRequest: " + req.getMethod() + " " + req.getPath() +
+        (req.getParams() != null ? " " + req.getParams().toQueryString() : ""));
+    if (req.getPath() != null && (req.getPath().startsWith("/admin/autoscaling") ||
+        req.getPath().startsWith("/cluster/autoscaling"))) {
+      incrementCount("autoscaling");
+      ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+      params.set(CommonParams.PATH, req.getPath());
+      LocalSolrQueryRequest queryRequest = new LocalSolrQueryRequest(null, params);
+      RequestWriter.ContentWriter cw = req.getContentWriter("application/json");
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      cw.write(baos);
+      String payload = baos.toString("UTF-8");
+      LOG.trace("-- payload: {}", payload);
+      queryRequest.setContentStreams(Collections.singletonList(new ContentStreamBase.StringStream(payload)));
+      queryRequest.getContext().put("httpMethod", req.getMethod().toString());
+      SolrQueryResponse queryResponse = new SolrQueryResponse();
+      autoScalingHandler.handleRequest(queryRequest, queryResponse);
+      if (queryResponse.getException() != null) {
+        throw new IOException(queryResponse.getException());
+      }
+      SolrResponse rsp = new SolrResponseBase();
+      rsp.setResponse(queryResponse.getValues());
+      LOG.trace("-- response: {}", rsp);
+      return rsp;
+    }
+    if (req instanceof UpdateRequest) {
+      incrementCount("update");
+      // support only updates to the system collection
+      UpdateRequest ureq = (UpdateRequest)req;
+      if (ureq.getCollection() == null || !ureq.getCollection().equals(CollectionAdminParams.SYSTEM_COLL)) {
+        throw new UnsupportedOperationException("Only .system updates are supported but got: " + req);
+      }
+      List<SolrInputDocument> docs = ureq.getDocuments();
+      if (docs != null) {
+        systemColl.addAll(docs);
+      }
+      return new UpdateResponse();
+    }
+    // support only a specific subset of collection admin ops
+    if (!(req instanceof CollectionAdminRequest)) {
+      throw new UnsupportedOperationException("Only some CollectionAdminRequest-s are supported: " + req.getClass().getName());
+    }
+    SolrParams params = req.getParams();
+    String a = params.get(CoreAdminParams.ACTION);
+    SolrResponse rsp = new SolrResponseBase();
+    rsp.setResponse(new NamedList<>());
+    if (a != null) {
+      CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(a);
+      if (action == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
+      }
+      LOG.debug("Invoking Collection Action :{} with params {}", action.toLower(), req.getParams().toQueryString());
+      NamedList<Object> results = new NamedList<>();
+      rsp.setResponse(results);
+      incrementCount(action.name());
+      switch (action) {
+        case REQUESTSTATUS:
+          // we complete all async ops immediately
+          String requestId = req.getParams().get(REQUESTID);
+          SimpleOrderedMap<String> status = new SimpleOrderedMap<>();
+          status.add("state", RequestStatusState.COMPLETED.getKey());
+          status.add("msg", "found [" + requestId + "] in completed tasks");
+          results.add("status", status);
+          results.add("success", "");
+          // ExecutePlanAction expects a specific response class
+          rsp = new CollectionAdminRequest.RequestStatusResponse();
+          rsp.setResponse(results);
+          break;
+        case DELETESTATUS:
+          requestId = req.getParams().get(REQUESTID);
+          results.add("status", "successfully removed stored response for [" + requestId + "]");
+          results.add("success", "");
+          break;
+        case CREATE:
+          try {
+            clusterStateProvider.simCreateCollection(new ZkNodeProps(req.getParams().toNamedList().asMap(10)), results);
+          } catch (Exception e) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+          }
+          break;
+        case DELETE:
+          clusterStateProvider.simDeleteCollection(req.getParams().get(CommonParams.NAME),
+              req.getParams().get(CommonAdminParams.ASYNC), results);
+          break;
+        case LIST:
+          results.add("collections", clusterStateProvider.simListCollections());
+          break;
+        case ADDREPLICA:
+          try {
+            clusterStateProvider.simAddReplica(new ZkNodeProps(req.getParams().toNamedList().asMap(10)), results);
+          } catch (Exception e) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+          }
+          break;
+        case MOVEREPLICA:
+          try {
+            clusterStateProvider.simMoveReplica(new ZkNodeProps(req.getParams().toNamedList().asMap(10)), results);
+          } catch (Exception e) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+          }
+          break;
+        case OVERSEERSTATUS:
+          if (req.getParams().get(CommonAdminParams.ASYNC) != null) {
+            results.add(REQUESTID, req.getParams().get(CommonAdminParams.ASYNC));
+          }
+          if (!liveNodesSet.get().isEmpty()) {
+            results.add("leader", liveNodesSet.get().iterator().next());
+          }
+          results.add("overseer_queue_size", 0);
+          results.add("overseer_work_queue_size", 0);
+          results.add("overseer_collection_queue_size", 0);
+          results.add("success", "");
+          break;
+        case ADDROLE:
+          nodeStateProvider.simAddNodeValue(req.getParams().get("node"), "nodeRole", req.getParams().get("role"));
+          break;
+        case CREATESHARD:
+          try {
+            clusterStateProvider.simCreateShard(new ZkNodeProps(req.getParams().toNamedList().asMap(10)), results);
+          } catch (Exception e) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+          }
+          break;
+        case SPLITSHARD:
+          try {
+            clusterStateProvider.simSplitShard(new ZkNodeProps(req.getParams().toNamedList().asMap(10)), results);
+          } catch (Exception e) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+          }
+          break;
+        default:
+          throw new UnsupportedOperationException("Unsupported collection admin action=" + action + " in request: " + req.getParams());
+      }
+    } else {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "action is a required param in request: " + req.getParams());
+    }
+    return rsp;
+  }
+
+  /**
+   * HTTP requests are not supported by this implementation.
+   */
+  @Override
+  public byte[] httpRequest(String url, SolrRequest.METHOD method, Map<String, String> headers, String payload, int timeout, boolean followRedirects) throws IOException {
+    throw new UnsupportedOperationException("general HTTP requests are not supported yet");
+  }
+
+  @Override
+  public void close() throws IOException {
+    IOUtils.closeQuietly(clusterStateProvider);
+    IOUtils.closeQuietly(nodeStateProvider);
+    IOUtils.closeQuietly(stateManager);
+    triggerThread.interrupt();
+    IOUtils.closeQuietly(triggerThread);
+    IOUtils.closeQuietly(objectCache);
+    simCloudManagerPool.shutdownNow();
+  }
+}
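
Finally, a hedged end-to-end sketch (not part of the patch) of driving the simulator.
The collection name, config name, and shard/replica counts are arbitrary;
CollectionAdminRequest.createCollection is the stock SolrJ helper, and which of its
parameters the simulator honors is up to SimClusterStateProvider.

    package org.apache.solr.cloud.autoscaling.sim; // sketch only

    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.request.CollectionAdminRequest;
    import org.apache.solr.common.util.TimeSource;

    public class SimCloudManagerSketch {
      public static void main(String[] args) throws Exception {
        // Five simulated nodes with pre-populated metrics; NANO_TIME matches the constructor default.
        SimCloudManager cloudManager = SimCloudManager.createCluster(5, TimeSource.NANO_TIME);
        try {
          SolrClient client = cloudManager.simGetSolrClient();
          // Routed through simHandleSolrRequest() into SimClusterStateProvider.simCreateCollection().
          CollectionAdminRequest.createCollection("test", "conf1", 2, 2).process(client);
          System.out.println("CREATE ops: " + cloudManager.simGetOpCount("CREATE"));
        } finally {
          cloudManager.close();
        }
      }
    }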