You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/14 21:31:15 UTC
[05/11] lucene-solr:branch_7x: SOLR-11285: Simulation framework for
autoscaling.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
new file mode 100644
index 0000000..1986bac
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -0,0 +1,1275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.cloud.ActionThrottle;
+import org.apache.solr.cloud.AddReplicaCmd;
+import org.apache.solr.cloud.Assign;
+import org.apache.solr.cloud.CreateCollectionCmd;
+import org.apache.solr.cloud.CreateShardCmd;
+import org.apache.solr.cloud.SplitShardCmd;
+import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.cloud.overseer.CollectionMutator;
+import org.apache.solr.cloud.overseer.ZkWriteCommand;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.rule.ImplicitSnitch;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+/**
+ * Simulated {@link ClusterStateProvider}.
+ * <p>The following behaviors are supported:</p>
+ * <ul>
+ * <li>using autoscaling policy for replica placements</li>
+ * <li>maintaining an up-to-date list of /live_nodes and nodeAdded / nodeLost markers</li>
+ * <li>running a simulated leader election on collection changes (with throttling), when needed</li>
+ * <li>maintaining an up-to-date /clusterstate.json (single file format), which also tracks replica states,
+ * leader election changes, replica property changes, etc. Note: this file is only written,
+ * but never read by the framework!</li>
+ * <li>maintaining an up-to-date /clusterprops.json. Note: this file is only written, but never read by the
+ * framework!</li>
+ * </ul>
+ */
+public class SimClusterStateProvider implements ClusterStateProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // Authoritative replica registry of the simulation: node id -> replicas hosted on that node.
+ private final Map<String, List<ReplicaInfo>> nodeReplicaMap = new ConcurrentHashMap<>();
+ private final LiveNodesSet liveNodes;
+ private final SimDistribStateManager stateManager;
+ private final SimCloudManager cloudManager;
+
+ // Simulated cluster / per-collection / per-slice properties.
+ private final Map<String, Object> clusterProperties = new ConcurrentHashMap<>();
+ private final Map<String, Map<String, Object>> collProperties = new ConcurrentHashMap<>();
+ private final Map<String, Map<String, Map<String, Object>>> sliceProperties = new ConcurrentHashMap<>();
+
+ // Guards compound mutations of nodeReplicaMap and the cached collection states.
+ private final ReentrantLock lock = new ReentrantLock();
+
+ // Throttles simulated leader elections (see the LeaderElection callable).
+ private final ActionThrottle leaderThrottle;
+
+ // default map of: operation -> delay
+ private final Map<String, Long> defaultOpDelays = new HashMap<>();
+ // per-collection map of: collection -> op -> delay
+ private final Map<String, Map<String, Long>> opDelays = new ConcurrentHashMap<>();
+
+
+ private volatile int clusterStateVersion = -1;
+ private Map<String, Object> lastSavedProperties = null;
+
+ // Cached DocCollection states; set to null to force a rebuild on the next read.
+ private AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
+ private AtomicBoolean saveClusterState = new AtomicBoolean();
+
+ /**
+ * The instance needs to be initialized using the <code>sim*</code> methods in order
+ * to ensure proper behavior, otherwise it will behave as a cluster with zero replicas.
+ * @param liveNodes shared set of live node ids; an ephemeral live-node marker is created for each
+ * @param cloudManager parent simulated cloud manager
+ */
+ public SimClusterStateProvider(LiveNodesSet liveNodes, SimCloudManager cloudManager) throws Exception {
+ this.liveNodes = liveNodes;
+ this.cloudManager = cloudManager;
+ this.stateManager = cloudManager.getSimDistribStateManager();
+ // NOTE: stateManager must be assigned before the loop below - createEphemeralLiveNode()
+ // dereferences it, so the original ordering (loop first) would NPE whenever the
+ // initial liveNodes set was non-empty.
+ for (String nodeId : liveNodes.get()) {
+ createEphemeralLiveNode(nodeId);
+ }
+ this.leaderThrottle = new ActionThrottle("leader", 5000, cloudManager.getTimeSource());
+ // names are CollectionAction operation names, delays are in ms (simulated time)
+ defaultOpDelays.put(CollectionParams.CollectionAction.MOVEREPLICA.name(), 5000L);
+ defaultOpDelays.put(CollectionParams.CollectionAction.DELETEREPLICA.name(), 5000L);
+ defaultOpDelays.put(CollectionParams.CollectionAction.ADDREPLICA.name(), 500L);
+ defaultOpDelays.put(CollectionParams.CollectionAction.SPLITSHARD.name(), 5000L);
+ defaultOpDelays.put(CollectionParams.CollectionAction.CREATESHARD.name(), 5000L);
+ defaultOpDelays.put(CollectionParams.CollectionAction.DELETESHARD.name(), 5000L);
+ defaultOpDelays.put(CollectionParams.CollectionAction.CREATE.name(), 500L);
+ defaultOpDelays.put(CollectionParams.CollectionAction.DELETE.name(), 5000L);
+ }
+
+ // ============== SIMULATOR SETUP METHODS ====================
+
+ /**
+ * Initialize from an existing cluster state. All previously held simulation state
+ * (properties, replicas, live nodes, markers) is discarded first.
+ * @param initialState initial cluster state
+ */
+ public void simSetClusterState(ClusterState initialState) throws Exception {
+ lock.lock();
+ try {
+ collProperties.clear();
+ sliceProperties.clear();
+ nodeReplicaMap.clear();
+ liveNodes.clear();
+ // drop ephemeral live-node entries and nodeAdded markers left over from the previous state
+ for (String nodeId : stateManager.listData(ZkStateReader.LIVE_NODES_ZKNODE)) {
+ if (stateManager.hasData(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId)) {
+ stateManager.removeData(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, -1);
+ }
+ if (stateManager.hasData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId)) {
+ stateManager.removeData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId, -1);
+ }
+ }
+ // register the live nodes declared by the initial state
+ liveNodes.addAll(initialState.getLiveNodes());
+ for (String nodeId : liveNodes.get()) {
+ createEphemeralLiveNode(nodeId);
+ }
+ // import collection / slice properties and rebuild the node -> replicas map
+ initialState.forEachCollection(dc -> {
+ collProperties.computeIfAbsent(dc.getName(), name -> new ConcurrentHashMap<>()).putAll(dc.getProperties());
+ opDelays.computeIfAbsent(dc.getName(), c -> new HashMap<>()).putAll(defaultOpDelays);
+ dc.getSlices().forEach(s -> {
+ sliceProperties.computeIfAbsent(dc.getName(), name -> new ConcurrentHashMap<>())
+ .computeIfAbsent(s.getName(), name -> new HashMap<>()).putAll(s.getProperties());
+ s.getReplicas().forEach(r -> {
+ ReplicaInfo ri = new ReplicaInfo(r.getName(), r.getCoreName(), dc.getName(), s.getName(), r.getType(), r.getNodeName(), r.getProperties());
+ // replicas hosted on nodes that are not live are silently dropped
+ if (liveNodes.get().contains(r.getNodeName())) {
+ nodeReplicaMap.computeIfAbsent(r.getNodeName(), rn -> new ArrayList<>()).add(ri);
+ }
+ });
+ });
+ });
+ // invalidate cached DocCollection states so they are rebuilt on the next read
+ collectionsStatesRef.set(null);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Reset the leader election throttle.
+ * NOTE(review): presumably clears the minimum-wait so the next election runs
+ * immediately - confirm against {@link ActionThrottle#reset()}.
+ */
+ public void simResetLeaderThrottle() {
+ leaderThrottle.reset();
+ }
+
+ /**
+ * Get random node id.
+ * @param random instance of random.
+ * @return one of the live nodes
+ */
+ public String simGetRandomNode(Random random) {
+ if (liveNodes.isEmpty()) {
+ return null;
+ }
+ List<String> nodes = new ArrayList<>(liveNodes.get());
+ return nodes.get(random.nextInt(nodes.size()));
+ }
+
+ // todo: maybe hook up DistribStateManager /clusterstate.json watchers?
+
+ /**
+ * Add a new node to the cluster.
+ * @param nodeId unique node id
+ * @throws Exception if the node already exists
+ */
+ public void simAddNode(String nodeId) throws Exception {
+ if (liveNodes.contains(nodeId)) {
+ throw new Exception("Node " + nodeId + " already exists");
+ }
+ liveNodes.add(nodeId);
+ // creates the /live_nodes entry and, if a NODEADDED trigger exists, a nodeAdded marker
+ createEphemeralLiveNode(nodeId);
+ nodeReplicaMap.putIfAbsent(nodeId, new ArrayList<>());
+ }
+
+ // utility class to run leader election in a separate thread and with throttling
+ // Note: leader election is a no-op if a shard leader already exists for each shard
+ private class LeaderElection implements Callable<Boolean> {
+ Collection<String> collections;
+ boolean saveClusterState;
+
+ LeaderElection(Collection<String> collections, boolean saveClusterState) {
+ this.collections = collections;
+ this.saveClusterState = saveClusterState;
+ }
+
+ /**
+ * Run a throttled leader election over the affected collections.
+ * @return true on success, false when the election threw an exception
+ */
+ @Override
+ public Boolean call() {
+ leaderThrottle.minimumWaitBetweenActions();
+ leaderThrottle.markAttemptingAction();
+ try {
+ simRunLeaderElection(collections, saveClusterState);
+ } catch (Exception e) {
+ // don't swallow silently - callers only see the boolean result,
+ // so without this log the failure cause would be lost entirely
+ LOG.warn("Leader election failed for collections " + collections, e);
+ return false;
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Remove node from a cluster. This is equivalent to a situation when a node is lost.
+ * All replicas that were assigned to this node are marked as DOWN.
+ * @param nodeId node id
+ * @return true if a node existed and was removed
+ */
+ public boolean simRemoveNode(String nodeId) throws Exception {
+ lock.lock();
+ try {
+ Set<String> collections = new HashSet<>();
+ // mark every replica on that node as down
+ setReplicaStates(nodeId, Replica.State.DOWN, collections);
+ boolean res = liveNodes.remove(nodeId);
+ if (!collections.isEmpty()) {
+ // replica states changed - invalidate the cached collection states
+ collectionsStatesRef.set(null);
+ }
+ // remove ephemeral nodes (live-node entry and nodeAdded marker, created with this node's ephemeral id)
+ stateManager.getRoot().removeEphemeralChildren(nodeId);
+ // create a nodeLost marker if needed
+ AutoScalingConfig cfg = stateManager.getAutoScalingConfig(null);
+ if (cfg.hasTriggerForEvents(TriggerEventType.NODELOST)) {
+ stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId);
+ }
+ if (!collections.isEmpty()) {
+ // affected shards may have lost their leader - run an async (throttled) election
+ cloudManager.submit(new LeaderElection(collections, true));
+ }
+ return res;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // this method needs to be called under a lock
+ // Marks every replica hosted on the given node with the supplied state and records
+ // the collections that were touched in changedCollections.
+ private void setReplicaStates(String nodeId, Replica.State state, Set<String> changedCollections) {
+ List<ReplicaInfo> replicas = nodeReplicaMap.get(nodeId);
+ if (replicas == null) {
+ return;
+ }
+ for (ReplicaInfo info : replicas) {
+ info.getVariables().put(ZkStateReader.STATE_PROP, state.toString());
+ changedCollections.add(info.getCollection());
+ }
+ }
+
+ // this method needs to be called under a lock
+ // Creates the ephemeral /live_nodes entry for the node and, when a NODEADDED trigger
+ // is configured, an ephemeral nodeAdded marker. Both are created via the node's
+ // ephemeral id so they can be cleaned up together (see removeEphemeralChildren in simRemoveNode).
+ private void createEphemeralLiveNode(String nodeId) throws Exception {
+ DistribStateManager mgr = stateManager.withEphemeralId(nodeId);
+ mgr.makePath(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, null, CreateMode.EPHEMERAL, true);
+ AutoScalingConfig cfg = stateManager.getAutoScalingConfig(null);
+ if (cfg.hasTriggerForEvents(TriggerEventType.NODEADDED)) {
+ mgr.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId, null, CreateMode.EPHEMERAL, true);
+ }
+ }
+
+ /**
+ * Restore a previously removed node. This also simulates a short replica recovery state.
+ * @param nodeId node id to restore
+ * @return true when this operation restored any replicas, false otherwise (empty node).
+ */
+ public boolean simRestoreNode(String nodeId) throws Exception {
+ liveNodes.add(nodeId);
+ createEphemeralLiveNode(nodeId);
+ Set<String> collections = new HashSet<>();
+ lock.lock();
+ try {
+ setReplicaStates(nodeId, Replica.State.RECOVERING, collections);
+ } finally {
+ lock.unlock();
+ }
+ // simulate a short recovery period (1000 units of simulated time) before going active;
+ // the lock is deliberately released while sleeping
+ cloudManager.getTimeSource().sleep(1000);
+ lock.lock();
+ try {
+ setReplicaStates(nodeId, Replica.State.ACTIVE, collections);
+ } finally {
+ lock.unlock();
+ }
+ if (!collections.isEmpty()) {
+ // replica states changed - invalidate the cache and elect leaders if needed
+ collectionsStatesRef.set(null);
+ cloudManager.submit(new LeaderElection(collections, true));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Add a new replica. Note that if any details of a replica (node, coreNodeName, SolrCore name, etc)
+ * are missing they will be filled in using the policy framework.
+ * @param message replica details
+ * @param results result of the operation
+ */
+ public void simAddReplica(ZkNodeProps message, NamedList results) throws Exception {
+ ClusterState clusterState = getClusterState();
+ DocCollection coll = clusterState.getCollection(message.getStr(ZkStateReader.COLLECTION_PROP));
+ AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
+ // let the policy framework fill in missing details (e.g. the target node)
+ message = AddReplicaCmd.assignReplicaDetails(cloudManager, clusterState, message, sessionWrapper);
+ if (sessionWrapper.get() != null) {
+ sessionWrapper.get().release();
+ }
+ if (message.getStr(CoreAdminParams.CORE_NODE_NAME) == null) {
+ message = message.plus(CoreAdminParams.CORE_NODE_NAME, Assign.assignCoreNodeName(stateManager, coll));
+ }
+ // replica type defaults to NRT when not specified in the message
+ ReplicaInfo ri = new ReplicaInfo(
+ message.getStr(CoreAdminParams.CORE_NODE_NAME),
+ message.getStr(CoreAdminParams.NAME),
+ message.getStr(ZkStateReader.COLLECTION_PROP),
+ message.getStr(ZkStateReader.SHARD_ID_PROP),
+ Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT)),
+ message.getStr(CoreAdminParams.NODE),
+ message.getProperties()
+ );
+ // delegate to the fully-specified variant, running a leader election afterwards
+ simAddReplica(message.getStr(CoreAdminParams.NODE), ri, true);
+ results.add("success", "");
+ }
+
+ /**
+ * Add a replica. Note that all details of the replica must be present here, including
+ * node, coreNodeName and SolrCore name.
+ * @param nodeId node id where the replica will be added
+ * @param replicaInfo replica info
+ * @param runLeaderElection if true then run a leader election after adding the replica.
+ * @throws Exception on duplicate names, a non-live target node, or incomplete replica info
+ */
+ public void simAddReplica(String nodeId, ReplicaInfo replicaInfo, boolean runLeaderElection) throws Exception {
+ // make sure coreNodeName is unique across cluster
+ for (Map.Entry<String, List<ReplicaInfo>> e : nodeReplicaMap.entrySet()) {
+ for (ReplicaInfo ri : e.getValue()) {
+ if (ri.getCore().equals(replicaInfo.getCore())) {
+ throw new Exception("Duplicate SolrCore name for existing=" + ri + " on node " + e.getKey() + " and new=" + replicaInfo);
+ }
+ if (ri.getName().equals(replicaInfo.getName())) {
+ throw new Exception("Duplicate coreNode name for existing=" + ri + " on node " + e.getKey() + " and new=" + replicaInfo);
+ }
+ }
+ }
+ if (!liveNodes.contains(nodeId)) {
+ throw new Exception("Target node " + nodeId + " is not live: " + liveNodes);
+ }
+ // verify info
+ if (replicaInfo.getCore() == null) {
+ throw new Exception("Missing core: " + replicaInfo);
+ }
+ // XXX replica info is not supposed to have this as a variable
+ replicaInfo.getVariables().remove(ZkStateReader.SHARD_ID_PROP);
+ if (replicaInfo.getName() == null) {
+ throw new Exception("Missing name: " + replicaInfo);
+ }
+ if (replicaInfo.getNode() == null) {
+ throw new Exception("Missing node: " + replicaInfo);
+ }
+ if (!replicaInfo.getNode().equals(nodeId)) {
+ throw new Exception("Wrong node (not " + nodeId + "): " + replicaInfo);
+ }
+
+ lock.lock();
+ try {
+
+ opDelay(replicaInfo.getCollection(), CollectionParams.CollectionAction.ADDREPLICA.name());
+
+ List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent(nodeId, n -> new ArrayList<>());
+ // mark replica as active
+ replicaInfo.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
+ // add a property expected in tests
+ replicaInfo.getVariables().put(Suggestion.coreidxsize, 123450000);
+
+ replicas.add(replicaInfo);
+ // at this point nuke our cached DocCollection state
+ collectionsStatesRef.set(null);
+ LOG.trace("-- simAddReplica {}", replicaInfo);
+
+ Map<String, Object> values = cloudManager.getSimNodeStateProvider().simGetAllNodeValues()
+ .computeIfAbsent(nodeId, id -> new ConcurrentHashMap<>(SimCloudManager.createNodeValues(id)));
+ // update the number of cores and freedisk in node values
+ Integer cores = (Integer)values.get(ImplicitSnitch.CORES);
+ if (cores == null) {
+ cores = 0;
+ }
+ cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores + 1);
+ Integer disk = (Integer)values.get(ImplicitSnitch.DISK);
+ if (disk == null) {
+ // NOTE(review): defaults freedisk to 1000 and charges 10 per replica; simRemoveReplica gives 10 back
+ disk = 1000;
+ }
+ cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk - 10);
+ if (runLeaderElection) {
+ // async, throttled election for the affected collection
+ cloudManager.submit(new LeaderElection(Collections.singleton(replicaInfo.getCollection()), true));
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Remove replica.
+ * @param nodeId node id
+ * @param coreNodeName coreNodeName
+ * @throws Exception if no replica with that coreNodeName exists on the node
+ */
+ public void simRemoveReplica(String nodeId, String coreNodeName) throws Exception {
+ List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent(nodeId, n -> new ArrayList<>());
+ lock.lock();
+ try {
+ for (int i = 0; i < replicas.size(); i++) {
+ if (coreNodeName.equals(replicas.get(i).getName())) {
+ ReplicaInfo ri = replicas.remove(i);
+ // invalidate cached collection states
+ collectionsStatesRef.set(null);
+
+ opDelay(ri.getCollection(), CollectionParams.CollectionAction.DELETEREPLICA.name());
+
+ // update the number of cores in node values, if node is live
+ if (liveNodes.contains(nodeId)) {
+ Integer cores = (Integer)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.CORES);
+ if (cores == null || cores == 0) {
+ throw new Exception("Unexpected value of 'cores' (" + cores + ") on node: " + nodeId);
+ }
+ cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores - 1);
+ Integer disk = (Integer)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.DISK);
+ if (disk == null || disk == 0) {
+ throw new Exception("Unexpected value of 'freedisk' (" + disk + ") on node: " + nodeId);
+ }
+ // give back the disk space charged by simAddReplica
+ cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk + 10);
+ }
+ LOG.trace("-- simRemoveReplica {}", ri);
+ // the removed replica may have been the leader - run an async election
+ cloudManager.submit(new LeaderElection(Collections.singleton(ri.getCollection()), true));
+ return;
+ }
+ }
+ throw new Exception("Replica " + coreNodeName + " not found on node " + nodeId);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Save clusterstate.json to {@link DistribStateManager}.
+ * @param state state to save
+ * @return saved state
+ * @throws IOException wrapping any state manager failure
+ */
+ private ClusterState saveClusterState(ClusterState state) throws IOException {
+ byte[] data = Utils.toJSON(state);
+ try {
+ VersionedData oldData = stateManager.getData(ZkStateReader.CLUSTER_STATE);
+ int version = oldData != null ? oldData.getVersion() : -1;
+ // versioned (optimistic) update against the last seen zk version
+ stateManager.setData(ZkStateReader.CLUSTER_STATE, data, version);
+ LOG.trace("-- saved cluster state version=" + clusterStateVersion +
+ ", zkVersion=" + (version + 1) + ", {}", state);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ return state;
+ }
+
+ /**
+ * Delay an operation by a configured amount.
+ * @param collection collection name
+ * @param op operation name.
+ */
+ private void opDelay(String collection, String op) throws InterruptedException {
+ Map<String, Long> delays = opDelays.get(collection);
+ if (delays == null) {
+ return;
+ }
+ Long delay = delays.get(op);
+ if (delay != null) {
+ // sleep in simulated time
+ cloudManager.getTimeSource().sleep(delay);
+ }
+ }
+
+ /**
+ * Simulate running a shard leader election. This operation is a no-op if a leader already exists.
+ * If a new leader is elected the cluster state is saved.
+ * @param collections list of affected collections
+ * @param saveClusterState if true then save cluster state regardless of changes.
+ */
+ private synchronized void simRunLeaderElection(Collection<String> collections, boolean saveClusterState) throws Exception {
+ ClusterState state = getClusterState();
+ AtomicBoolean stateChanged = new AtomicBoolean(Boolean.FALSE);
+
+ state.forEachCollection(dc -> {
+ if (!collections.contains(dc.getName())) {
+ return;
+ }
+ dc.getSlices().forEach(s -> {
+ Replica leader = s.getLeader();
+ // only elect when there is no leader, or the current leader's node is gone
+ if (leader == null || !liveNodes.contains(leader.getNodeName())) {
+ LOG.trace("Running leader election for " + dc.getName() + " / " + s.getName());
+ if (s.getReplicas().isEmpty()) { // no replicas - punt
+ return;
+ }
+ // mark all replicas as non-leader (probably not necessary) and collect all active and live
+ List<ReplicaInfo> active = new ArrayList<>();
+ s.getReplicas().forEach(r -> {
+ AtomicReference<ReplicaInfo> riRef = new AtomicReference<>();
+ // find our ReplicaInfo for this replica
+ nodeReplicaMap.get(r.getNodeName()).forEach(info -> {
+ if (info.getName().equals(r.getName())) {
+ riRef.set(info);
+ }
+ });
+ ReplicaInfo ri = riRef.get();
+ if (ri == null) {
+ throw new IllegalStateException("-- could not find ReplicaInfo for replica " + r);
+ }
+ // per-replica lock: ReplicaInfo variables may be read/written concurrently elsewhere
+ synchronized (ri) {
+ if (ri.getVariables().remove(ZkStateReader.LEADER_PROP) != null) {
+ stateChanged.set(true);
+ }
+ if (r.isActive(liveNodes.get())) {
+ active.add(ri);
+ } else { // if it's on a node that is not live mark it down
+ if (!liveNodes.contains(r.getNodeName())) {
+ ri.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
+ }
+ }
+ }
+ });
+ if (active.isEmpty()) {
+ LOG.warn("-- can't find any active replicas for " + dc.getName() + " / " + s.getName());
+ return;
+ }
+ // pick the first active replica that is not of type PULL
+ ReplicaInfo ri = null;
+ for (ReplicaInfo a : active) {
+ if (!a.getType().equals(Replica.Type.PULL)) {
+ ri = a;
+ break;
+ }
+ }
+ if (ri == null) {
+ LOG.warn("-- can't find any suitable replica type for " + dc.getName() + " / " + s.getName());
+ return;
+ }
+ synchronized (ri) {
+ ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
+ }
+ stateChanged.set(true);
+ LOG.debug("-- elected new leader for " + dc.getName() + " / " + s.getName() + ": " + ri);
+ } else {
+ LOG.trace("-- already has leader for {} / {}", dc.getName(), s.getName());
+ }
+ });
+ });
+ if (saveClusterState || stateChanged.get()) {
+ // invalidate the cache so the new leader info is reflected on the next read
+ collectionsStatesRef.set(null);
+ }
+ }
+
+ /**
+ * Create a new collection. This operation uses policy framework for node and replica assignments.
+ * @param props collection details
+ * @param results results of the operation.
+ */
+ public void simCreateCollection(ZkNodeProps props, NamedList results) throws Exception {
+ if (props.getStr(CommonAdminParams.ASYNC) != null) {
+ results.add(CoreAdminParams.REQUESTID, props.getStr(CommonAdminParams.ASYNC));
+ }
+ boolean waitForFinalState = props.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
+ List<String> nodeList = new ArrayList<>();
+ List<String> shardNames = new ArrayList<>();
+ final String collectionName = props.getStr(NAME);
+ ClusterState clusterState = getClusterState();
+ ZkWriteCommand cmd = new ClusterStateMutator(cloudManager).createCollection(clusterState, props);
+ if (cmd.noop) {
+ LOG.warn("Collection {} already exists. exit", collectionName);
+ results.add("success", "no-op");
+ return;
+ }
+ opDelays.computeIfAbsent(collectionName, c -> new HashMap<>()).putAll(defaultOpDelays);
+
+ opDelay(collectionName, CollectionParams.CollectionAction.CREATE.name());
+
+ // use the policy framework to compute replica placements
+ AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
+ List<ReplicaPosition> replicaPositions = CreateCollectionCmd.buildReplicaPositions(cloudManager, getClusterState(), props,
+ nodeList, shardNames, sessionWrapper);
+ if (sessionWrapper.get() != null) {
+ sessionWrapper.get().release();
+ }
+ // replicas are created asynchronously; this latch tracks their completion
+ final CountDownLatch finalStateLatch = new CountDownLatch(replicaPositions.size());
+ AtomicInteger replicaNum = new AtomicInteger(1);
+ replicaPositions.forEach(pos -> {
+ Map<String, Object> replicaProps = new HashMap<>();
+ replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node);
+ replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString());
+ String coreName = String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, pos.shard, pos.type.name().substring(0,1).toLowerCase(Locale.ROOT),
+ replicaNum.getAndIncrement());
+ try {
+ replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
+ ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
+ coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps);
+ cloudManager.submit(() -> {
+ simAddReplica(pos.node, ri, false);
+ finalStateLatch.countDown();
+ return true;
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // add collection props
+ DocCollection coll = cmd.collection;
+ collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).putAll(coll.getProperties());
+ // add slice props
+ coll.getSlices().forEach(s -> {
+ Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll.getName(), c -> new ConcurrentHashMap<>())
+ .computeIfAbsent(s.getName(), slice -> new ConcurrentHashMap<>());
+ s.getProperties().forEach((k, v) -> {
+ if (k != null && v != null) {
+ sliceProps.put(k, v);
+ }
+ });
+ });
+ // one (throttled, async) leader election after all replica-creation tasks were submitted
+ cloudManager.submit(new LeaderElection(Collections.singleton(collectionName), true));
+ if (waitForFinalState) {
+ // wait up to 60s of simulated time for all replicas to be created
+ boolean finished = finalStateLatch.await(cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, 60, TimeUnit.MILLISECONDS),
+ TimeUnit.MILLISECONDS);
+ if (!finished) {
+ results.add("failure", "Timeout waiting for all replicas to become active.");
+ return;
+ }
+ }
+ results.add("success", "");
+ }
+
+ /**
+ * Delete a collection
+ * @param collection collection name
+ * @param async async id
+ * @param results results of the operation
+ */
+ public void simDeleteCollection(String collection, String async, NamedList results) throws IOException {
+ if (async != null) {
+ results.add(CoreAdminParams.REQUESTID, async);
+ }
+ lock.lock();
+ try {
+ collProperties.remove(collection);
+ sliceProperties.remove(collection);
+
+ opDelay(collection, CollectionParams.CollectionAction.DELETE.name());
+
+ opDelays.remove(collection);
+ // drop every replica of this collection from every node, adjusting core counts as we go
+ nodeReplicaMap.forEach((n, replicas) -> {
+ for (Iterator<ReplicaInfo> it = replicas.iterator(); it.hasNext(); ) {
+ ReplicaInfo ri = it.next();
+ if (ri.getCollection().equals(collection)) {
+ it.remove();
+ // update the number of cores in node values
+ Integer cores = (Integer) cloudManager.getSimNodeStateProvider().simGetNodeValue(n, "cores");
+ if (cores != null) { // node is still up
+ if (cores == 0) {
+ throw new RuntimeException("Unexpected value of 'cores' (" + cores + ") on node: " + n);
+ }
+ cloudManager.getSimNodeStateProvider().simSetNodeValue(n, "cores", cores - 1);
+ }
+ }
+ }
+ });
+ collectionsStatesRef.set(null);
+ results.add("success", "");
+ } catch (Exception e) {
+ // NOTE(review): failures are logged and swallowed here, so 'results' may end up
+ // containing neither "success" nor a failure entry - confirm this is intended
+ LOG.warn("Exception", e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Remove all collections.
+ */
+ public void simDeleteAllCollections() throws Exception {
+ lock.lock();
+ try {
+ nodeReplicaMap.clear();
+ collProperties.clear();
+ sliceProperties.clear();
+ // reset per-node core counts since all replicas are gone
+ cloudManager.getSimNodeStateProvider().simGetAllNodeValues().forEach((n, values) -> {
+ values.put("cores", 0);
+ });
+ // invalidate cached collection states
+ collectionsStatesRef.set(null);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Move replica. This uses a similar algorithm as {@link org.apache.solr.cloud.MoveReplicaCmd#moveNormalReplica(ClusterState, NamedList, String, String, DocCollection, Replica, Slice, int, boolean)}.
+ * @param message operation details
+ * @param results operation results.
+ */
+ public void simMoveReplica(ZkNodeProps message, NamedList results) throws Exception {
+ if (message.getStr(CommonAdminParams.ASYNC) != null) {
+ results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
+ }
+ String collection = message.getStr(COLLECTION_PROP);
+ String targetNode = message.getStr("targetNode");
+ ClusterState clusterState = getClusterState();
+ DocCollection coll = clusterState.getCollection(collection);
+ if (coll == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
+ }
+ String replicaName = message.getStr(REPLICA_PROP);
+ Replica replica = coll.getReplica(replicaName);
+ if (replica == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Collection: " + collection + " replica: " + replicaName + " does not exist");
+ }
+ // locate the slice that owns this replica
+ Slice slice = null;
+ for (Slice s : coll.getSlices()) {
+ if (s.getReplicas().contains(replica)) {
+ slice = s;
+ }
+ }
+ if (slice == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Replica has no 'slice' property! : " + replica);
+ }
+
+ opDelay(collection, CollectionParams.CollectionAction.MOVEREPLICA.name());
+
+ // TODO: for now simulate moveNormalReplica sequence, where we first add new replica and then delete the old one
+
+ String newSolrCoreName = Assign.buildSolrCoreName(stateManager, coll, slice.getName(), replica.getType());
+ String coreNodeName = Assign.assignCoreNodeName(stateManager, coll);
+ ReplicaInfo newReplica = new ReplicaInfo(coreNodeName, newSolrCoreName, collection, slice.getName(), replica.getType(), targetNode, null);
+ LOG.debug("-- new replica: " + newReplica);
+ // xxx should run leader election here already?
+ simAddReplica(targetNode, newReplica, false);
+ // this will trigger leader election
+ simRemoveReplica(replica.getNodeName(), replica.getName());
+ results.add("success", "");
+ }
+
+ /**
+ * Create a new shard. This uses a similar algorithm as {@link CreateShardCmd}.
+ * @param message operation details
+ * @param results operation results
+ */
  public void simCreateShard(ZkNodeProps message, NamedList results) throws Exception {
    // Echo the request id back for async requests, like the real collection API does.
    if (message.getStr(CommonAdminParams.ASYNC) != null) {
      results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
    }
    String collectionName = message.getStr(COLLECTION_PROP);
    String sliceName = message.getStr(SHARD_ID_PROP);
    ClusterState clusterState = getClusterState();
    lock.lock();
    try {
      // reuse the real CollectionMutator for validation and initial slice creation
      ZkWriteCommand cmd = new CollectionMutator(cloudManager).createShard(clusterState, message);
      if (cmd.noop) {
        results.add("success", "no-op");
        return;
      }

      // simulate the configured latency for this operation, if any
      opDelay(collectionName, CollectionParams.CollectionAction.CREATESHARD.name());

      // copy shard properties -- our equivalent of creating an empty shard in cluster state
      DocCollection collection = cmd.collection;
      Slice slice = collection.getSlice(sliceName);
      Map<String, Object> props = sliceProperties.computeIfAbsent(collection.getName(), c -> new ConcurrentHashMap<>())
          .computeIfAbsent(sliceName, s -> new ConcurrentHashMap<>());
      props.clear();
      // "range" and "replicas" are maintained separately by the simulator, so skip them
      slice.getProperties().entrySet().stream()
          .filter(e -> !e.getKey().equals("range"))
          .filter(e -> !e.getKey().equals("replicas"))
          .forEach(e -> props.put(e.getKey(), e.getValue()));
      // 2. create new replicas
      AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
      List<ReplicaPosition> positions = CreateShardCmd.buildReplicaPositions(cloudManager, clusterState, collectionName,
          message, sessionWrapper);
      if (sessionWrapper.get() != null) {
        sessionWrapper.get().release();
      }
      AtomicInteger replicaNum = new AtomicInteger(1);
      positions.forEach(pos -> {
        Map<String, Object> replicaProps = new HashMap<>();
        replicaProps.put(ZkStateReader.SHARD_ID_PROP, pos.shard);
        replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node);
        replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString());
        replicaProps.put(ZkStateReader.BASE_URL_PROP, Utils.getBaseUrlForNodeName(pos.node, "http"));
        // core name mirrors the naming scheme of the real CreateShardCmd
        String coreName = String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, pos.shard, pos.type.name().substring(0,1).toLowerCase(Locale.ROOT),
            replicaNum.getAndIncrement());
        try {
          replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
          ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
              coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps);
          simAddReplica(pos.node, ri, false);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });
      // NOTE(review): the returned map is unused, but computeIfAbsent registers the
      // collection in collProperties as a side effect -- presumably intentional; confirm.
      Map<String, Object> colProps = collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>());

      // new replicas need a leader per shard
      cloudManager.submit(new LeaderElection(Collections.singleton(collectionName), true));
      results.add("success", "");
    } finally {
      lock.unlock();
    }
  }
+
+ /**
+ * Split a shard. This uses a similar algorithm as {@link SplitShardCmd}, including simulating its
+ * quirks, and leaving the original parent slice in place.
+ * @param message operation details
+ * @param results operation results.
+ */
  public void simSplitShard(ZkNodeProps message, NamedList results) throws Exception {
    String collectionName = message.getStr(COLLECTION_PROP);
    // a reference because getParentSlice may resolve the slice name from split.key
    AtomicReference<String> sliceName = new AtomicReference<>();
    sliceName.set(message.getStr(SHARD_ID_PROP));
    String splitKey = message.getStr("split.key");
    ClusterState clusterState = getClusterState();
    DocCollection collection = clusterState.getCollection(collectionName);
    Slice parentSlice = SplitShardCmd.getParentSlice(clusterState, collectionName, sliceName, splitKey);
    List<DocRouter.Range> subRanges = new ArrayList<>();
    List<String> subSlices = new ArrayList<>();
    List<String> subShardNames = new ArrayList<>();

    // simulate the configured latency for this operation, if any
    opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());

    // delegate sub-range/sub-slice computation to the real SplitShardCmd helper
    SplitShardCmd.fillRanges(cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames);
    // mark the old slice as inactive
    sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
        .computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>())
        .put(ZkStateReader.SHARD_STATE_PROP, Slice.State.INACTIVE.toString());
    // add slice props
    for (int i = 0; i < subRanges.size(); i++) {
      String subSlice = subSlices.get(i);
      DocRouter.Range range = subRanges.get(i);
      Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
          .computeIfAbsent(subSlice, ss -> new ConcurrentHashMap<>());
      sliceProps.put(Slice.RANGE, range);
      sliceProps.put(Slice.PARENT, sliceName.get());
      sliceProps.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.ACTIVE.toString());
    }
    // add replicas for new subShards
    int repFactor = parentSlice.getReplicas().size();
    List<ReplicaPosition> replicaPositions = Assign.identifyNodes(cloudManager,
        clusterState,
        new ArrayList<>(clusterState.getLiveNodes()),
        collectionName,
        new ZkNodeProps(collection.getProperties()),
        // reproduce the bug
        // NOTE(review): the trailing arguments deliberately mimic a quirk of the real
        // SplitShardCmd (see class javadoc) -- confirm before "fixing" this call.
        subSlices, repFactor, 0, 0);
    PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
    if (sessionWrapper != null) sessionWrapper.release();

    for (ReplicaPosition replicaPosition : replicaPositions) {
      String subSliceName = replicaPosition.shard;
      String subShardNodeName = replicaPosition.node;
      String solrCoreName = collectionName + "_" + subSliceName + "_replica" + (replicaPosition.index);
      Map<String, Object> replicaProps = new HashMap<>();
      replicaProps.put(ZkStateReader.SHARD_ID_PROP, replicaPosition.shard);
      replicaProps.put(ZkStateReader.NODE_NAME_PROP, replicaPosition.node);
      replicaProps.put(ZkStateReader.REPLICA_TYPE, replicaPosition.type.toString());
      replicaProps.put(ZkStateReader.BASE_URL_PROP, Utils.getBaseUrlForNodeName(subShardNodeName, "http"));

      ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
          solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps);
      simAddReplica(replicaPosition.node, ri, false);
    }
    // the new sub-shard replicas need leaders
    cloudManager.submit(new LeaderElection(Collections.singleton(collectionName), true));
    results.add("success", "");

  }
