You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/14 11:56:53 UTC
[6/9] lucene-solr:master: SOLR-11285: Simulation framework for
autoscaling.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueue.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueue.java
new file mode 100644
index 0000000..5d7aa4d
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueue.java
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.cloud.OverseerTaskQueue;
+import org.apache.solr.cloud.Stats;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.util.Pair;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A distributed queue that uses {@link DistribStateManager} as the underlying distributed store.
+ * Implementation based on {@link org.apache.solr.cloud.ZkDistributedQueue}
+ */
+public class GenericDistributedQueue implements DistributedQueue {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ static final String PREFIX = "qn-";
+
+ /**
+ * Theory of operation:
+ * <p>
+ * Under ordinary circumstances we neither watch nor poll for children in ZK.
+ * Instead we keep an in-memory list of known child names. When the in-memory
+ * list is exhausted, we then fetch from ZK.
+ * <p>
+ * We only bother setting a child watcher when the queue has no children in ZK.
+ */
+ private static final Object _IMPLEMENTATION_NOTES = null;
+
+ final String dir;
+
+ final DistribStateManager stateManager;
+
+ final Stats stats;
+
+ /**
+ * A lock that guards all of the mutable state that follows.
+ */
+ private final ReentrantLock updateLock = new ReentrantLock();
+
+ /**
+ * Contains the last set of children fetched from ZK. Elements are removed from the head of
+ * this in-memory set as they are consumed from the queue. Due to the distributed nature
+ * of the queue, elements may appear in this set whose underlying nodes have been consumed in ZK.
+ * Therefore, methods like {@link #peek()} have to double-check actual node existence, and methods
+ * like {@link #poll()} must resolve any races by attempting to delete the underlying node.
+ */
+ private TreeSet<String> knownChildren = new TreeSet<>();
+
+ /**
+ * Used to wait on ZK changes to the child list; you must hold {@link #updateLock} before waiting on this condition.
+ */
+ private final Condition changed = updateLock.newCondition();
+
+ private boolean isDirty = true;
+
+ private int watcherCount = 0;
+
+ private final int maxQueueSize;
+
+ /**
+ * If {@link #maxQueueSize} is set, the number of items we can queue without rechecking the server.
+ */
+ private final AtomicInteger offerPermits = new AtomicInteger(0);
+
+ /**
+  * Creates a queue stored under {@code dir}, recording timings in a fresh {@link Stats}
+  * instance and using a {@code maxQueueSize} of 0 (the size check in {@link #offer(byte[])}
+  * is only performed when the limit is positive).
+  *
+  * @param stateManager the distributed store holding the queue nodes
+  * @param dir path of the node whose children are the queue elements
+  */
+ public GenericDistributedQueue(DistribStateManager stateManager, String dir) {
+   this(stateManager, dir, new Stats(), 0);
+ }
+
+ /**
+  * Creates a queue stored under {@code dir} that reports into the supplied {@code stats}
+  * and uses a {@code maxQueueSize} of 0.
+  *
+  * @param stateManager the distributed store holding the queue nodes
+  * @param dir path of the node whose children are the queue elements
+  * @param stats collector used for per-operation timers and the queue length
+  */
+ public GenericDistributedQueue(DistribStateManager stateManager, String dir, Stats stats) {
+   this(stateManager, dir, stats, 0);
+ }
+
+ /**
+  * Creates a queue stored under {@code dir}, creating that path in the store when it
+  * has no data yet.
+  *
+  * @param stateManager the distributed store holding the queue nodes
+  * @param dir path of the node whose children are the queue elements
+  * @param stats collector used for per-operation timers and the queue length
+  * @param maxQueueSize upper bound checked by {@link #offer(byte[])}; values &lt;= 0 disable the check
+  * @throws SolrException (SERVER_ERROR) if the store cannot be queried or the path cannot be
+  *         created, or if the calling thread is interrupted (the interrupt flag is restored)
+  */
+ public GenericDistributedQueue(DistribStateManager stateManager, String dir, Stats stats, int maxQueueSize) {
+   this.dir = dir;
+   this.stateManager = stateManager;
+   this.stats = stats;
+   this.maxQueueSize = maxQueueSize;
+
+   try {
+     final boolean dirExists = stateManager.hasData(dir);
+     if (!dirExists) {
+       try {
+         stateManager.makePath(dir);
+       } catch (AlreadyExistsException e) {
+         // somebody else created the path in the meantime; nothing to do
+       }
+     }
+   } catch (InterruptedException e) {
+     Thread.currentThread().interrupt();
+     throw new SolrException(ErrorCode.SERVER_ERROR, e);
+   } catch (IOException | KeeperException e) {
+     throw new SolrException(ErrorCode.SERVER_ERROR, e);
+   }
+ }
+
+ /**
+  * Looks at the head of the queue without removing it. The call is timed under
+  * {@code dir + "_peek"}.
+  *
+  * @return the data stored in the first element, or {@code null} when the queue is empty.
+  */
+ @Override
+ public byte[] peek() throws Exception {
+   final Timer.Context timer = stats.time(dir + "_peek");
+   final byte[] head;
+   try {
+     head = firstElement();
+   } finally {
+     timer.stop();
+   }
+   return head;
+ }
+
+ /**
+  * Looks at the head of the queue, optionally waiting for an element to show up.
+  *
+  * @param block when {@code true}, delegates to {@link #peek(long)} with {@link Long#MAX_VALUE};
+  *              when {@code false}, behaves like {@link #peek()}
+  * @return the data stored in the first element, or {@code null}.
+  */
+ @Override
+ public byte[] peek(boolean block) throws Exception {
+   if (!block) {
+     return peek();
+   }
+   return peek(Long.MAX_VALUE);
+ }
+
+ /**
+  * Looks at the head of the queue, waiting up to {@code wait} milliseconds for an element
+  * to become available.
+  *
+  * @param wait maximum wait time in ms; must be greater than 0. {@link Long#MAX_VALUE} is
+  *             timed under a separate "wait forever" timer name.
+  * @return the data stored in the first element, or {@code null} if the wait elapsed first.
+  */
+ @Override
+ public byte[] peek(long wait) throws Exception {
+   Preconditions.checkArgument(wait > 0);
+   final String timerName = (wait == Long.MAX_VALUE)
+       ? dir + "_peek_wait_forever"
+       : dir + "_peek_wait" + wait;
+   final Timer.Context time = stats.time(timerName);
+   updateLock.lockInterruptibly();
+   try {
+     long remainingNanos = TimeUnit.MILLISECONDS.toNanos(wait);
+     while (remainingNanos > 0) {
+       final byte[] head = firstElement();
+       if (head != null) {
+         return head;
+       }
+       // awaitNanos returns an estimate of the time still left to wait
+       remainingNanos = changed.awaitNanos(remainingNanos);
+     }
+     return null;
+   } finally {
+     updateLock.unlock();
+     time.stop();
+   }
+ }
+
+ /**
+  * Removes and returns the head of the queue without waiting. The call is timed under
+  * {@code dir + "_poll"}.
+  *
+  * @return the data of the removed head, or {@code null} when the queue is empty.
+  */
+ @Override
+ public byte[] poll() throws Exception {
+   final Timer.Context timer = stats.time(dir + "_poll");
+   final byte[] head;
+   try {
+     head = removeFirst();
+   } finally {
+     timer.stop();
+   }
+   return head;
+ }
+
+ /**
+  * Removes and returns the head of the queue, failing when there is nothing to remove.
+  *
+  * @return the data of the former head of the queue
+  * @throws NoSuchElementException if the queue is empty
+  */
+ @Override
+ public byte[] remove() throws Exception {
+   final Timer.Context timer = stats.time(dir + "_remove");
+   try {
+     final byte[] head = removeFirst();
+     if (head != null) {
+       return head;
+     }
+     throw new NoSuchElementException();
+   } finally {
+     timer.stop();
+   }
+ }
+
+ /**
+  * Deletes the given child nodes of {@code dir} from the store and updates the in-memory
+  * cache of known children accordingly.
+  * <p>
+  * Deletions are submitted as multi-operations of at most 1000 entries. If a batch fails with
+  * {@link NoSuchElementException}, its entries are deleted one by one and missing nodes are
+  * only logged at debug level.
+  * <p>
+  * Afterwards the names are dropped from {@link #knownChildren}. If the cache did not shrink by
+  * exactly {@code paths.size()}, or became empty, it is cleared and marked dirty so the next
+  * read refetches the child list.
+  *
+  * @param paths child node names (relative to {@code dir}) to delete; an empty collection is a no-op
+  */
+ public void remove(Collection<String> paths) throws Exception {
+   if (paths.isEmpty()) return;
+   List<Op> ops = new ArrayList<>(paths.size());
+   for (String path : paths) {
+     ops.add(Op.delete(dir + "/" + path, -1));
+   }
+   for (int from = 0; from < ops.size(); from += 1000) {
+     int to = Math.min(from + 1000, ops.size());
+     try {
+       stateManager.multi(ops.subList(from, to));
+     } catch (NoSuchElementException e) {
+       // we don't know which nodes are missing, so delete the batch node by node
+       for (int j = from; j < to; j++) {
+         try {
+           stateManager.removeData(ops.get(j).getPath(), -1);
+         } catch (NoSuchElementException e2) {
+           LOG.debug("Can not remove node which is not exist : " + ops.get(j).getPath());
+         }
+       }
+     }
+   }
+
+   // knownChildren and isDirty are guarded by updateLock; hold it while touching the cache.
+   updateLock.lockInterruptibly();
+   try {
+     int cacheSizeBefore = knownChildren.size();
+     knownChildren.removeAll(paths);
+     if (cacheSizeBefore - paths.size() == knownChildren.size() && knownChildren.size() != 0) {
+       stats.setQueueLength(knownChildren.size());
+     } else {
+       // Some deleted elements were not present in the cache (or the cache is now empty),
+       // so the cache can no longer be trusted.
+       knownChildren.clear();
+       isDirty = true;
+     }
+   } finally {
+     updateLock.unlock();
+   }
+ }
+
+ /**
+  * Removes and returns the head of the queue, waiting on the {@code changed} condition
+  * for as long as the queue is empty. The call is timed under {@code dir + "_take"}.
+  *
+  * @return the data of the former head of the queue
+  */
+ @Override
+ public byte[] take() throws Exception {
+   final Timer.Context timer = stats.time(dir + "_take");
+   updateLock.lockInterruptibly();
+   try {
+     byte[] head = removeFirst();
+     while (head == null) {
+       // nothing available yet; sleep until somebody signals a change, then retry
+       changed.await();
+       head = removeFirst();
+     }
+     return head;
+   } finally {
+     updateLock.unlock();
+     timer.stop();
+   }
+ }
+
+ /**
+ * Inserts data into queue. If there are no other queue consumers, the offered element
+ * will be immediately visible when this method returns.
+ * <p>
+ * When {@link #maxQueueSize} is positive and this client has no offer permits left, the
+ * current number of children is listed first; the offer is rejected if the queue is full,
+ * otherwise a new batch of permits is granted. The call is timed under {@code dir + "_offer"}.
+ *
+ * @param data payload stored in the newly created sequential child node
+ * @throws IllegalStateException if a max queue size is set and no capacity remains
+ */
+ @Override
+ public void offer(byte[] data) throws Exception {
+ Timer.Context time = stats.time(dir + "_offer");
+ try {
+ // Retry loop: a NoSuchElementException (missing queue dir) is handled by creating the
+ // dir below and then looping to try the offer again.
+ while (true) {
+ try {
+ if (maxQueueSize > 0) {
+ // Short-circuit: only decrement when the counter currently looks positive.
+ if (offerPermits.get() <= 0 || offerPermits.getAndDecrement() <= 0) {
+ // If a max queue size is set, check it before creating a new queue item.
+ if (!stateManager.hasData(dir)) {
+ // jump to the code below, which tries to create dir if it doesn't exist
+ throw new NoSuchElementException();
+ }
+ List<String> children = stateManager.listData(dir);
+ int remainingCapacity = maxQueueSize - children.size();
+ if (remainingCapacity <= 0) {
+ throw new IllegalStateException("queue is full");
+ }
+
+ // Allow this client to push up to 1% of the remaining queue capacity without rechecking.
+ offerPermits.set(remainingCapacity / 100);
+ }
+ }
+
+ // Create the new element as a sequential child named with PREFIX.
+ stateManager.createData(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL);
+ // Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
+ // This will get set again when the watcher actually fires, but that's ok.
+ // NOTE(review): isDirty is written here without holding updateLock, unlike other writers — confirm this is intended.
+ isDirty = true;
+ return;
+ } catch (NoSuchElementException e) {
+ // The queue dir is missing: create it, then go around the loop again.
+ try {
+ stateManager.createData(dir, new byte[0], CreateMode.PERSISTENT);
+ } catch (NoSuchElementException ne) {
+ // someone created it
+ // NOTE(review): the constructor and fetchZkChildren treat AlreadyExistsException as the
+ // "already created" signal; verify which exception createData throws for an existing node.
+ }
+ }
+ }
+ } finally {
+ time.stop();
+ }
+ }
+
+ /**
+  * Returns the {@link Stats} instance this queue reports its timers and queue length to.
+  */
+ public Stats getZkStats() {
+   return stats;
+ }
+
+ /**
+  * Builds a plain-map snapshot of the collected statistics.
+  * <p>
+  * The result contains {@code "queueLength"} and a {@code "stats"} map keyed by operation
+  * name. Each operation entry holds {@code "success"}, {@code "errors"} and
+  * {@code "failureDetails"}, the latter being a list with one {@code {"req", "resp"}} map
+  * per recorded failure.
+  *
+  * @return the snapshot, or an empty map when no {@link Stats} instance is set
+  */
+ @Override
+ public Map<String, Object> getStats() {
+   if (stats == null) {
+     return Collections.emptyMap();
+   }
+   Map<String, Object> res = new HashMap<>();
+   res.put("queueLength", stats.getQueueLength());
+   final Map<String, Object> statsMap = new HashMap<>();
+   res.put("stats", statsMap);
+   stats.getStats().forEach((op, stat) -> {
+     final Map<String, Object> statMap = new HashMap<>();
+     statMap.put("success", stat.success.get());
+     statMap.put("errors", stat.errors.get());
+     final List<Map<String, Object>> failed = new ArrayList<>(stat.failureDetails.size());
+     statMap.put("failureDetails", failed);
+     stat.failureDetails.forEach(failedOp -> {
+       Map<String, Object> fo = new HashMap<>();
+       fo.put("req", failedOp.req);
+       fo.put("resp", failedOp.resp);
+       // without this the per-failure map is discarded and "failureDetails" stays empty
+       failed.add(fo);
+     });
+     statsMap.put(op, statMap);
+   });
+   return res;
+ }
+
+ /**
+ * Returns the name of the first known child node, or {@code null} if the queue is empty.
+ * This is where {@link #knownChildren} is refreshed from the store; other methods only
+ * remove entries from it or clear it.
+ * The caller must double check that the actual node still exists, since the in-memory
+ * list is inherently stale.
+ *
+ * @param remove if true, the returned name is also removed from the in-memory set
+ * @param refetchIfDirty if true, a dirty cache is refetched even when it is not empty
+ * @return the first child name, or {@code null} when no children are known after any refetch
+ */
+ private String firstChild(boolean remove, boolean refetchIfDirty) throws Exception {
+ updateLock.lockInterruptibly();
+ try {
+ // We always return from cache first, the cache will be cleared if the node is not exist
+ if (!knownChildren.isEmpty() && !(isDirty && refetchIfDirty)) {
+ return remove ? knownChildren.pollFirst() : knownChildren.first();
+ }
+
+ // Cache is empty and nothing has changed since the last fetch: the queue is empty.
+ if (!isDirty && knownChildren.isEmpty()) {
+ return null;
+ }
+
+ // Dirty, try to fetch an updated list of children from ZK.
+ // Only set a new watcher if there isn't already a watcher.
+ ChildWatcher newWatcher = (watcherCount == 0) ? new ChildWatcher() : null;
+ knownChildren = fetchZkChildren(newWatcher);
+ if (newWatcher != null) {
+ watcherCount++; // watcher was successfully set
+ }
+ isDirty = false;
+ if (knownChildren.isEmpty()) {
+ return null;
+ }
+ // Children are available now: wake up any threads waiting on the condition.
+ changed.signalAll();
+ return remove ? knownChildren.pollFirst() : knownChildren.first();
+ } finally {
+ updateLock.unlock();
+ }
+ }
+
+ /**
+  * Lists the children of {@code dir} from the store and returns the properly named ones
+  * as a sorted set. The in-memory child cache is not touched, but the queue length in
+  * {@code stats} is updated, and a missing {@code dir} is created before retrying.
+  *
+  * @param watcher watcher passed to the store along with the list request; may be {@code null}
+  * @return child names starting with {@link #PREFIX}, in sorted order
+  */
+ TreeSet<String> fetchZkChildren(Watcher watcher) throws Exception {
+   for (;;) {
+     try {
+       final TreeSet<String> sorted = new TreeSet<>();
+
+       final List<String> names = stateManager.listData(dir, watcher);
+       stats.setQueueLength(names.size());
+       for (String name : names) {
+         if (name.startsWith(PREFIX)) {
+           sorted.add(name);
+         } else {
+           // not a queue element; skip it
+           LOG.debug("Found child node with improper name: " + name);
+         }
+       }
+       return sorted;
+     } catch (NoSuchElementException e) {
+       try {
+         stateManager.makePath(dir);
+       } catch (AlreadyExistsException e2) {
+         // ignore
+       }
+       // fall through to the next loop iteration and list again
+     }
+   }
+ }
+
+ /**
+ * Return the currently-known set of elements, using child names from memory. If no children are found, or no
+ * children pass {@code acceptFilter}, waits up to {@code waitMillis} for at least one child to become available.
+ * <p>
+ * Exposed to support {@link OverseerTaskQueue} specifically.</p>
+ *
+ * @param max maximum number of elements to return
+ * @param waitMillis maximum time in ms to wait for a matching child; values &lt;= 0 do not wait
+ * @param acceptFilter predicate applied to child names; only accepted children are returned
+ * @return pairs of (child name, data) for the accepted children whose nodes could still be read
+ */
+ @Override
+ public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws Exception {
+ List<String> foundChildren = new ArrayList<>();
+ long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
+ boolean first = true;
+ while (true) {
+ // Trigger a refresh, but only force it if this is not the first iteration.
+ firstChild(false, !first);
+
+ updateLock.lockInterruptibly();
+ try {
+ // Collect every cached child name the filter accepts.
+ for (String child : knownChildren) {
+ if (acceptFilter.test(child)) {
+ foundChildren.add(child);
+ }
+ }
+ if (!foundChildren.isEmpty()) {
+ break;
+ }
+ // Nothing matched and no wait time left: give up with an empty result.
+ if (waitNanos <= 0) {
+ break;
+ }
+
+ // If this is our first time through, force a refresh before waiting.
+ if (first) {
+ first = false;
+ continue;
+ }
+
+ // Sleep until signalled or timed out; awaitNanos returns the remaining wait time.
+ waitNanos = changed.awaitNanos(waitNanos);
+ } finally {
+ updateLock.unlock();
+ }
+
+ if (!foundChildren.isEmpty()) {
+ break;
+ }
+ }
+
+ // Technically we could restart the method if we fail to actually obtain any valid children
+ // from ZK, but this is a super rare case, and the latency of the ZK fetches would require
+ // much more sophisticated waitNanos tracking.
+ List<Pair<String, byte[]>> result = new ArrayList<>();
+ for (String child : foundChildren) {
+ // Stop once the caller's limit is reached.
+ if (result.size() >= max) {
+ break;
+ }
+ try {
+ VersionedData data = stateManager.getData(dir + "/" + child);
+ result.add(new Pair<>(child, data.getData()));
+ } catch (NoSuchElementException e) {
+ // Another client deleted the node first, remove the in-memory and continue.
+ updateLock.lockInterruptibly();
+ try {
+ knownChildren.remove(child);
+ } finally {
+ updateLock.unlock();
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+  * Reads the data of the head of the queue without modifying the queue in the store.
+  * If the cached head no longer exists in the store, the in-memory cache is cleared,
+  * marked dirty, and the lookup is retried.
+  *
+  * @return the data at the head of the queue, or {@code null} if there is no head
+  *         (or the store returned no data object for it).
+  */
+ private byte[] firstElement() throws Exception {
+   for (;;) {
+     final String head = firstChild(false, false);
+     if (head == null) {
+       return null;
+     }
+     try {
+       final VersionedData data = stateManager.getData(dir + "/" + head);
+       if (data == null) {
+         return null;
+       }
+       return data.getData();
+     } catch (NoSuchElementException e) {
+       // Another client deleted the node first: drop the whole cache and retry.
+       updateLock.lockInterruptibly();
+       try {
+         // Efficient only for single-consumer
+         knownChildren.clear();
+         isDirty = true;
+       } finally {
+         updateLock.unlock();
+       }
+     }
+   }
+ }
+
+ /**
+  * Takes the first cached child name, reads its data and deletes the node from the store.
+  * If the node is already gone, the in-memory cache is cleared, marked dirty, and the
+  * operation is retried with a fresh child list.
+  *
+  * @return the data of the removed node, or {@code null} if the queue is empty
+  */
+ private byte[] removeFirst() throws Exception {
+   for (;;) {
+     final String head = firstChild(true, false);
+     if (head == null) {
+       return null;
+     }
+     try {
+       final String headPath = dir + "/" + head;
+       final VersionedData removed = stateManager.getData(headPath);
+       stateManager.removeData(headPath, -1);
+       stats.setQueueLength(knownChildren.size());
+       return removed.getData();
+     } catch (NoSuchElementException e) {
+       // Another client deleted the node first: drop the whole cache and retry.
+       updateLock.lockInterruptibly();
+       try {
+         // Efficient only for single-consumer
+         knownChildren.clear();
+         isDirty = true;
+       } finally {
+         updateLock.unlock();
+       }
+     }
+   }
+ }
+
+ /** Returns the current watcher count, read while holding {@code updateLock}. */
+ @VisibleForTesting int watcherCount() throws InterruptedException {
+   final int count;
+   updateLock.lockInterruptibly();
+   try {
+     count = watcherCount;
+   } finally {
+     updateLock.unlock();
+   }
+   return count;
+ }
+
+ /** Returns the dirty flag of the in-memory child cache, read while holding {@code updateLock}. */
+ @VisibleForTesting boolean isDirty() throws InterruptedException {
+   final boolean dirty;
+   updateLock.lockInterruptibly();
+   try {
+     dirty = isDirty;
+   } finally {
+     updateLock.unlock();
+   }
+   return dirty;
+ }
+
+ /**
+  * Watcher registered when the child list is fetched. On a relevant event it marks the
+  * cache dirty, decrements the watcher count and wakes up all threads waiting on
+  * {@code changed}.
+  */
+ @VisibleForTesting class ChildWatcher implements Watcher {
+
+   @Override
+   public void process(WatchedEvent event) {
+     // session events are not change events, and do not remove the watcher; except for Expired
+     final boolean sessionEvent = Event.EventType.None.equals(event.getType());
+     if (sessionEvent && !Event.KeeperState.Expired.equals(event.getState())) {
+       return;
+     }
+     updateLock.lock();
+     try {
+       watcherCount--;
+       isDirty = true;
+       // optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
+       changed.signalAll();
+     } finally {
+       updateLock.unlock();
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueueFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueueFactory.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueueFactory.java
new file mode 100644
index 0000000..12b4af8
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueueFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+
+/**
+ * {@link DistributedQueueFactory} that hands out {@link GenericDistributedQueue} instances
+ * backed by a single {@link DistribStateManager}.
+ */
+public class GenericDistributedQueueFactory implements DistributedQueueFactory {
+
+ private final DistribStateManager stateManager;
+
+ /**
+  * @param stateManager store passed to every queue created by this factory
+  */
+ public GenericDistributedQueueFactory(DistribStateManager stateManager) {
+   this.stateManager = stateManager;
+ }
+
+ /** Creates a new queue object for {@code path}; every call returns a fresh instance. */
+ @Override
+ public DistributedQueue makeQueue(String path) throws IOException {
+   final DistributedQueue queue = new GenericDistributedQueue(stateManager, path);
+   return queue;
+ }
+
+ /** Does nothing in this implementation. */
+ @Override
+ public void removeQueue(String path) throws IOException {
+   // intentionally empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
new file mode 100644
index 0000000..45cd66b
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.solr.common.cloud.LiveNodesListener;
+
+/**
+ * Holds the ids of the live nodes and notifies registered {@link LiveNodesListener}s
+ * with sorted before/after snapshots whenever a mutator changes the set.
+ */
+public class LiveNodesSet {
+
+ private final Set<String> set = ConcurrentHashMap.newKeySet();
+ private final Set<LiveNodesListener> listeners = ConcurrentHashMap.newKeySet();
+
+ /** Returns a read-only view of the live node ids. */
+ public Set<String> get() {
+   return Collections.unmodifiableSet(set);
+ }
+
+ /** Adds a listener to be called on subsequent changes. */
+ public void registerLiveNodesListener(LiveNodesListener listener) {
+   listeners.add(listener);
+ }
+
+ /** Removes a previously registered listener. */
+ public void removeLiveNodesListener(LiveNodesListener listener) {
+   listeners.remove(listener);
+ }
+
+ /** Passes the before/after snapshots to every registered listener. */
+ private void fireListeners(SortedSet<String> oldNodes, SortedSet<String> newNodes) {
+   listeners.forEach(listener -> listener.onChange(oldNodes, newNodes));
+ }
+
+ /** Returns true if there are no live nodes. */
+ public boolean isEmpty() {
+   return set.isEmpty();
+ }
+
+ /** Returns true if {@code id} is currently a live node. */
+ public boolean contains(String id) {
+   return set.contains(id);
+ }
+
+ /**
+  * Adds a node and notifies listeners.
+  *
+  * @return false (without notifying) if the node was already present, true otherwise
+  */
+ public synchronized boolean add(String id) {
+   if (set.contains(id)) {
+     return false;
+   }
+   final TreeSet<String> before = new TreeSet<>(set);
+   set.add(id);
+   final TreeSet<String> after = new TreeSet<>(set);
+   fireListeners(before, after);
+   return true;
+ }
+
+ /**
+  * Adds all given nodes; listeners are notified only if the set actually changed.
+  *
+  * @return true if at least one node was newly added
+  */
+ public synchronized boolean addAll(Collection<String> nodes) {
+   final TreeSet<String> before = new TreeSet<>(set);
+   final boolean modified = set.addAll(nodes);
+   if (modified) {
+     fireListeners(before, new TreeSet<>(set));
+   }
+   return modified;
+ }
+
+ /**
+  * Removes a node and notifies listeners.
+  *
+  * @return false (without notifying) if the node was not present, true otherwise
+  */
+ public synchronized boolean remove(String id) {
+   if (!set.contains(id)) {
+     return false;
+   }
+   final TreeSet<String> before = new TreeSet<>(set);
+   set.remove(id);
+   final TreeSet<String> after = new TreeSet<>(set);
+   fireListeners(before, after);
+   return true;
+ }
+
+ /** Removes all nodes and always notifies listeners, passing an empty set as the new state. */
+ public synchronized void clear() {
+   final TreeSet<String> before = new TreeSet<>(set);
+   set.clear();
+   fireListeners(before, Collections.emptySortedSet());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
new file mode 100644
index 0000000..bde4b41
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -0,0 +1,607 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.NodeStateProvider;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.client.solrj.response.SolrResponseBase;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
+import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.rule.ImplicitSnitch;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ContentStreamBase;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.ObjectCache;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.core.CloudConfig;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.REQUESTID;
+
+/**
+ * Simulated {@link SolrCloudManager}.
+ * <p>The instance is assembled from simulated components ({@link SimDistribStateManager},
+ * {@link SimClusterStateProvider}, {@link SimNodeStateProvider}). Requests passed to
+ * {@link #request(SolrRequest)} are executed by {@link #simHandleSolrRequest(SolrRequest)} on an
+ * internal thread pool, and the constructor starts an {@link OverseerTriggerThread}.</p>
+ */
+public class SimCloudManager implements SolrCloudManager {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final SimDistribStateManager stateManager; // common paths are created in the constructor
+ private final SimClusterStateProvider clusterStateProvider;
+ private final SimNodeStateProvider nodeStateProvider;
+ private final AutoScalingHandler autoScalingHandler; // serves the autoscaling API requests
+ private final LiveNodesSet liveNodesSet = new LiveNodesSet(); // passed to both state providers
+ private final DistributedQueueFactory queueFactory; // created on top of stateManager
+ private final ObjectCache objectCache = new ObjectCache(); // cleared by simRestartOverseer
+ private TimeSource timeSource; // TimeSource.NANO_TIME when the constructor receives null
+
+ private final List<SolrInputDocument> systemColl = Collections.synchronizedList(new ArrayList<>()); // documents of the simulated .system collection
+ private final ExecutorService simCloudManagerPool; // runs tasks passed to submit(Callable)
+ private final Map<String, AtomicLong> opCounts = new ConcurrentHashMap<>(); // processed operations, keyed by operation name
+
+
+ private Overseer.OverseerThread triggerThread; // replaced on every simRestartOverseer call
+ private ThreadGroup triggerThreadGroup;
+ private SolrResourceLoader loader;
+
+ private static int nodeIdPort = 10000; // next port used for generated node names; static, so shared by all instances
+
+ /**
+ * Create a simulated cluster. This cluster uses the following components:
+ * <ul>
+ * <li>{@link SimDistribStateManager} with non-shared root node.</li>
+ * <li>{@link SimClusterStateProvider}</li>
+ * <li>{@link SimNodeStateProvider}, where node values are automatically initialized when using
+ * {@link #simAddNode()} method.</li>
+ * <li>{@link GenericDistributedQueueFactory} that uses {@link SimDistribStateManager} as its storage.</li>
+ * <li>an instance of {@link AutoScalingHandler} for managing AutoScalingConfig.</li>
+ * <li>an instance of {@link OverseerTriggerThread} for managing triggers and processing events.</li>
+ * </ul>
+ * @param timeSource time source to use.
+ */
+ public SimCloudManager(TimeSource timeSource) throws Exception {
+ this.stateManager = new SimDistribStateManager(SimDistribStateManager.createNewRootNode());
+ this.loader = new SolrResourceLoader();
+ // init common paths
+ stateManager.makePath(ZkStateReader.CLUSTER_STATE);
+ stateManager.makePath(ZkStateReader.CLUSTER_PROPS);
+ stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH);
+ stateManager.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
+ stateManager.makePath(ZkStateReader.ROLES);
+ stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+ stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+ stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+ stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
+
+ this.timeSource = timeSource != null ? timeSource : TimeSource.NANO_TIME;
+ this.clusterStateProvider = new SimClusterStateProvider(liveNodesSet, this);
+ this.nodeStateProvider = new SimNodeStateProvider(liveNodesSet, this.stateManager, this.clusterStateProvider, null);
+ this.queueFactory = new GenericDistributedQueueFactory(stateManager);
+ this.simCloudManagerPool = ExecutorUtil.newMDCAwareFixedThreadPool(200, new DefaultSolrThreadFactory("simCloudManagerPool"));
+ this.autoScalingHandler = new AutoScalingHandler(this, loader);
+ triggerThreadGroup = new ThreadGroup("Simulated Overseer autoscaling triggers");
+ OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this,
+ new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build());
+ triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread");
+ triggerThread.start();
+ }
+
+ // ---------- simulator setup methods -----------
+
+ /**
+  * Create a cluster made of the requested number of nodes, each one registered as live
+  * and given a pre-populated set of node metrics.
+  * @param numNodes number of nodes to create
+  * @param timeSource time source
+  * @return instance of simulated cluster
+  */
+ public static SimCloudManager createCluster(int numNodes, TimeSource timeSource) throws Exception {
+   final SimCloudManager cluster = new SimCloudManager(timeSource);
+   for (int created = 0; created < numNodes; created++) {
+     final Map<String, Object> metrics = createNodeValues(null);
+     final String id = (String) metrics.get(ImplicitSnitch.NODE);
+     cluster.getSimClusterStateProvider().simAddNode(id);
+     cluster.getSimNodeStateProvider().simSetNodeValues(id, metrics);
+   }
+   return cluster;
+ }
+
+ /**
+  * Create a cluster that starts from an existing cluster state. Node values are generated
+  * for every live node reported after the state has been applied.
+  * @param initialState existing cluster state
+  * @param timeSource time source
+  * @return instance of simulated cluster with the same layout as the provided cluster state.
+  */
+ public static SimCloudManager createCluster(ClusterState initialState, TimeSource timeSource) throws Exception {
+   final SimCloudManager cluster = new SimCloudManager(timeSource);
+   cluster.getSimClusterStateProvider().simSetClusterState(initialState);
+   for (String liveNode : cluster.getClusterStateProvider().getLiveNodes()) {
+     final Map<String, Object> metrics = createNodeValues(liveNode);
+     cluster.getSimNodeStateProvider().simSetNodeValues(liveNode, metrics);
+   }
+   return cluster;
+ }
+
+ /**
+  * Create simulated node values (metrics) for a node.
+  * @param nodeName node name (eg. '127.0.0.1:10000_solr'). If null then a new node name will be
+  * created using sequentially increasing port number.
+  * @return node values
+  * @throws IllegalArgumentException when nodeName is not of the form host:port_context
+  * (a non-numeric port is reported as {@link NumberFormatException})
+  */
+ public static Map<String, Object> createNodeValues(String nodeName) {
+   Map<String, Object> values = new HashMap<>();
+   String host, nodeId;
+   int port;
+   if (nodeName == null) {
+     // generate a fresh local node name
+     host = "127.0.0.1";
+     port = nodeIdPort++;
+     nodeId = host + ":" + port + "_solr";
+   } else {
+     String[] hostPortCtx = nodeName.split(":");
+     if (hostPortCtx.length != 2) {
+       throw new IllegalArgumentException("Invalid nodeName " + nodeName);
+     }
+     host = hostPortCtx[0];
+     String[] portCtx = hostPortCtx[1].split("_");
+     if (portCtx.length != 2) {
+       throw new IllegalArgumentException("Invalid port_context in nodeName " + nodeName);
+     }
+     port = Integer.parseInt(portCtx[0]);
+     nodeId = host + ":" + port + "_" + portCtx[1];
+   }
+   // ip_1..ip_4 are only filled in when the host is made of four dot-separated parts
+   String[] ip = host.split("\\.");
+   if (ip.length == 4) {
+     for (int i = 0; i < ip.length; i++) {
+       values.put("ip_" + (i + 1), ip[i]);
+     }
+   }
+   values.put(ImplicitSnitch.HOST, host);
+   values.put(ImplicitSnitch.PORT, port);
+   values.put(ImplicitSnitch.NODE, nodeId);
+   values.put(ImplicitSnitch.CORES, 0);
+   values.put(ImplicitSnitch.DISK, 1000);
+   values.put(ImplicitSnitch.SYSLOADAVG, 1.0);
+   values.put(ImplicitSnitch.HEAPUSAGE, 123450000);
+   values.put("sysprop.java.version", System.getProperty("java.version"));
+   values.put("sysprop.java.vendor", System.getProperty("java.vendor"));
+   // fake some metrics expected in tests
+   values.put("metrics:solr.node:ADMIN./admin/authorization.clientErrors:count", 0);
+   values.put("metrics:solr.jvm:buffers.direct.Count", 0);
+   return values;
+ }
+
+ /**
+  * Get the {@link SolrResourceLoader} shared by the components of this cluster.
+  * @return the loader created by the constructor
+  */
+ public SolrResourceLoader getLoader() {
+   return this.loader;
+ }
+
+ /**
+  * Add a new node and initialize its node values (metrics). The
+  * /live_nodes list is updated with the new node id.
+  * @return new node id
+  */
+ public String simAddNode() throws Exception {
+   Map<String, Object> values = createNodeValues(null);
+   String nodeId = (String)values.get(ImplicitSnitch.NODE);
+   clusterStateProvider.simAddNode(nodeId);
+   nodeStateProvider.simSetNodeValues(nodeId, values);
+   // parameterized form: no string is built when TRACE is disabled
+   LOG.trace("-- added node {}", nodeId);
+   return nodeId;
+ }
+
+ /**
+  * Remove a node from the cluster. This simulates a node lost scenario.
+  * Node id is removed from the /live_nodes list.
+  * @param nodeId node id
+  * @param withValues when true, remove also simulated node values. If false
+  * then node values are retained to later simulate
+  * a node that comes back up
+  */
+ public void simRemoveNode(String nodeId, boolean withValues) throws Exception {
+   clusterStateProvider.simRemoveNode(nodeId);
+   if (withValues) {
+     nodeStateProvider.simRemoveNodeValues(nodeId);
+   }
+   // parameterized form: no string is built when TRACE is disabled
+   LOG.trace("-- removed node {}", nodeId);
+ }
+
+ /**
+  * Remove randomly chosen live nodes. At most as many nodes as are currently live are removed.
+  * @param number number of nodes to remove
+  * @param withValues when true, remove also simulated node values. If false
+  * then node values are retained to later simulate
+  * a node that comes back up
+  * @param random random
+  */
+ public void simRemoveRandomNodes(int number, boolean withValues, Random random) throws Exception {
+   final List<String> candidates = new ArrayList<>(liveNodesSet.get());
+   Collections.shuffle(candidates, random);
+   final int limit = Math.min(number, candidates.size());
+   int removed = 0;
+   while (removed < limit) {
+     simRemoveNode(candidates.get(removed), withValues);
+     removed++;
+   }
+ }
+
+ /**
+  * Drop every document from the (simulated) .system collection.
+  */
+ public void simClearSystemCollection() {
+   this.systemColl.clear();
+ }
+
+ /**
+  * Get the content of (simulated) .system collection. The returned list is the
+  * internal one, not a copy.
+  * @return documents in the collection, in chronological order starting from the oldest.
+  */
+ public List<SolrInputDocument> simGetSystemCollection() {
+   return this.systemColl;
+ }
+
+ /**
+  * Get a {@link SolrClient} whose requests are handled by this cluster instance.
+  * The collection argument of the client call is ignored, and closing the client is a no-op.
+  * @return simulated SolrClient.
+  */
+ public SolrClient simGetSolrClient() {
+   final SimCloudManager cluster = this;
+   return new SolrClient() {
+     @Override
+     public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
+       return cluster.request(request).getResponse();
+     }
+
+     @Override
+     public void close() throws IOException {
+       // nothing to release
+     }
+   };
+ }
+
+ /**
+  * Simulate the effect of restarting Overseer leader: the current OverseerTriggerThread is
+  * interrupted and closed, the object cache is cleared and a new trigger thread is started.
+  * Optionally a node is killed in between.
+  * @param killNodeId optional nodeId to kill (its node values are removed too). If null then
+  * don't kill any node, just restart the thread
+  */
+ public void simRestartOverseer(String killNodeId) throws Exception {
+   LOG.info("=== Restarting OverseerTriggerThread and clearing object cache...");
+   // stop the thread that is currently running
+   this.triggerThread.interrupt();
+   IOUtils.closeQuietly(this.triggerThread);
+   if (killNodeId != null) {
+     simRemoveNode(killNodeId, true);
+   }
+   this.objectCache.clear();
+   // start the replacement
+   final CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build();
+   final OverseerTriggerThread replacement = new OverseerTriggerThread(this.loader, this, cloudConfig);
+   this.triggerThread = new Overseer.OverseerThread(this.triggerThreadGroup, replacement, "Simulated OverseerAutoScalingTriggerThread");
+   this.triggerThread.start();
+ }
+
+ /**
+  * Hand a task over to the thread pool of this cluster.
+  * @param callable task to execute
+  * @return future to obtain results
+  */
+ public <T> Future<T> submit(Callable<T> callable) {
+   final Future<T> pending = this.simCloudManagerPool.submit(callable);
+   return pending;
+ }
+
+ // ---------- type-safe methods to obtain simulator components ----------
+ /**
+  * @return the cluster state provider, typed as {@link SimClusterStateProvider}
+  */
+ public SimClusterStateProvider getSimClusterStateProvider() {
+   return this.clusterStateProvider;
+ }
+
+ /**
+  * @return the node state provider, typed as {@link SimNodeStateProvider}
+  */
+ public SimNodeStateProvider getSimNodeStateProvider() {
+   return this.nodeStateProvider;
+ }
+
+ /**
+  * @return the distributed state manager, typed as {@link SimDistribStateManager}
+  */
+ public SimDistribStateManager getSimDistribStateManager() {
+   return this.stateManager;
+ }
+
+ /**
+  * @return the set of live nodes used by this cluster
+  */
+ public LiveNodesSet getLiveNodesSet() {
+   return this.liveNodesSet;
+ }
+
+ /**
+  * Get the counters of operations processed by this cluster, keyed by operation name.
+  * The returned map is the internal one, not a copy.
+  */
+ public Map<String, AtomicLong> simGetOpCounts() {
+   return this.opCounts;
+ }
+
+ /**
+  * Get how many operations of the given type have been processed.
+  * @param op operation name, eg. MOVEREPLICA
+  * @return number of operations, 0 when no such operation was counted yet
+  */
+ public long simGetOpCount(String op) {
+   final AtomicLong counter = opCounts.get(op);
+   if (counter == null) {
+     return 0L;
+   }
+   return counter.get();
+ }
+
+ // --------- interface methods -----------
+
+
+ /**
+  * @return the object cache of this cluster
+  */
+ @Override
+ public ObjectCache getObjectCache() {
+   return this.objectCache;
+ }
+
+ /**
+  * @return the time source selected in the constructor
+  */
+ @Override
+ public TimeSource getTimeSource() {
+   return this.timeSource;
+ }
+
+ /**
+  * @return the simulated cluster state provider, as the generic interface type
+  */
+ @Override
+ public ClusterStateProvider getClusterStateProvider() {
+   return this.clusterStateProvider;
+ }
+
+ /**
+  * @return the simulated node state provider, as the generic interface type
+  */
+ @Override
+ public NodeStateProvider getNodeStateProvider() {
+   return this.nodeStateProvider;
+ }
+
+ /**
+  * @return the simulated distributed state manager, as the generic interface type
+  */
+ @Override
+ public DistribStateManager getDistribStateManager() {
+   return this.stateManager;
+ }
+
+ /**
+  * @return the queue factory created by the constructor
+  */
+ @Override
+ public DistributedQueueFactory getDistributedQueueFactory() {
+   return this.queueFactory;
+ }
+
+ /**
+  * Execute a request by running {@link #simHandleSolrRequest(SolrRequest)} on the
+  * thread pool and waiting for its result.
+  * @param req request to process
+  * @return response produced by the handler
+  * @throws IOException wrapping any failure of the submission or of the handler. When the
+  * calling thread is interrupted while waiting, its interrupt flag is restored before throwing.
+  */
+ @Override
+ public SolrResponse request(SolrRequest req) throws IOException {
+   try {
+     Future<SolrResponse> rsp = submit(() -> simHandleSolrRequest(req));
+     return rsp.get();
+   } catch (InterruptedException e) {
+     // keep the interruption visible to the caller instead of swallowing it
+     Thread.currentThread().interrupt();
+     throw new IOException(e);
+   } catch (Exception e) {
+     throw new IOException(e);
+   }
+ }
+
+ /**
+  * Add one to the counter of the given operation, creating the counter on first use.
+  * @param op operation name
+  */
+ private void incrementCount(String op) {
+   opCounts.computeIfAbsent(op, name -> new AtomicLong()).incrementAndGet();
+ }
+
+ /**
+ * Handler method for autoscaling requests. NOTE: only a specific subset of autoscaling requests is
+ * supported!
+ * @param req autoscaling request
+ * @return results
+ */
+ public SolrResponse simHandleSolrRequest(SolrRequest req) throws IOException, InterruptedException {
+ // pay the penalty for remote request, at least 5 ms
+ timeSource.sleep(5);
+
+ LOG.trace("--- got SolrRequest: " + req.getMethod() + " " + req.getPath() +
+ (req.getParams() != null ? " " + req.getParams().toQueryString() : ""));
+ if (req.getPath() != null && req.getPath().startsWith("/admin/autoscaling") ||
+ req.getPath().startsWith("/cluster/autoscaling")) {
+ incrementCount("autoscaling");
+ ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+ params.set(CommonParams.PATH, req.getPath());
+ LocalSolrQueryRequest queryRequest = new LocalSolrQueryRequest(null, params);
+ RequestWriter.ContentWriter cw = req.getContentWriter("application/json");
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ cw.write(baos);
+ String payload = baos.toString("UTF-8");
+ LOG.trace("-- payload: {}", payload);
+ queryRequest.setContentStreams(Collections.singletonList(new ContentStreamBase.StringStream(payload)));
+ queryRequest.getContext().put("httpMethod", req.getMethod().toString());
+ SolrQueryResponse queryResponse = new SolrQueryResponse();
+ autoScalingHandler.handleRequest(queryRequest, queryResponse);
+ if (queryResponse.getException() != null) {
+ throw new IOException(queryResponse.getException());
+ }
+ SolrResponse rsp = new SolrResponseBase();
+ rsp.setResponse(queryResponse.getValues());
+ LOG.trace("-- response: {}", rsp);
+ return rsp;
+ }
+ if (req instanceof UpdateRequest) {
+ incrementCount("update");
+ // support only updates to the system collection
+ UpdateRequest ureq = (UpdateRequest)req;
+ if (ureq.getCollection() == null || !ureq.getCollection().equals(CollectionAdminParams.SYSTEM_COLL)) {
+ throw new UnsupportedOperationException("Only .system updates are supported but got: " + req);
+ }
+ List<SolrInputDocument> docs = ureq.getDocuments();
+ if (docs != null) {
+ systemColl.addAll(docs);
+ }
+ return new UpdateResponse();
+ }
+ // support only a specific subset of collection admin ops
+ if (!(req instanceof CollectionAdminRequest)) {
+ throw new UnsupportedOperationException("Only some CollectionAdminRequest-s are supported: " + req.getClass().getName());
+ }
+ SolrParams params = req.getParams();
+ String a = params.get(CoreAdminParams.ACTION);
+ SolrResponse rsp = new SolrResponseBase();
+ rsp.setResponse(new NamedList<>());
+ if (a != null) {
+ CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(a);
+ if (action == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
+ }
+ LOG.debug("Invoking Collection Action :{} with params {}", action.toLower(), req.getParams().toQueryString());
+ NamedList results = new NamedList();
+ rsp.setResponse(results);
+ incrementCount(action.name());
+ switch (action) {
+ case REQUESTSTATUS:
+ // we complete all async ops immediately
+ String requestId = req.getParams().get(REQUESTID);
+ SimpleOrderedMap<String> status = new SimpleOrderedMap<>();
+ status.add("state", RequestStatusState.COMPLETED.getKey());
+ status.add("msg", "found [" + requestId + "] in completed tasks");
+ results.add("status", status);
+ results.add("success", "");
+ // ExecutePlanAction expects a specific response class
+ rsp = new CollectionAdminRequest.RequestStatusResponse();
+ rsp.setResponse(results);
+ break;
+ case DELETESTATUS:
+ requestId = req.getParams().get(REQUESTID);
+ results.add("status", "successfully removed stored response for [" + requestId + "]");
+ results.add("success", "");
+ break;
+ case CREATE:
+ try {
+ clusterStateProvider.simCreateCollection(new ZkNodeProps(req.getParams().toNamedList().asMap(10)), results);
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+ }
+ break;
+ case DELETE:
+ clusterStateProvider.simDeleteCollection(req.getParams().get(CommonParams.NAME),
+ req.getParams().get(CommonAdminParams.ASYNC), results);
+ break;
+ case LIST:
+ results.add("collections", clusterStateProvider.simListCollections());
+ break;
+ case ADDREPLICA:
+ try {
+ clusterStateProvider.simAddReplica(new ZkNodeProps(req.getParams().toNamedList().asMap(10)), results);
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+ }
+ break;
+ case MOVEREPLICA:
+ try {
+ clusterStateProvider.simMoveReplica(new ZkNodeProps(req.getParams().toNamedList().asMap(10)), results);
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+ }
+ break;
+ case OVERSEERSTATUS:
+ if (req.getParams().get(CommonAdminParams.ASYNC) != null) {
+ results.add(REQUESTID, req.getParams().get(CommonAdminParams.ASYNC));
+ }
+ if (!liveNodesSet.get().isEmpty()) {
+ results.add("leader", liveNodesSet.get().iterator().next());
+ }
+ results.add("overseer_queue_size", 0);
+ results.add("overseer_work_queue_size", 0);
+ results.add("overseer_collection_queue_size", 0);
+ results.add("success", "");
+ break;
+ case ADDROLE:
+ nodeStateProvider.simAddNodeValue(req.getParams().get("node"), "nodeRole", req.getParams().get("role"));
+ break;
+ case CREATESHARD:
+ try {
+ clusterStateProvider.simCreateShard(new ZkNodeProps(req.getParams().toNamedList().asMap(10)), results);
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+ }
+ break;
+ case SPLITSHARD:
+ try {
+ clusterStateProvider.simSplitShard(new ZkNodeProps(req.getParams().toNamedList().asMap(10)), results);
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported collection admin action=" + action + " in request: " + req.getParams());
+ }
+ } else {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "action is a required param in request: " + req.getParams());
+ }
+ return rsp;
+
+ }
+
+ /**
+  * General HTTP requests are not simulated; every call fails.
+  * @throws UnsupportedOperationException always
+  */
+ @Override
+ public byte[] httpRequest(String url, SolrRequest.METHOD method, Map<String, String> headers, String payload, int timeout, boolean followRedirects) throws IOException {
+   final String reason = "general HTTP requests are not supported yet";
+   throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+  * Shut the simulated cluster down: the state components are closed, the trigger thread is
+  * interrupted and closed, the object cache is closed and the thread pool is stopped.
+  */
+ @Override
+ public void close() throws IOException {
+   // state components first
+   IOUtils.closeQuietly(this.clusterStateProvider);
+   IOUtils.closeQuietly(this.nodeStateProvider);
+   IOUtils.closeQuietly(this.stateManager);
+   // then the trigger thread
+   this.triggerThread.interrupt();
+   IOUtils.closeQuietly(this.triggerThread);
+   IOUtils.closeQuietly(this.objectCache);
+   // finally the pool that executes requests
+   this.simCloudManagerPool.shutdownNow();
+ }
+}