+
+ /**
+ * Delete a shard. This uses a similar algorithm as {@link org.apache.solr.cloud.DeleteShardCmd}
+ * @param message operation details
+ * @param results operation results
+ */
  public void simDeleteShard(ZkNodeProps message, NamedList results) throws Exception {
    String collectionName = message.getStr(COLLECTION_PROP);
    String sliceName = message.getStr(SHARD_ID_PROP);
    ClusterState clusterState = getClusterState();
    DocCollection collection = clusterState.getCollection(collectionName);
    if (collection == null) {
      throw new Exception("Collection " + collectionName + " doesn't exist");
    }
    Slice slice = collection.getSlice(sliceName);
    if (slice == null) {
      throw new Exception(" Collection " + collectionName + " slice " + sliceName + " doesn't exist.");
    }

    // simulate the configured latency for this operation, if any
    opDelay(collectionName, CollectionParams.CollectionAction.DELETESHARD.name());

    lock.lock();
    try {
      // drop the per-slice properties and every replica belonging to this shard
      sliceProperties.computeIfAbsent(collectionName, coll -> new ConcurrentHashMap<>()).remove(sliceName);
      nodeReplicaMap.forEach((n, replicas) -> {
        Iterator<ReplicaInfo> it = replicas.iterator();
        while (it.hasNext()) {
          ReplicaInfo ri = it.next();
          if (ri.getCollection().equals(collectionName) && ri.getShard().equals(sliceName)) {
            it.remove();
          }
        }
      });
      // invalidate the cached collection states so they get rebuilt on next read
      collectionsStatesRef.set(null);
      results.add("success", "");
    } catch (Exception e) {
      // NOTE(review): unlike the other sim ops, failures here are reported via the
      // 'results' list rather than rethrown -- callers must check for a "failure" key.
      results.add("failure", e.toString());
    } finally {
      lock.unlock();
    }
  }
+
+ /**
+ * Saves cluster properties to clusterprops.json.
+ * @return current properties
+ */
+ private synchronized Map<String, Object> saveClusterProperties() throws Exception {
+ if (lastSavedProperties != null && lastSavedProperties.equals(clusterProperties)) {
+ return lastSavedProperties;
+ }
+ byte[] data = Utils.toJSON(clusterProperties);
+ VersionedData oldData = stateManager.getData(ZkStateReader.CLUSTER_PROPS);
+ int version = oldData != null ? oldData.getVersion() : -1;
+ stateManager.setData(ZkStateReader.CLUSTER_PROPS, data, version);
+ lastSavedProperties = (Map)Utils.fromJSON(data);
+ return lastSavedProperties;
+ }
+
+ /**
+ * Set all cluster properties. This also updates the clusterprops.json data in
+ * {@link DistribStateManager}
+ * @param properties properties to set
+ */
+ public void simSetClusterProperties(Map<String, Object> properties) throws Exception {
+ lock.lock();
+ try {
+ clusterProperties.clear();
+ if (properties != null) {
+ this.clusterProperties.putAll(properties);
+ }
+ saveClusterProperties();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Set a cluster property. This also updates the clusterprops.json data in
+ * {@link DistribStateManager}
+ * @param key property name
+ * @param value property value
+ */
+ public void simSetClusterProperty(String key, Object value) throws Exception {
+ lock.lock();
+ try {
+ if (value != null) {
+ clusterProperties.put(key, value);
+ } else {
+ clusterProperties.remove(key);
+ }
+ saveClusterProperties();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Set collection properties.
+ * @param coll collection name
+ * @param properties properties
+ */
+ public void simSetCollectionProperties(String coll, Map<String, Object> properties) throws Exception {
+ lock.lock();
+ try {
+ if (properties == null) {
+ collProperties.remove(coll);
+ } else {
+ Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
+ props.clear();
+ props.putAll(properties);
+ }
+ collectionsStatesRef.set(null);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Set collection property.
+ * @param coll collection name
+ * @param key property name
+ * @param value property value
+ */
+ public void simSetCollectionProperty(String coll, String key, String value) throws Exception {
+ Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
+ lock.lock();
+ try {
+ if (value == null) {
+ props.remove(key);
+ } else {
+ props.put(key, value);
+ }
+ collectionsStatesRef.set(null);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Set slice properties.
+ * @param coll collection name
+ * @param slice slice name
+ * @param properties slice properties
+ */
+ public void simSetSliceProperties(String coll, String slice, Map<String, Object> properties) throws Exception {
+ Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll, c -> new HashMap<>()).computeIfAbsent(slice, s -> new HashMap<>());
+ lock.lock();
+ try {
+ sliceProps.clear();
+ if (properties != null) {
+ sliceProps.putAll(properties);
+ }
+ collectionsStatesRef.set(null);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Set per-collection value (eg. a metric). This value will be applied to each replica.
+ * @param collection collection name
+ * @param key property name
+ * @param value property value
+ */
  public void simSetCollectionValue(String collection, String key, Object value) throws Exception {
    // convenience overload: apply to all shards, without dividing the value
    simSetCollectionValue(collection, key, value, false);
  }

  /**
   * Set per-collection value (eg. a metric). This value will be applied to each replica.
   * @param collection collection name
   * @param key property name
   * @param value property value
   * @param divide if the value is a {@link Number} and this param is true, then the value will be evenly
   *               divided by the number of replicas.
   */
  public void simSetCollectionValue(String collection, String key, Object value, boolean divide) throws Exception {
    // a null shard means "all shards of the collection"
    simSetShardValue(collection, null, key, value, divide);
  }

  /**
   * Set per-collection value (eg. a metric). This value will be applied to each replica in a selected shard.
   * @param collection collection name
   * @param shard shard name. If null then all shards will be affected.
   * @param key property name
   * @param value property value
   */
  public void simSetShardValue(String collection, String shard, String key, Object value) throws Exception {
    // convenience overload: no division across replicas
    simSetShardValue(collection, shard, key, value, false);
  }
+
+ /**
+ * Set per-collection value (eg. a metric). This value will be applied to each replica in a selected shard.
+ * @param collection collection name
+ * @param shard shard name. If null then all shards will be affected.
+ * @param key property name
+ * @param value property value
+ * @param divide if the value is a {@link Number} and this is true, then the value will be evenly
+ * divided by the number of replicas.
+ */
+ public void simSetShardValue(String collection, String shard, String key, Object value, boolean divide) throws Exception {
+ List<ReplicaInfo> infos = new ArrayList<>();
+ nodeReplicaMap.forEach((n, replicas) -> {
+ replicas.forEach(r -> {
+ if (r.getCollection().equals(collection)) {
+ if (shard != null && !shard.equals(r.getShard())) {
+ return;
+ }
+ infos.add(r);
+ }
+ });
+ });
+ if (infos.isEmpty()) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection " + collection + " doesn't exist.");
+ }
+ if (divide && value != null && (value instanceof Number)) {
+ value = ((Number)value).doubleValue() / infos.size();
+ }
+ for (ReplicaInfo r : infos) {
+ synchronized (r) {
+ if (value == null) {
+ r.getVariables().remove(key);
+ } else {
+ r.getVariables().put(key, value);
+ }
+ }
+ }
+ }
+
+ /**
+ * Return all replica infos for a node.
+ * @param node node id
+ * @return list of replicas on that node, or empty list if none
+ */
+ public List<ReplicaInfo> simGetReplicaInfos(String node) {
+ List<ReplicaInfo> replicas = nodeReplicaMap.get(node);
+ if (replicas == null) {
+ return Collections.emptyList();
+ } else {
+ return replicas;
+ }
+ }
+
+ /**
+ * List collections.
+ * @return list of existing collections.
+ */
+ public List<String> simListCollections() {
+ final Set<String> collections = new HashSet<>();
+ lock.lock();
+ try {
+ nodeReplicaMap.forEach((n, replicas) -> {
+ replicas.forEach(ri -> collections.add(ri.getCollection()));
+ });
+ return new ArrayList<>(collections);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // interface methods
+
  @Override
  public ClusterState.CollectionRef getState(String collection) {
    try {
      return getClusterState().getCollectionRef(collection);
    } catch (IOException e) {
      // NOTE(review): failures are silently mapped to null -- callers must handle it
      return null;
    }
  }

  @Override
  public Set<String> getLiveNodes() {
    return liveNodes.get();
  }

  @Override
  public List<String> resolveAlias(String alias) {
    // aliases are not simulated
    throw new UnsupportedOperationException("resolveAlias not implemented");
  }

  @Override
  public ClusterState getClusterState() throws IOException {
    // getCollectionStates() rebuilds the cache lazily and flips saveClusterState when
    // it did so; in that case persist the freshly built state.
    Map<String, DocCollection> states = getCollectionStates();
    ClusterState state = new ClusterState(clusterStateVersion, liveNodes.get(), states);
    if (saveClusterState.getAndSet(false)) {
      saveClusterState(state);
    }
    return state;
  }
+
+ private Map<String, DocCollection> getCollectionStates() {
+ Map<String, DocCollection> collectionStates = collectionsStatesRef.get();
+ if (collectionStates != null) {
+ return collectionStates;
+ }
+ lock.lock();
+ collectionsStatesRef.set(null);
+ clusterStateVersion++;
+ saveClusterState.set(true);
+ try {
+ Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
+ nodeReplicaMap.forEach((n, replicas) -> {
+ replicas.forEach(ri -> {
+ Map<String, Object> props;
+ synchronized (ri) {
+ props = new HashMap<>(ri.getVariables());
+ }
+ props.put(ZkStateReader.NODE_NAME_PROP, n);
+ props.put(ZkStateReader.CORE_NAME_PROP, ri.getCore());
+ props.put(ZkStateReader.REPLICA_TYPE, ri.getType().toString());
+ props.put(ZkStateReader.STATE_PROP, ri.getState().toString());
+ Replica r = new Replica(ri.getName(), props);
+ collMap.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
+ .computeIfAbsent(ri.getShard(), s -> new HashMap<>())
+ .put(ri.getName(), r);
+ });
+ });
+
+ // add empty slices
+ sliceProperties.forEach((c, perSliceProps) -> {
+ perSliceProps.forEach((slice, props) -> {
+ collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>()).computeIfAbsent(slice, s -> new ConcurrentHashMap<>());
+ });
+ });
+ // add empty collections
+ collProperties.keySet().forEach(c -> {
+ collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>());
+ });
+
+ Map<String, DocCollection> res = new HashMap<>();
+ collMap.forEach((coll, shards) -> {
+ Map<String, Slice> slices = new HashMap<>();
+ shards.forEach((s, replicas) -> {
+ Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>()).computeIfAbsent(s, sl -> new ConcurrentHashMap<>());
+ Slice slice = new Slice(s, replicas, sliceProps);
+ slices.put(s, slice);
+ });
+ Map<String, Object> collProps = collProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>());
+ DocCollection dc = new DocCollection(coll, slices, collProps, DocRouter.DEFAULT, clusterStateVersion, ZkStateReader.CLUSTER_STATE);
+ res.put(coll, dc);
+ });
+ collectionsStatesRef.set(res);
+ return res;
+ } finally {
+ lock.unlock();
+ }
+ }
+
  @Override
  public Map<String, Object> getClusterProperties() {
    // NOTE(review): returns the live internal map, not a copy -- callers can mutate it
    return clusterProperties;
  }

  @Override
  public String getPolicyNameByCollection(String coll) {
    // NOTE(review): computeIfAbsent registers the collection as a side effect of a
    // read -- presumably intentional for the simulator; confirm.
    Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
    return (String)props.get("policy");
  }

  @Override
  public void connect() {
    // no-op: the simulated provider needs no connection

  }

  @Override
  public void close() throws IOException {
    // no-op: nothing to release

  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
new file mode 100644
index 0000000..f9f17a0
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
@@ -0,0 +1,580 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.jute.Record;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.cloud.ActionThrottle;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.IdUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CheckVersionRequest;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simulated {@link DistribStateManager} that keeps all data locally in a static structure. Instances of this
+ * class are identified by their id in order to simulate the deletion of ephemeral nodes when {@link #close()} is
+ * invoked.
+ */
+public class SimDistribStateManager implements DistribStateManager {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final class Node {
+ ReentrantLock dataLock = new ReentrantLock();
+ private int version = -1;
+ private int seq = 0;
+ private final CreateMode mode;
+ private final String clientId;
+ private final String path;
+ private final String name;
+ private final Node parent;
+ private byte[] data = null;
+ private Map<String, Node> children = new ConcurrentHashMap<>();
+ Set<Watcher> dataWatches = ConcurrentHashMap.newKeySet();
+ Set<Watcher> childrenWatches = ConcurrentHashMap.newKeySet();
+
+ Node(Node parent, String name, String path, CreateMode mode, String clientId) {
+ this.parent = parent;
+ this.name = name;
+ this.path = path;
+ this.mode = mode;
+ this.clientId = clientId;
+
+ }
+
+ public void clear() {
+ dataLock.lock();
+ try {
+ children.clear();
+ version = 0;
+ seq = 0;
+ dataWatches.clear();
+ childrenWatches.clear();
+ data = null;
+ } finally {
+ dataLock.unlock();
+ }
+ }
+
+ public void setData(byte[] data, int version) throws BadVersionException, IOException {
+ Set<Watcher> currentWatchers = new HashSet<>(dataWatches);
+ dataLock.lock();
+ try {
+ if (version != -1 && version != this.version) {
+ throw new BadVersionException(version, path);
+ }
+ if (data != null) {
+ this.data = Arrays.copyOf(data, data.length);
+ } else {
+ this.data = null;
+ }
+ this.version++;
+ dataWatches.clear();
+ } finally {
+ dataLock.unlock();
+ }
+ for (Watcher w : currentWatchers) {
+ w.process(new WatchedEvent(Watcher.Event.EventType.NodeDataChanged, Watcher.Event.KeeperState.SyncConnected, path));
+ }
+ }
+
+ public VersionedData getData(Watcher w) {
+ dataLock.lock();
+ try {
+ VersionedData res = new VersionedData(version, data, clientId);
+ if (w != null && !dataWatches.contains(w)) {
+ dataWatches.add(w);
+ }
+ return res;
+ } finally {
+ dataLock.unlock();
+ }
+ }
+
+ public void setChild(String name, Node child) {
+ assert child.name.equals(name);
+ Set<Watcher> currentWatchers = new HashSet<>(childrenWatches);
+ dataLock.lock();
+ try {
+ children.put(name, child);
+ childrenWatches.clear();
+ } finally {
+ dataLock.unlock();
+ }
+ for (Watcher w : currentWatchers) {
+ w.process(new WatchedEvent(Watcher.Event.EventType.NodeChildrenChanged, Watcher.Event.KeeperState.SyncConnected, path));
+ }
+ }
+
+ public void removeChild(String name, int version) throws NoSuchElementException, BadVersionException, IOException {
+ Node n = children.get(name);
+ if (n == null) {
+ throw new NoSuchElementException(path + "/" + name);
+ }
+ if (version != -1 && version != n.version) {
+ throw new BadVersionException(version, path);
+ }
+ children.remove(name);
+ Set<Watcher> currentWatchers = new HashSet<>(childrenWatches);
+ childrenWatches.clear();
+ for (Watcher w : currentWatchers) {
+ w.process(new WatchedEvent(Watcher.Event.EventType.NodeChildrenChanged, Watcher.Event.KeeperState.SyncConnected, path));
+ }
+ currentWatchers = new HashSet<>(n.dataWatches);
+ n.dataWatches.clear();
+ for (Watcher w : currentWatchers) {
+ w.process(new WatchedEvent(Watcher.Event.EventType.NodeDeleted, Watcher.Event.KeeperState.SyncConnected, n.path));
+ }
+ // TODO: not sure if it's correct to recurse and fire watches???
+ Set<String> kids = new HashSet<>(n.children.keySet());
+ for (String kid : kids) {
+ n.removeChild(kid, -1);
+ }
+ }
+
+ public void removeEphemeralChildren(String id) throws NoSuchElementException, BadVersionException, IOException {
+ Set<String> kids = new HashSet<>(children.keySet());
+ for (String kid : kids) {
+ Node n = children.get(kid);
+ if (n == null) {
+ continue;
+ }
+ if ((CreateMode.EPHEMERAL == n.mode || CreateMode.EPHEMERAL_SEQUENTIAL == n.mode) &&
+ id.equals(n.clientId)) {
+ removeChild(n.name, -1);
+ } else {
+ n.removeEphemeralChildren(id);
+ }
+ }
+ }
+
+ }
+
  // serializes compound tree operations (traverse/create/remove) across threads
  private final ReentrantLock multiLock = new ReentrantLock();

  public static Node createNewRootNode() {
    // the root is persistent and owned by a synthetic "__root__" client id
    return new Node(null, "", "/", CreateMode.PERSISTENT, "__root__");
  }

  // pool used to deliver watcher events
  private final ExecutorService watchersPool;

  private final AtomicReference<ActionThrottle> throttleRef = new AtomicReference<>();
  private final AtomicReference<ActionError> errorRef = new AtomicReference<>();
  // ephemeral owner id of this instance; close() removes ephemerals created under it
  private final String id;
  private final Node root;

  public SimDistribStateManager() {
    this(null);
  }

  /**
   * Construct new state manager that uses provided root node for storing data.
   * @param root if null then a new root node will be created.
   */
  public SimDistribStateManager(Node root) {
    this.id = IdUtils.timeRandomId();
    this.root = root != null ? root : createNewRootNode();
    watchersPool = ExecutorUtil.newMDCAwareFixedThreadPool(10, new DefaultSolrThreadFactory("sim-watchers"));
  }

  public SimDistribStateManager(ActionThrottle actionThrottle, ActionError actionError) {
    this(null, actionThrottle, actionError);
  }

  public SimDistribStateManager(Node root, ActionThrottle actionThrottle, ActionError actionError) {
    this(root);
    this.throttleRef.set(actionThrottle);
    this.errorRef.set(actionError);
  }

  // used by withEphemeralId() to share the tree and pool while overriding the owner id
  private SimDistribStateManager(String id, ExecutorService watchersPool, Node root, ActionThrottle actionThrottle,
                                 ActionError actionError) {
    this.id = id;
    this.watchersPool = watchersPool;
    this.root = root;
    this.throttleRef.set(actionThrottle);
    this.errorRef.set(actionError);
  }
+
+ /**
+ * Create a copy of this instance using a specified ephemeral owner id. This is useful when performing
+ * node operations that require using a specific id. Note: this instance should never be closed, it can
+ * be just discarded after use.
+ * @param id ephemeral owner id
+ */
  public SimDistribStateManager withEphemeralId(String id) {
    // Shares the watchers pool and the tree with this instance; only the ephemeral
    // owner id differs. Closing the copy is forbidden -- close the parent instead.
    return new SimDistribStateManager(id, watchersPool, root, throttleRef.get(), errorRef.get()) {
      @Override
      public void close() {
        throw new UnsupportedOperationException("this instance should never be closed - instead close the parent instance.");
      }
    };
  }

  /**
   * Get the root node of the tree used by this instance. It could be a static shared root node.
   */
  public Node getRoot() {
    return root;
  }

  /**
   * Clear this instance. All nodes, watchers and data is deleted.
   */
  public void clear() {
    root.clear();
  }

  // Apply the simulated failure and/or throttling policy before touching a path.
  private void throttleOrError(String path) throws IOException {
    ActionError err = errorRef.get();
    if (err != null && err.shouldFail(path)) {
      throw new IOException("Simulated error, path=" + path);
    }
    ActionThrottle throttle = throttleRef.get();
    if (throttle != null) {
      // enforce a minimum delay between consecutive operations
      throttle.minimumWaitBetweenActions();
      throttle.markAttemptingAction();
    }
  }
+
+ // this method should always be invoked under lock
+ private Node traverse(String path, boolean create, CreateMode mode) throws IOException {
+ if (path == null || path.isEmpty()) {
+ return null;
+ }
+ throttleOrError(path);
+ if (path.charAt(0) == '/') {
+ path = path.substring(1);
+ }
+ StringBuilder currentPath = new StringBuilder();
+ String[] elements = path.split("/");
+ Node parentNode = root;
+ Node n = null;
+ for (int i = 0; i < elements.length; i++) {
+ String currentName = elements[i];
+ currentPath.append('/');
+ LOG.info(" - parentNode=" + parentNode + ", currentName=" + currentName);
+ n = parentNode.children != null ? parentNode.children.get(currentName) : null;
+ if (n == null) {
+ if (create) {
+ if ((parentNode.mode == CreateMode.EPHEMERAL || parentNode.mode == CreateMode.EPHEMERAL_SEQUENTIAL) &&
+ (mode == CreateMode.EPHEMERAL || mode == CreateMode.EPHEMERAL_SEQUENTIAL)) {
+ throw new IOException("NoChildrenEphemerals for " + parentNode.path);
+ }
+ if (CreateMode.PERSISTENT_SEQUENTIAL == mode || CreateMode.EPHEMERAL_SEQUENTIAL == mode) {
+ currentName = currentName + String.format(Locale.ROOT, "%010d", parentNode.seq);
+ parentNode.seq++;
+ }
+ currentPath.append(currentName);
+ n = new Node(parentNode, currentName, currentPath.toString(), mode, id);
+ parentNode.setChild(currentName, n);
+ } else {
+ break;
+ }
+ } else {
+ currentPath.append(currentName);
+ }
+ parentNode = n;
+ }
+ return n;
+ }
+
  @Override
  public void close() throws IOException {
    multiLock.lock();
    try {
      // remove all my ephemeral nodes
      root.removeEphemeralChildren(id);
    } catch (BadVersionException e) {
      // not happening -- removal uses version -1, which skips the version check
    } finally {
      multiLock.unlock();
    }

  }

  @Override
  public boolean hasData(String path) throws IOException {
    multiLock.lock();
    try {
      // traverse without create returns null for a missing path
      return traverse(path, false, CreateMode.PERSISTENT) != null;
    } finally {
      multiLock.unlock();
    }
  }

  @Override
  public List<String> listData(String path) throws NoSuchElementException, IOException {
    multiLock.lock();
    try {
      Node n = traverse(path, false, CreateMode.PERSISTENT);
      if (n == null) {
        throw new NoSuchElementException(path);
      }
      // return a sorted copy of the child names
      List<String> res = new ArrayList<>(n.children.keySet());
      Collections.sort(res);
      return res;
    } finally {
      multiLock.unlock();
    }
  }
+
  @Override
  public List<String> listData(String path, Watcher watcher) throws NoSuchElementException, IOException {
    Node n;
    List<String> res;
    multiLock.lock();
    try {
      n = traverse(path, false, CreateMode.PERSISTENT);
      if (n == null) {
        throw new NoSuchElementException(path);
      }
      res = new ArrayList<>(n.children.keySet());
      Collections.sort(res);
    } finally {
      multiLock.unlock();
    }
    if (watcher != null) {
      // NOTE(review): registered on both the data and children watch sets --
      // presumably to mimic ZK getChildren watches, which also fire on node
      // deletion; confirm. Registration happens outside multiLock.
      n.dataWatches.add(watcher);
      n.childrenWatches.add(watcher);
    }
    return res;
  }

  @Override
  public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException {
    Node n = null;
    multiLock.lock();
    try {
      n = traverse(path, false, CreateMode.PERSISTENT);
      if (n == null) {
        throw new NoSuchElementException(path);
      }
    } finally {
      multiLock.unlock();
    }
    // the node reads its data under its own dataLock and registers the watcher there
    return n.getData(watcher);
  }
+
  @Override
  public void makePath(String path) throws IOException {
    multiLock.lock();
    try {
      // creates all missing intermediate nodes as PERSISTENT
      traverse(path, true, CreateMode.PERSISTENT);
    } finally {
      multiLock.unlock();
    }
  }

  @Override
  public void makePath(String path, byte[] data, CreateMode createMode, boolean failOnExists) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
    Node n = null;
    multiLock.lock();
    try {
      if (failOnExists && hasData(path)) {
        throw new AlreadyExistsException(path);
      }
      n = traverse(path, true, createMode);
    } finally {
      multiLock.unlock();
    }
    try {
      // -1 skips the version check; the node was just created/located above
      n.setData(data, -1);
    } catch (BadVersionException e) {
      throw new IOException("should not happen!", e);
    }
  }
+
+ /**
+  * Create a new node holding the given data.
+  * <p>For EPHEMERAL and PERSISTENT modes the path must not already exist;
+  * the parent path must exist in all cases.</p>
+  *
+  * @return the actual path of the created node, or null if setting the data
+  *         unexpectedly fails with a version conflict
+  * @throws AlreadyExistsException for non-sequential modes when the path exists
+  * @throws NoSuchElementException when the parent path does not exist
+  */
+ @Override
+ public String createData(String path, byte[] data, CreateMode mode) throws AlreadyExistsException, NoSuchElementException, IOException {
+ if ((CreateMode.EPHEMERAL == mode || CreateMode.PERSISTENT == mode) && hasData(path)) {
+ throw new AlreadyExistsException(path);
+ }
+ // check if parent exists
+ String relPath = path.charAt(0) == '/' ? path.substring(1) : path;
+ if (relPath.length() > 0) { // non-root path - check if parent exists
+ String[] elements = relPath.split("/");
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < elements.length - 1; i++) {
+ sb.append('/');
+ sb.append(elements[i]);
+ }
+ // NOTE(review): for a single-element path the parent here is the empty string -
+ // presumably hasData("") resolves to the root node; confirm.
+ if (!hasData(sb.toString())) {
+ throw new NoSuchElementException(sb.toString());
+ }
+ }
+ Node n = null;
+ multiLock.lock();
+ try {
+ n = traverse(path, true, mode);
+ } finally {
+ multiLock.unlock();
+ }
+ try {
+ n.setData(data, -1);
+ return n.path;
+ } catch (BadVersionException e) {
+ // not happening - version -1 disables the version check
+ return null;
+ }
+ }
+
+ /**
+  * Remove the node at the given path.
+  *
+  * @param path node path
+  * @param version expected node version, forwarded to the parent's removeChild
+  * @throws NoSuchElementException if the path does not exist
+  * @throws IOException when attempting to remove the root node (which has no parent)
+  */
+ @Override
+ public void removeData(String path, int version) throws NoSuchElementException, BadVersionException, IOException {
+ multiLock.lock();
+ try {
+ Node n = traverse(path, false, CreateMode.PERSISTENT);
+ if (n == null) {
+ throw new NoSuchElementException(path);
+ }
+ Node parent = n.parent;
+ if (parent == null) {
+ throw new IOException("Cannot remove root node");
+ }
+ parent.removeChild(n.name, version);
+ } finally {
+ multiLock.unlock();
+ }
+
+ }
+
+ /**
+  * Overwrite the data of an existing node.
+  *
+  * @param version expected node version, or -1 to skip the version check
+  * @throws NoSuchElementException if the path does not exist
+  */
+ @Override
+ public void setData(String path, byte[] data, int version) throws NoSuchElementException, BadVersionException, IOException {
+ multiLock.lock();
+ Node n = null;
+ try {
+ n = traverse(path, false, CreateMode.PERSISTENT);
+ if (n == null) {
+ throw new NoSuchElementException(path);
+ }
+ } finally {
+ multiLock.unlock();
+ }
+ // NOTE(review): setData runs after multiLock is released - presumably Node
+ // performs its own synchronization; confirm.
+ n.setData(data, version);
+ }
+
+ /**
+  * Execute a sequence of operations (check / create / delete / setData).
+  * <p>NOTE(review): unlike ZooKeeper's atomic multi, each op here is applied
+  * independently - a failing op is recorded as an {@code ErrorResult} with
+  * {@code APIERROR}, but ops that already succeeded are not rolled back.</p>
+  */
+ @Override
+ public List<OpResult> multi(Iterable<Op> ops) throws BadVersionException, NoSuchElementException, AlreadyExistsException, IOException, KeeperException, InterruptedException {
+ multiLock.lock();
+ List<OpResult> res = new ArrayList<>();
+ try {
+ for (Op op : ops) {
+ Record r = op.toRequestRecord();
+ try {
+ if (op instanceof Op.Check) {
+ CheckVersionRequest rr = (CheckVersionRequest)r;
+ Node n = traverse(rr.getPath(), false, CreateMode.PERSISTENT);
+ if (n == null) {
+ throw new NoSuchElementException(rr.getPath());
+ }
+ // version -1 means "any version"
+ if (rr.getVersion() != -1 && n.version != rr.getVersion()) {
+ throw new Exception("version mismatch");
+ }
+ // everything ok
+ res.add(new OpResult.CheckResult());
+ } else if (op instanceof Op.Create) {
+ CreateRequest rr = (CreateRequest)r;
+ createData(rr.getPath(), rr.getData(), CreateMode.fromFlag(rr.getFlags()));
+ res.add(new OpResult.CreateResult(rr.getPath()));
+ } else if (op instanceof Op.Delete) {
+ DeleteRequest rr = (DeleteRequest)r;
+ removeData(rr.getPath(), rr.getVersion());
+ res.add(new OpResult.DeleteResult());
+ } else if (op instanceof Op.SetData) {
+ SetDataRequest rr = (SetDataRequest)r;
+ setData(rr.getPath(), rr.getData(), rr.getVersion());
+ // re-read to report the post-write version to the caller
+ VersionedData vd = getData(rr.getPath());
+ Stat s = new Stat();
+ s.setVersion(vd.getVersion());
+ res.add(new OpResult.SetDataResult(s));
+ } else {
+ throw new Exception("Unknown Op: " + op);
+ }
+ } catch (Exception e) {
+ // any failure is reported in the results rather than propagated
+ res.add(new OpResult.ErrorResult(KeeperException.Code.APIERROR.intValue()));
+ }
+ }
+ } finally {
+ multiLock.unlock();
+ }
+ return res;
+ }
+
+ /**
+  * Read the autoscaling configuration stored under
+  * {@code ZkStateReader.SOLR_AUTOSCALING_CONF_PATH}. When the node is missing
+  * or empty an empty config with version -1 is returned.
+  *
+  * @param watcher optional watcher registered on the config node
+  */
+ @Override
+ public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws InterruptedException, IOException {
+ Map<String, Object> map = new HashMap<>();
+ int version = -1;
+ try {
+ VersionedData data = getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, watcher);
+ if (data != null && data.getData() != null && data.getData().length > 0) {
+ map = (Map<String, Object>) Utils.fromJSON(data.getData());
+ version = data.getVersion();
+ }
+ } catch (NoSuchElementException e) {
+ // ignore - config node not created yet; fall through to an empty config
+ }
+ map.put(AutoScalingParams.ZK_VERSION, version);
+ return new AutoScalingConfig(map);
+ }
+
+ // ------------ simulator methods --------------
+
+ /**
+  * Set the simulated autoscaling configuration, creating the config znode
+  * first if necessary.
+  */
+ public void simSetAutoScalingConfig(AutoScalingConfig cfg) throws Exception {
+ try {
+ makePath(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH);
+ } catch (Exception e) {
+ // ignore - the path may already exist
+ }
+ setData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(cfg), -1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistributedQueueFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistributedQueueFactory.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistributedQueueFactory.java
new file mode 100644
index 0000000..e9616f0
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistributedQueueFactory.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
+import org.apache.solr.cloud.Stats;
+import org.apache.solr.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simulated {@link DistributedQueueFactory} that keeps all data in memory. Unlike
+ * the {@link GenericDistributedQueueFactory} this queue implementation data is not
+ * exposed anywhere.
+ */
+ public class SimDistributedQueueFactory implements DistributedQueueFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // one queue instance per path, created lazily by makeQueue
+ Map<String, SimDistributedQueue> queues = new ConcurrentHashMap<>();
+
+ public SimDistributedQueueFactory() {
+ }
+
+ /** Return the queue registered for the given path, creating it on first use. */
+ @Override
+ public DistributedQueue makeQueue(final String path) throws IOException {
+ return queues.computeIfAbsent(path, p -> new SimDistributedQueue(path));
+ }
+
+ /** Discard the queue for the given path (its pending elements are lost). */
+ @Override
+ public void removeQueue(String path) throws IOException {
+ queues.remove(path);
+ }
+
+ /** In-memory FIFO queue of (node name, payload) pairs. */
+ public static class SimDistributedQueue implements DistributedQueue {
+ private final Queue<Pair<String, byte[]>> queue = new ConcurrentLinkedQueue<>();
+ // guards queue mutations and the 'changed' condition below
+ private final ReentrantLock updateLock = new ReentrantLock();
+ // signaled whenever the queue contents change (offer / poll / remove)
+ private final Condition changed = updateLock.newCondition();
+ private final Stats stats = new Stats();
+ // queue path, used as the prefix for stat timer names
+ private final String dir;
+ // monotonically increasing sequence used to generate element node names
+ private int seq = 0;
+
+ public SimDistributedQueue(String dir) {
+ this.dir = dir;
+ }
+
+ /** Return (without removing) the head element's payload, or null when the queue is empty. */
+ @Override
+ public byte[] peek() throws Exception {
+ Timer.Context time = stats.time(dir + "_peek");
+ try {
+ Pair<String, byte[]> pair = queue.peek();
+ return pair != null ? pair.second() : null;
+ } finally {
+ time.stop();
+ }
+ }
+
+ /**
+  * Peek at the head element's payload.
+  *
+  * @param block if true, wait (potentially forever) until an element is available
+  */
+ @Override
+ public byte[] peek(boolean block) throws Exception {
+ return block ? peek(Long.MAX_VALUE) : peek();
+ }
+
+ /**
+  * Peek at the head element's payload, waiting up to {@code wait} ms for one
+  * to become available.
+  *
+  * @param wait maximum wait time in milliseconds; Long.MAX_VALUE means wait forever
+  * @return the head payload, or null on timeout
+  */
+ @Override
+ public byte[] peek(long wait) throws Exception {
+ Timer.Context time;
+ if (wait == Long.MAX_VALUE) {
+ time = stats.time(dir + "_peek_wait_forever");
+ } else {
+ // NOTE(review): timer name concatenates the wait value without a separator
+ // (e.g. "_peek_wait100") - confirm this matches other queue impls.
+ time = stats.time(dir + "_peek_wait" + wait);
+ }
+ try {
+ Pair<String, byte[]> pair = peekInternal(wait);
+ return pair != null ? pair.second() : null;
+ } finally {
+ time.stop();
+ }
+ }
+
+ /**
+  * Wait up to {@code wait} ms for a head element to become available.
+  *
+  * @param wait maximum wait time in milliseconds, must be positive
+  * @return the head (name, payload) pair, or null on timeout
+  */
+ private Pair<String, byte[]> peekInternal(long wait) throws Exception {
+ Preconditions.checkArgument(wait > 0);
+ long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
+ updateLock.lockInterruptibly();
+ try {
+ while (waitNanos > 0) {
+ Pair<String, byte[]> pair = queue.peek();
+ if (pair != null) {
+ return pair;
+ }
+ // awaitNanos returns the remaining wait time (<= 0 when the deadline passed)
+ waitNanos = changed.awaitNanos(waitNanos);
+ if (waitNanos < 0) { // timed out
+ return null;
+ }
+ }
+ } finally {
+ updateLock.unlock();
+ }
+ return null;
+ }
+
+ /**
+  * Remove and return the head element's payload, or null when the queue is
+  * empty. Signals waiters when an element was removed.
+  */
+ @Override
+ public byte[] poll() throws Exception {
+ Timer.Context time = stats.time(dir + "_poll");
+ updateLock.lockInterruptibly();
+ try {
+ Pair<String, byte[]> pair = queue.poll();
+ if (pair != null) {
+ // wake up threads blocked in peekInternal / peekElements / take
+ changed.signalAll();
+ return pair.second();
+ } else {
+ return null;
+ }
+ } finally {
+ updateLock.unlock();
+ time.stop();
+ }
+ }
+
+ /**
+  * Remove and return the head element's payload.
+  *
+  * @throws java.util.NoSuchElementException (from {@code Queue.remove()}) when the queue is empty
+  */
+ @Override
+ public byte[] remove() throws Exception {
+ Timer.Context time = stats.time(dir + "_remove");
+ updateLock.lockInterruptibly();
+ try {
+ byte[] res = queue.remove().second();
+ changed.signalAll();
+ return res;
+ } finally {
+ updateLock.unlock();
+ time.stop();
+ }
+ }
+
+ /**
+  * Remove and return the head element's payload, blocking until one is available.
+  */
+ @Override
+ public byte[] take() throws Exception {
+ Timer.Context timer = stats.time(dir + "_take");
+ updateLock.lockInterruptibly();
+ try {
+ while (true) {
+ // poll() re-acquires the (reentrant) updateLock and records its own timer
+ byte[] result = poll();
+ if (result != null) {
+ return result;
+ }
+ changed.await();
+ }
+ } finally {
+ updateLock.unlock();
+ timer.stop();
+ }
+ }
+
+ /**
+  * Append an element to the tail of the queue under the next sequential node
+  * name ("qn-NNNNNNNNNN") and wake up any waiting consumers.
+  *
+  * @param data payload to enqueue (stored as-is, not copied)
+  */
+ @Override
+ public void offer(byte[] data) throws Exception {
+ Timer.Context time = stats.time(dir + "_offer");
+ updateLock.lockInterruptibly();
+ try {
+ // diamond operator instead of a raw Pair, so the element type stays checked
+ queue.offer(new Pair<>(String.format(Locale.ROOT, "qn-%010d", seq), data));
+ seq++;
+ // parameterized logging avoids the string concatenation when TRACE is disabled
+ LOG.trace("=== offer {}", System.nanoTime());
+ changed.signalAll();
+ } finally {
+ updateLock.unlock();
+ time.stop();
+ }
+ }
+
+ /**
+  * Return up to {@code max} elements whose names pass {@code acceptFilter}.
+  * If fewer than {@code max} elements are available and {@code waitMillis > 0},
+  * wait once for more elements and collect any that arrived after the last
+  * element gathered in the first pass.
+  *
+  * @param max maximum number of elements to return (negative means unlimited)
+  * @param waitMillis how long to wait (ms) for additional elements
+  * @param acceptFilter optional filter on element names; null accepts all
+  */
+ @Override
+ public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws Exception {
+ updateLock.lockInterruptibly();
+ try {
+ List<Pair<String, byte[]>> res = new LinkedList<>();
+ final int maximum = max < 0 ? Integer.MAX_VALUE : max;
+ // remembers the last element collected so the second pass can resume after it
+ final AtomicReference<Pair<String, byte[]>> pairRef = new AtomicReference<>();
+ queue.forEach(pair -> {
+ if (acceptFilter != null && !acceptFilter.test(pair.first())) {
+ return;
+ }
+ if (res.size() < maximum) {
+ pairRef.set(pair);
+ res.add(pair);
+ }
+ });
+ if (res.size() < maximum && waitMillis > 0) {
+ long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
+ waitNanos = changed.awaitNanos(waitNanos);
+ if (waitNanos < 0) {
+ return res;
+ }
+ // second pass: skip everything up to and including the last collected element
+ AtomicBoolean seen = new AtomicBoolean(false);
+ queue.forEach(pair -> {
+ if (!seen.get()) {
+ if (pairRef.get() == null) {
+ seen.set(true);
+ } else {
+ if (pairRef.get().first().equals(pair.first())) {
+ seen.set(true);
+ return;
+ }
+ }
+ }
+ if (!seen.get()) {
+ return;
+ }
+ // bug fix: guard against a null acceptFilter, as the first pass does -
+ // previously this threw NPE when acceptFilter was null and waitMillis > 0
+ if (acceptFilter != null && !acceptFilter.test(pair.first())) {
+ return;
+ }
+ if (res.size() < maximum) {
+ res.add(pair);
+ pairRef.set(pair);
+ } else {
+ return;
+ }
+ });
+ }
+ return res;
+ } finally {
+ updateLock.unlock();
+ }
+ }
+
+ /** Raw {@code Stats} instance backing {@code getStats()}. */
+ public Stats getZkStats() {
+ return stats;
+ }
+
+ /**
+  * Snapshot of queue statistics: current queue length plus per-operation
+  * success / error counts and failure details.
+  */
+ @Override
+ public Map<String, Object> getStats() {
+ if (stats == null) {
+ return Collections.emptyMap();
+ }
+ Map<String, Object> res = new HashMap<>();
+ res.put("queueLength", stats.getQueueLength());
+ final Map<String, Object> statsMap = new HashMap<>();
+ res.put("stats", statsMap);
+ stats.getStats().forEach((op, stat) -> {
+ final Map<String, Object> statMap = new HashMap<>();
+ statMap.put("success", stat.success.get());
+ statMap.put("errors", stat.errors.get());
+ final List<Map<String, Object>> failed = new ArrayList<>(stat.failureDetails.size());
+ statMap.put("failureDetails", failed);
+ stat.failureDetails.forEach(failedOp -> {
+ Map<String, Object> fo = new HashMap<>();
+ fo.put("req", failedOp.req);
+ fo.put("resp", failedOp.resp);
+ // bug fix: the entry was built but never added, leaving failureDetails always empty
+ failed.add(fo);
+ });
+ statsMap.put(op, statMap);
+ });
+ return res;
+ }
+ }
+}