Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/07 12:45:42 UTC

[48/50] [abbrv] lucene-solr:jira/solr-11285-sim: Merge branch 'master' into jira/solr-11285-sim

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5ec9b64b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --cc solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index f65f31a,0000000..d081fee
mode 100644,000000..100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@@ -1,1139 -1,0 +1,1154 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.solr.cloud.autoscaling.sim;
 +
 +import java.io.IOException;
 +import java.lang.invoke.MethodHandles;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Locale;
 +import java.util.Map;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.concurrent.locks.ReentrantLock;
 +
 +import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
++import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 +import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
 +import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion;
 +import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 +import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 +import org.apache.solr.cloud.ActionThrottle;
 +import org.apache.solr.cloud.AddReplicaCmd;
 +import org.apache.solr.cloud.Assign;
 +import org.apache.solr.cloud.CreateCollectionCmd;
 +import org.apache.solr.cloud.CreateShardCmd;
 +import org.apache.solr.cloud.SplitShardCmd;
 +import org.apache.solr.cloud.overseer.ClusterStateMutator;
 +import org.apache.solr.cloud.overseer.CollectionMutator;
 +import org.apache.solr.cloud.overseer.ZkWriteCommand;
 +import org.apache.solr.common.SolrException;
 +import org.apache.solr.common.cloud.ClusterState;
 +import org.apache.solr.common.cloud.DocCollection;
 +import org.apache.solr.common.cloud.DocRouter;
 +import org.apache.solr.common.cloud.Replica;
 +import org.apache.solr.common.cloud.ReplicaPosition;
 +import org.apache.solr.common.cloud.Slice;
 +import org.apache.solr.common.cloud.ZkNodeProps;
 +import org.apache.solr.common.cloud.ZkStateReader;
 +import org.apache.solr.common.params.CollectionParams;
 +import org.apache.solr.common.params.CommonAdminParams;
 +import org.apache.solr.common.params.CoreAdminParams;
 +import org.apache.solr.common.util.NamedList;
 +import org.apache.solr.common.util.Utils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 +import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
 +import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 +import static org.apache.solr.common.params.CommonParams.NAME;
 +
 +/**
 + * Simulated {@link ClusterStateProvider}.
 + */
 +public class SimClusterStateProvider implements ClusterStateProvider {
 +  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 +
 +  private final Map<String, List<ReplicaInfo>> nodeReplicaMap = new ConcurrentHashMap<>();
 +  private final Set<String> liveNodes;
 +  private final DistribStateManager stateManager;
 +  private final SimCloudManager cloudManager;
 +
 +  private final Map<String, Object> clusterProperties = new ConcurrentHashMap<>();
 +  private final Map<String, Map<String, Object>> collProperties = new ConcurrentHashMap<>();
 +  private final Map<String, Map<String, Map<String, Object>>> sliceProperties = new ConcurrentHashMap<>();
 +
 +  private final ReentrantLock lock = new ReentrantLock();
 +
 +  private final ActionThrottle leaderThrottle;
 +
 +  // default map of: operation -> delay
 +  private final Map<String, Long> defaultOpDelays = new HashMap<>();
 +  // per-collection map of: collection -> op -> delay
 +  private final Map<String, Map<String, Long>> opDelays = new ConcurrentHashMap<>();
 +
 +
 +  private ClusterState lastSavedState = null;
 +  private Map<String, Object> lastSavedProperties = null;
 +
 +  private AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
 +
 +  /**
 +   * The instance needs to be initialized using the <code>sim*</code> methods in order
 +   * to ensure proper behavior; otherwise it will behave as a cluster with zero replicas.
 +   */
 +  public SimClusterStateProvider(Set<String> liveNodes, SimCloudManager cloudManager) {
 +    this.liveNodes = liveNodes;
 +    this.cloudManager = cloudManager;
 +    this.stateManager = cloudManager.getDistribStateManager();
 +    this.leaderThrottle = new ActionThrottle("leader", 5000, cloudManager.getTimeSource());
 +    // names are CollectionAction names, delays are in ms (simulated time)
 +    defaultOpDelays.put(CollectionParams.CollectionAction.MOVEREPLICA.name(), 5000L);
 +    defaultOpDelays.put(CollectionParams.CollectionAction.DELETEREPLICA.name(), 5000L);
 +    defaultOpDelays.put(CollectionParams.CollectionAction.ADDREPLICA.name(), 500L);
 +    defaultOpDelays.put(CollectionParams.CollectionAction.SPLITSHARD.name(), 5000L);
 +    defaultOpDelays.put(CollectionParams.CollectionAction.CREATESHARD.name(), 5000L);
 +    defaultOpDelays.put(CollectionParams.CollectionAction.DELETESHARD.name(), 5000L);
 +    defaultOpDelays.put(CollectionParams.CollectionAction.CREATE.name(), 500L);
 +    defaultOpDelays.put(CollectionParams.CollectionAction.DELETE.name(), 5000L);
 +  }
 +
 +  // ============== SIMULATOR SETUP METHODS ====================
 +
 +  /**
 +   * Initialize from an existing cluster state
 +   * @param initialState initial cluster state
 +   */
 +  public void simSetClusterState(ClusterState initialState) throws Exception {
 +    lock.lock();
 +    try {
 +      collProperties.clear();
 +      sliceProperties.clear();
 +      nodeReplicaMap.clear();
 +      liveNodes.clear();
 +      liveNodes.addAll(initialState.getLiveNodes());
 +      initialState.forEachCollection(dc -> {
 +        collProperties.computeIfAbsent(dc.getName(), name -> new ConcurrentHashMap<>()).putAll(dc.getProperties());
 +        opDelays.computeIfAbsent(dc.getName(), c -> new HashMap<>()).putAll(defaultOpDelays);
 +        dc.getSlices().forEach(s -> {
 +          sliceProperties.computeIfAbsent(dc.getName(), name -> new ConcurrentHashMap<>())
 +              .computeIfAbsent(s.getName(), name -> new HashMap<>()).putAll(s.getProperties());
 +          s.getReplicas().forEach(r -> {
 +            ReplicaInfo ri = new ReplicaInfo(r.getName(), r.getCoreName(), dc.getName(), s.getName(), r.getType(), r.getNodeName(), r.getProperties());
 +            if (liveNodes.contains(r.getNodeName())) {
 +              nodeReplicaMap.computeIfAbsent(r.getNodeName(), rn -> new ArrayList<>()).add(ri);
 +            }
 +          });
 +        });
 +      });
 +      saveClusterState();
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  /**
 +   * Reset the leader election throttle.
 +   */
 +  public void simResetLeaderThrottle() {
 +    leaderThrottle.reset();
 +  }
 +
 +  /**
 +   * Get a random live node id.
 +   * @param random source of randomness
 +   * @return one of the live nodes, or null if there are no live nodes
 +   */
 +  public String simGetRandomNode(Random random) {
 +    if (liveNodes.isEmpty()) {
 +      return null;
 +    }
 +    List<String> nodes = new ArrayList<>(liveNodes);
 +    return nodes.get(random.nextInt(nodes.size()));
 +  }
 +
 +  // todo: maybe hook up DistribStateManager /live_nodes ?
 +  // todo: maybe hook up DistribStateManager /clusterstate.json watchers?
 +
 +  /**
 +   * Add a new node to the cluster.
 +   * @param nodeId unique node id
 +   */
 +  public void simAddNode(String nodeId) throws Exception {
 +    if (liveNodes.contains(nodeId)) {
 +      throw new Exception("Node " + nodeId + " already exists");
 +    }
 +    liveNodes.add(nodeId);
 +    nodeReplicaMap.putIfAbsent(nodeId, new ArrayList<>());
 +  }
 +
 +  // utility class to run leader election in a separate thread and with throttling
 +  // Note: leader election is a no-op if a leader already exists for each shard
 +  private class LeaderElection implements Callable<Boolean> {
 +    Collection<String> collections;
 +    boolean saveClusterState;
 +
 +    LeaderElection(Collection<String> collections, boolean saveClusterState) {
 +      this.collections = collections;
 +      this.saveClusterState = saveClusterState;
 +    }
 +
 +    @Override
 +    public Boolean call() {
 +      leaderThrottle.minimumWaitBetweenActions();
 +      leaderThrottle.markAttemptingAction();
 +      try {
 +        simRunLeaderElection(collections, saveClusterState);
 +      } catch (Exception e) {
 +        return false;
 +      }
 +      return true;
 +    }
 +  }
 +
 +  // todo: maybe hook up DistribStateManager /live_nodes ?
 +  // todo: maybe hook up DistribStateManager /clusterstate.json watchers?
 +
 +  /**
 +   * Remove a node from the cluster. This is equivalent to a node being lost.
 +   * All replicas that were assigned to this node are marked as DOWN.
 +   * @param nodeId node id
 +   * @return true if a node existed and was removed
 +   */
 +  public boolean simRemoveNode(String nodeId) throws Exception {
 +    lock.lock();
 +    try {
 +      Set<String> collections = new HashSet<>();
 +      // mark every replica on that node as down
 +      List<ReplicaInfo> replicas = nodeReplicaMap.get(nodeId);
 +      if (replicas != null) {
 +        replicas.forEach(r -> {
 +          r.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
 +          collections.add(r.getCollection());
 +        });
 +      }
 +      boolean res = liveNodes.remove(nodeId);
 +      if (!collections.isEmpty()) {
 +        cloudManager.submit(new LeaderElection(collections, true));
 +      }
 +      return res;
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  /**
 +   * Add a new replica. Note that if any details of the replica (node, coreNodeName, SolrCore name, etc.)
 +   * are missing, they will be filled in using the policy framework.
 +   * @param message replica details
 +   * @param results result of the operation
 +   */
 +  public void simAddReplica(ZkNodeProps message, NamedList results) throws Exception {
-     AtomicLong policyVersionAfter = new AtomicLong(-1);
 +    ClusterState clusterState = getClusterState();
 +    DocCollection coll = clusterState.getCollection(message.getStr(ZkStateReader.COLLECTION_PROP));
-     message = AddReplicaCmd.assignReplicaDetails(cloudManager, clusterState, message, policyVersionAfter);
++    AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
++    message = AddReplicaCmd.assignReplicaDetails(cloudManager, clusterState, message, sessionWrapper);
++    if (sessionWrapper.get() != null) {
++      sessionWrapper.get().release();
++    }
 +    if (message.getStr(CoreAdminParams.CORE_NODE_NAME) == null) {
 +      message = message.plus(CoreAdminParams.CORE_NODE_NAME, Assign.assignCoreNodeName(stateManager, coll));
 +    }
 +    ReplicaInfo ri = new ReplicaInfo(
 +        message.getStr(CoreAdminParams.CORE_NODE_NAME),
 +        message.getStr(CoreAdminParams.NAME),
 +        message.getStr(ZkStateReader.COLLECTION_PROP),
 +        message.getStr(ZkStateReader.SHARD_ID_PROP),
 +        Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT)),
 +        message.getStr(CoreAdminParams.NODE),
 +        message.getProperties()
 +    );
 +    simAddReplica(message.getStr(CoreAdminParams.NODE), ri, true);
 +  }
 +
 +  // todo: maybe hook up DistribStateManager /clusterstate.json watchers?
 +
 +  /**
 +   * Add a replica. Note that all details of the replica must be present here, including
 +   * node, coreNodeName and SolrCore name.
 +   * @param nodeId node id where the replica will be added
 +   * @param replicaInfo replica info
 +   * @param runLeaderElection if true then run a leader election after adding the replica.
 +   */
 +  public void simAddReplica(String nodeId, ReplicaInfo replicaInfo, boolean runLeaderElection) throws Exception {
 +    // make sure the SolrCore name is unique across the cluster
 +    for (Map.Entry<String, List<ReplicaInfo>> e : nodeReplicaMap.entrySet()) {
 +      for (ReplicaInfo ri : e.getValue()) {
 +        if (ri.getCore().equals(replicaInfo.getCore())) {
 +          throw new Exception("Duplicate coreNodeName for existing=" + ri + " on node " + e.getKey() + " and new=" + replicaInfo);
 +        }
 +      }
 +    }
 +    if (!liveNodes.contains(nodeId)) {
 +      throw new Exception("Target node " + nodeId + " is not live: " + liveNodes);
 +    }
 +    // verify info
 +    if (replicaInfo.getCore() == null) {
 +      throw new Exception("Missing core: " + replicaInfo);
 +    }
 +//    if (replicaInfo.getShard() == null) {
 +//      throw new Exception("Missing shard: " + replicaInfo);
 +//    }
 +    replicaInfo.getVariables().remove(ZkStateReader.SHARD_ID_PROP);
 +    if (replicaInfo.getName() == null) {
 +      throw new Exception("Missing name: " + replicaInfo);
 +    }
 +    if (replicaInfo.getNode() == null) {
 +      throw new Exception("Missing node: " + replicaInfo);
 +    }
 +    if (!replicaInfo.getNode().equals(nodeId)) {
 +      throw new Exception("Wrong node (not " + nodeId + "): " + replicaInfo);
 +    }
 +
 +    lock.lock();
 +    try {
 +
 +      opDelay(replicaInfo.getCollection(), CollectionParams.CollectionAction.ADDREPLICA.name());
 +
 +      List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent(nodeId, n -> new ArrayList<>());
 +      // mark replica as active
 +      replicaInfo.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
 +      // add property expected in tests
 +      replicaInfo.getVariables().put(Suggestion.coreidxsize, 123450000);
 +      // at this point nuke our cached DocCollection state
 +      collectionsStatesRef.set(null);
 +
 +      replicas.add(replicaInfo);
 +      Map<String, Object> values = cloudManager.getSimNodeStateProvider().simGetAllNodeValues()
 +          .computeIfAbsent(nodeId, id -> new ConcurrentHashMap<>(SimCloudManager.createNodeValues(id)));
 +      // update the number of cores in node values
 +      Integer cores = (Integer)values.get("cores");
 +      if (cores == null) {
 +        cores = 0;
 +      }
 +      cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, "cores", cores + 1);
 +      if (runLeaderElection) {
 +        cloudManager.submit(new LeaderElection(Collections.singleton(replicaInfo.getCollection()), true));
 +      }
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  // todo: maybe hook up DistribStateManager /clusterstate.json watchers?
 +
 +  /**
 +   * Remove replica.
 +   * @param nodeId node id
 +   * @param coreNodeName coreNodeName
 +   */
 +  public void simRemoveReplica(String nodeId, String coreNodeName) throws Exception {
 +    List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent(nodeId, n -> new ArrayList<>());
 +    lock.lock();
 +    try {
 +      for (int i = 0; i < replicas.size(); i++) {
 +        if (coreNodeName.equals(replicas.get(i).getName())) {
 +          ReplicaInfo ri = replicas.remove(i);
 +
 +          opDelay(ri.getCollection(), CollectionParams.CollectionAction.DELETEREPLICA.name());
 +
 +          // update the number of cores in node values, if node is live
 +          Integer cores = (Integer)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, "cores");
 +          if (liveNodes.contains(nodeId)) {
 +            if (cores == null || cores == 0) {
 +              throw new Exception("Unexpected value of 'cores' (" + cores + ") on node: " + nodeId);
 +            }
 +            cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, "cores", cores - 1);
 +          }
 +          cloudManager.submit(new LeaderElection(Collections.singleton(ri.getCollection()), true));
 +          return;
 +        }
 +      }
 +      throw new Exception("Replica " + coreNodeName + " not found on node " + nodeId);
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  /**
 +   * Save clusterstate.json to {@link DistribStateManager}.
 +   * @return saved state
 +   */
 +  private ClusterState saveClusterState() throws IOException {
 +    collectionsStatesRef.set(null);
 +    ClusterState currentState = getClusterState();
 +    if (lastSavedState != null && lastSavedState.equals(currentState)) {
 +      return lastSavedState;
 +    }
 +    byte[] data = Utils.toJSON(currentState);
 +    try {
 +      VersionedData oldData = stateManager.getData(ZkStateReader.CLUSTER_STATE);
 +      int version = oldData != null ? oldData.getVersion() : -1;
 +      stateManager.setData(ZkStateReader.CLUSTER_STATE, data, version);
 +    } catch (Exception e) {
 +      throw new IOException(e);
 +    }
 +    lastSavedState = currentState;
 +    return currentState;
 +  }
 +
 +  /**
 +   * Delay an operation by a configured amount.
 +   * @param collection collection name
 +   * @param op operation name.
 +   */
 +  private void opDelay(String collection, String op) throws InterruptedException {
 +    Map<String, Long> delays = opDelays.get(collection);
 +    if (delays == null || delays.isEmpty() || !delays.containsKey(op)) {
 +      return;
 +    }
 +    cloudManager.getTimeSource().sleep(delays.get(op));
 +  }
 +
 +  /**
 +   * Simulate running a shard leader election. This operation is a no-op if a leader already exists.
 +   * If a new leader is elected, the cluster state is saved.
 +   * @param collections list of affected collections
 +   * @param saveClusterState if true then save cluster state regardless of changes.
 +   */
 +  private synchronized void simRunLeaderElection(Collection<String> collections, boolean saveClusterState) throws Exception {
 +    ClusterState state = getClusterState();
 +    AtomicBoolean stateChanged = new AtomicBoolean(Boolean.FALSE);
 +
 +    state.forEachCollection(dc -> {
 +      if (!collections.contains(dc.getName())) {
 +        return;
 +      }
 +      dc.getSlices().forEach(s -> {
 +        Replica leader = s.getLeader();
 +        if (leader == null || !liveNodes.contains(leader.getNodeName())) {
 +          LOG.trace("Running leader election for " + dc.getName() + " / " + s.getName());
 +          if (s.getReplicas().isEmpty()) { // no replicas - punt
 +            return;
 +          }
 +          // mark all replicas as non-leader (probably not necessary) and collect all active and live
 +          List<ReplicaInfo> active = new ArrayList<>();
 +          s.getReplicas().forEach(r -> {
 +            AtomicReference<ReplicaInfo> riRef = new AtomicReference<>();
 +            // find our ReplicaInfo for this replica
 +            nodeReplicaMap.get(r.getNodeName()).forEach(info -> {
 +              if (info.getName().equals(r.getName())) {
 +                riRef.set(info);
 +              }
 +            });
 +            ReplicaInfo ri = riRef.get();
 +            if (ri == null) {
 +              throw new IllegalStateException("-- could not find ReplicaInfo for replica " + r);
 +            }
 +            synchronized (ri) {
 +              if (ri.getVariables().remove(ZkStateReader.LEADER_PROP) != null) {
 +                stateChanged.set(true);
 +              }
 +              if (r.isActive(liveNodes)) {
 +                active.add(ri);
 +              } else { // if it's on a node that is not live, mark it down
 +                if (!liveNodes.contains(r.getNodeName())) {
 +                  ri.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
 +                }
 +              }
 +            }
 +          });
 +          if (active.isEmpty()) {
 +            LOG.warn("-- can't find any active replicas for " + dc.getName() + " / " + s.getName());
 +            return;
 +          }
 +          // pick the first active replica that may become a leader (i.e. not of PULL type)
 +          ReplicaInfo ri = null;
 +          for (ReplicaInfo a : active) {
 +            if (!a.getType().equals(Replica.Type.PULL)) {
 +              ri = a;
 +              break;
 +            }
 +          }
 +          if (ri == null) {
 +            LOG.warn("-- can't find any suitable replica type for " + dc.getName() + " / " + s.getName());
 +            return;
 +          }
 +          synchronized (ri) {
 +            ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
 +          }
 +          stateChanged.set(true);
 +          LOG.info("-- elected new leader for " + dc.getName() + " / " + s.getName() + ": " + ri);
 +        } else {
 +          LOG.trace("-- already has leader for {} / {}", dc.getName(), s.getName());
 +        }
 +      });
 +    });
 +    if (saveClusterState || stateChanged.get()) {
 +      saveClusterState();
 +    }
 +  }
 +
 +  /**
 +   * Create a new collection. This operation uses the policy framework for node and replica assignments.
 +   * @param props collection details
 +   * @param results results of the operation.
 +   */
 +  public void simCreateCollection(ZkNodeProps props, NamedList results) throws Exception {
 +    if (props.getStr(CommonAdminParams.ASYNC) != null) {
 +      results.add(CoreAdminParams.REQUESTID, props.getStr(CommonAdminParams.ASYNC));
 +    }
 +    List<String> nodeList = new ArrayList<>();
 +    List<String> shardNames = new ArrayList<>();
 +    final String collectionName = props.getStr(NAME);
 +    ClusterState clusterState = getClusterState();
 +    ZkWriteCommand cmd = new ClusterStateMutator(cloudManager).createCollection(clusterState, props);
 +    if (cmd.noop) {
 +      LOG.warn("Collection {} already exists. exit", collectionName);
 +      results.add("success", "no-op");
 +      return;
 +    }
 +    opDelays.computeIfAbsent(collectionName, c -> new HashMap<>()).putAll(defaultOpDelays);
 +
 +    opDelay(collectionName, CollectionParams.CollectionAction.CREATE.name());
 +
++    AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
 +    List<ReplicaPosition> replicaPositions = CreateCollectionCmd.buildReplicaPositions(cloudManager, getClusterState(), props,
-         nodeList, shardNames);
++        nodeList, shardNames, sessionWrapper);
++    if (sessionWrapper.get() != null) {
++      sessionWrapper.get().release();
++    }
 +    AtomicInteger replicaNum = new AtomicInteger(1);
 +    replicaPositions.forEach(pos -> {
 +      Map<String, Object> replicaProps = new HashMap<>();
 +      //replicaProps.put(ZkStateReader.SHARD_ID_PROP, pos.shard);
 +      replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node);
 +      replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString());
 +      String coreName = String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, pos.shard, pos.type.name().substring(0,1).toLowerCase(Locale.ROOT),
 +          replicaNum.getAndIncrement());
 +      try {
 +        replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
 +        ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
 +            coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps);
 +        cloudManager.submit(() -> {simAddReplica(pos.node, ri, false); return true;});
 +      } catch (Exception e) {
 +        throw new RuntimeException(e);
 +      }
 +    });
 +    collectionsStatesRef.set(null);
 +    // add collection props
 +    DocCollection coll = cmd.collection;
 +    collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).putAll(coll.getProperties());
 +    // add slice props
 +    coll.getSlices().forEach(s -> {
 +      Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll.getName(), c -> new ConcurrentHashMap<>())
 +          .computeIfAbsent(s.getName(), slice -> new ConcurrentHashMap<>());
 +      s.getProperties().forEach((k, v) -> {
 +        if (k != null && v != null) {
 +          sliceProps.put(k, v);
 +        }
 +      });
 +    });
 +    cloudManager.submit(new LeaderElection(Collections.singleton(collectionName), true));
 +    results.add("success", "");
 +  }
 +
 +  /**
 +   * Delete a collection
 +   * @param collection collection name
 +   * @param async async id
 +   * @param results results of the operation
 +   */
 +  public void simDeleteCollection(String collection, String async, NamedList results) throws IOException {
 +    if (async != null) {
 +      results.add(CoreAdminParams.REQUESTID, async);
 +    }
 +    lock.lock();
 +    try {
 +      collProperties.remove(collection);
 +      sliceProperties.remove(collection);
 +
 +      opDelay(collection, CollectionParams.CollectionAction.DELETE.name());
 +
 +      opDelays.remove(collection);
 +      nodeReplicaMap.forEach((n, replicas) -> {
 +        for (Iterator<ReplicaInfo> it = replicas.iterator(); it.hasNext(); ) {
 +          ReplicaInfo ri = it.next();
 +          if (ri.getCollection().equals(collection)) {
 +            it.remove();
 +            // update the number of cores in node values
 +            Integer cores = (Integer) cloudManager.getSimNodeStateProvider().simGetNodeValue(n, "cores");
 +            if (cores != null) { // node is still up
 +              if (cores == 0) {
 +                throw new RuntimeException("Unexpected value of 'cores' (" + cores + ") on node: " + n);
 +              }
 +              cloudManager.getSimNodeStateProvider().simSetNodeValue(n, "cores", cores - 1);
 +            }
 +          }
 +        }
 +      });
 +      saveClusterState();
 +      results.add("success", "");
 +    } catch (Exception e) {
 +      LOG.warn("Exception", e);
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  /**
 +   * Remove all collections.
 +   */
 +  public void simDeleteAllCollections() throws Exception {
 +    lock.lock();
 +    try {
 +      nodeReplicaMap.clear();
 +      collProperties.clear();
 +      sliceProperties.clear();
 +      cloudManager.getSimNodeStateProvider().simGetAllNodeValues().forEach((n, values) -> {
 +        values.put("cores", 0);
 +      });
 +      saveClusterState();
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  /**
 +   * Move a replica. This uses an algorithm similar to {@link org.apache.solr.cloud.MoveReplicaCmd#moveNormalReplica(ClusterState, NamedList, String, String, DocCollection, Replica, Slice, int, boolean)}.
 +   * @param message operation details
 +   * @param results operation results.
 +   */
 +  public void simMoveReplica(ZkNodeProps message, NamedList results) throws Exception {
 +    if (message.getStr(CommonAdminParams.ASYNC) != null) {
 +      results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
 +    }
 +    String collection = message.getStr(COLLECTION_PROP);
 +    String targetNode = message.getStr("targetNode");
 +    ClusterState clusterState = getClusterState();
 +    DocCollection coll = clusterState.getCollection(collection);
 +    if (coll == null) {
 +      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
 +    }
 +    String replicaName = message.getStr(REPLICA_PROP);
 +    Replica replica = coll.getReplica(replicaName);
 +    if (replica == null) {
 +      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
 +          "Collection: " + collection + " replica: " + replicaName + " does not exist");
 +    }
 +    Slice slice = null;
 +    for (Slice s : coll.getSlices()) {
 +      if (s.getReplicas().contains(replica)) {
 +        slice = s;
 +      }
 +    }
 +    if (slice == null) {
 +      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Replica has no 'slice' property! : " + replica);
 +    }
 +
 +    opDelay(collection, CollectionParams.CollectionAction.MOVEREPLICA.name());
 +
 +    // TODO: for now simulate the moveNormalReplica sequence, where we first add a new replica and then delete the old one
 +
 +    String newSolrCoreName = Assign.buildSolrCoreName(stateManager, coll, slice.getName(), replica.getType());
 +    String coreNodeName = Assign.assignCoreNodeName(stateManager, coll);
 +    ReplicaInfo newReplica = new ReplicaInfo(coreNodeName, newSolrCoreName, collection, slice.getName(), replica.getType(), targetNode, null);
 +    LOG.debug("-- new replica: " + newReplica);
 +    // xxx should run leader election here already?
 +    simAddReplica(targetNode, newReplica, false);
 +    // this will trigger leader election
 +    simRemoveReplica(replica.getNodeName(), replica.getName());
 +    results.add("success", "");
 +  }
 +
 +  /**
 +   * Create a new shard. This uses an algorithm similar to {@link CreateShardCmd}.
 +   * @param message operation details
 +   * @param results operation results
 +   */
 +  public void simCreateShard(ZkNodeProps message, NamedList results) throws Exception {
 +    if (message.getStr(CommonAdminParams.ASYNC) != null) {
 +      results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
 +    }
 +    String collectionName = message.getStr(COLLECTION_PROP);
 +    String sliceName = message.getStr(SHARD_ID_PROP);
 +    ClusterState clusterState = getClusterState();
 +    lock.lock();
 +    try {
 +      ZkWriteCommand cmd = new CollectionMutator(cloudManager).createShard(clusterState, message);
 +      if (cmd.noop) {
 +        results.add("success", "no-op");
 +        return;
 +      }
 +
 +      opDelay(collectionName, CollectionParams.CollectionAction.CREATESHARD.name());
 +
 +      // 1. copy shard properties -- our equivalent of creating an empty shard in cluster state
 +      DocCollection collection = cmd.collection;
 +      Slice slice = collection.getSlice(sliceName);
 +      Map<String, Object> props = sliceProperties.computeIfAbsent(collection.getName(), c -> new ConcurrentHashMap<>())
 +          .computeIfAbsent(sliceName, s -> new ConcurrentHashMap<>());
 +      props.clear();
 +      slice.getProperties().entrySet().stream()
 +          .filter(e -> !e.getKey().equals("range"))
 +          .filter(e -> !e.getKey().equals("replicas"))
 +          .forEach(e -> props.put(e.getKey(), e.getValue()));
 +      // 2. create new replicas
-       List<ReplicaPosition> positions = CreateShardCmd.buildReplicaPositions(cloudManager, clusterState, collectionName, message);
++      AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
++      List<ReplicaPosition> positions = CreateShardCmd.buildReplicaPositions(cloudManager, clusterState, collectionName,
++          message, sessionWrapper);
++      if (sessionWrapper.get() != null) {
++        sessionWrapper.get().release();
++      }
 +      AtomicInteger replicaNum = new AtomicInteger(1);
 +      positions.forEach(pos -> {
 +        Map<String, Object> replicaProps = new HashMap<>();
 +        replicaProps.put(ZkStateReader.SHARD_ID_PROP, pos.shard);
 +        replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node);
 +        replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString());
 +        replicaProps.put(ZkStateReader.BASE_URL_PROP, Utils.getBaseUrlForNodeName(pos.node, "http"));
 +        String coreName = String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, pos.shard, pos.type.name().substring(0,1).toLowerCase(Locale.ROOT),
 +            replicaNum.getAndIncrement());
 +        try {
 +          replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
 +          ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
 +              coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps);
 +          simAddReplica(pos.node, ri, false);
 +        } catch (Exception e) {
 +          throw new RuntimeException(e);
 +        }
 +      });
 +      Map<String, Object> colProps = collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>());
 +
 +      cloudManager.submit(new LeaderElection(Collections.singleton(collectionName), true));
 +      results.add("success", "");
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  /**
 +   * Split a shard. This uses an algorithm similar to {@link SplitShardCmd}, including simulating its
 +   * quirks, and leaving the original parent slice in place.
 +   * @param message operation details
 +   * @param results operation results.
 +   */
 +  public void simSplitShard(ZkNodeProps message, NamedList results) throws Exception {
 +    String collectionName = message.getStr(COLLECTION_PROP);
 +    AtomicReference<String> sliceName = new AtomicReference<>();
 +    sliceName.set(message.getStr(SHARD_ID_PROP));
 +    String splitKey = message.getStr("split.key");
 +    ClusterState clusterState = getClusterState();
 +    DocCollection collection = clusterState.getCollection(collectionName);
 +    Slice parentSlice = SplitShardCmd.getParentSlice(clusterState, collectionName, sliceName, splitKey);
 +    List<DocRouter.Range> subRanges = new ArrayList<>();
 +    List<String> subSlices = new ArrayList<>();
 +    List<String> subShardNames = new ArrayList<>();
 +
 +    opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());
 +
 +    SplitShardCmd.fillRanges(cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames);
 +    // mark the old slice as inactive
 +    sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
 +        .computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>())
 +        .put(ZkStateReader.SHARD_STATE_PROP, Slice.State.INACTIVE.toString());
 +    // add slice props
 +    for (int i = 0; i < subRanges.size(); i++) {
 +      String subSlice = subSlices.get(i);
 +      DocRouter.Range range = subRanges.get(i);
 +      Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
 +          .computeIfAbsent(subSlice, ss -> new ConcurrentHashMap<>());
 +      sliceProps.put(Slice.RANGE, range);
 +      sliceProps.put(Slice.PARENT, sliceName.get());
 +      sliceProps.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.ACTIVE.toString());
 +    }
 +    // add replicas for new subShards
 +    int repFactor = parentSlice.getReplicas().size();
 +    List<ReplicaPosition> replicaPositions = Assign.identifyNodes(cloudManager,
 +        clusterState,
 +        new ArrayList<>(clusterState.getLiveNodes()),
 +        collectionName,
 +        new ZkNodeProps(collection.getProperties()),
 +        // reproduce the bug
 +        subSlices, repFactor, 0, 0);
++    PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
++    if (sessionWrapper != null) sessionWrapper.release();
 +
 +    for (ReplicaPosition replicaPosition : replicaPositions) {
 +      String subSliceName = replicaPosition.shard;
 +      String subShardNodeName = replicaPosition.node;
 +      String solrCoreName = collectionName + "_" + subSliceName + "_replica" + (replicaPosition.index);
 +      Map<String, Object> replicaProps = new HashMap<>();
 +      replicaProps.put(ZkStateReader.SHARD_ID_PROP, replicaPosition.shard);
 +      replicaProps.put(ZkStateReader.NODE_NAME_PROP, replicaPosition.node);
 +      replicaProps.put(ZkStateReader.REPLICA_TYPE, replicaPosition.type.toString());
 +      replicaProps.put(ZkStateReader.BASE_URL_PROP, Utils.getBaseUrlForNodeName(subShardNodeName, "http"));
 +
 +      ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
 +          solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps);
 +      simAddReplica(replicaPosition.node, ri, false);
 +    }
 +    cloudManager.submit(new LeaderElection(Collections.singleton(collectionName), true));
 +    results.add("success", "");
 +
 +  }
 +
 +  /**
 +   * Delete a shard. This uses an algorithm similar to {@link org.apache.solr.cloud.DeleteShardCmd}.
 +   * @param message operation details
 +   * @param results operation results
 +   */
 +  public void simDeleteShard(ZkNodeProps message, NamedList results) throws Exception {
 +    String collectionName = message.getStr(COLLECTION_PROP);
 +    String sliceName = message.getStr(SHARD_ID_PROP);
 +    ClusterState clusterState = getClusterState();
 +    DocCollection collection = clusterState.getCollection(collectionName);
 +    if (collection == null) {
 +      throw new Exception("Collection " + collectionName + " doesn't exist");
 +    }
 +    Slice slice = collection.getSlice(sliceName);
 +    if (slice == null) {
 +      throw new Exception(" Collection " + collectionName + " slice " + sliceName + " doesn't exist.");
 +    }
 +
 +    opDelay(collectionName, CollectionParams.CollectionAction.DELETESHARD.name());
 +
 +    lock.lock();
 +    try {
 +      sliceProperties.computeIfAbsent(collectionName, coll -> new ConcurrentHashMap<>()).remove(sliceName);
 +      nodeReplicaMap.forEach((n, replicas) -> {
 +        Iterator<ReplicaInfo> it = replicas.iterator();
 +        while (it.hasNext()) {
 +          ReplicaInfo ri = it.next();
 +          if (ri.getCollection().equals(collectionName) && ri.getShard().equals(sliceName)) {
 +            it.remove();
 +          }
 +        }
 +      });
 +      results.add("success", "");
 +    } catch (Exception e) {
 +      results.add("failure", e.toString());
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  /**
 +   * Saves cluster properties to clusterprops.json.
 +   * @return current properties
 +   */
 +  private synchronized Map<String, Object> saveClusterProperties() throws Exception {
 +    if (lastSavedProperties != null && lastSavedProperties.equals(clusterProperties)) {
 +      return lastSavedProperties;
 +    }
 +    byte[] data = Utils.toJSON(clusterProperties);
 +    VersionedData oldData = stateManager.getData(ZkStateReader.CLUSTER_PROPS);
 +    int version = oldData != null ? oldData.getVersion() : -1;
 +    stateManager.setData(ZkStateReader.CLUSTER_PROPS, data, version);
 +    lastSavedProperties = (Map)Utils.fromJSON(data);
 +    return lastSavedProperties;
 +  }
 +
 +  /**
 +   * Set all cluster properties. This also updates the clusterprops.json data in
 +   * {@link DistribStateManager}.
 +   * @param properties properties to set
 +   */
 +  public void simSetClusterProperties(Map<String, Object> properties) throws Exception {
 +    lock.lock();
 +    try {
 +      clusterProperties.clear();
 +      if (properties != null) {
 +        this.clusterProperties.putAll(properties);
 +      }
 +      saveClusterProperties();
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  /**
 +   * Set a cluster property. This also updates the clusterprops.json data in
 +   * {@link DistribStateManager}.
 +   * @param key property name
 +   * @param value property value
 +   */
 +  public void simSetClusterProperty(String key, Object value) throws Exception {
 +    lock.lock();
 +    try {
 +      if (value != null) {
 +        clusterProperties.put(key, value);
 +      } else {
 +        clusterProperties.remove(key);
 +      }
 +      saveClusterProperties();
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  /**
 +   * Set collection properties.
 +   * @param coll collection name
 +   * @param properties properties
 +   */
 +  public void simSetCollectionProperties(String coll, Map<String, Object> properties) throws Exception {
 +    if (properties == null) {
 +      collProperties.remove(coll);
 +      saveClusterState();
 +    } else {
 +      lock.lock();
 +      try {
 +        Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
 +        props.clear();
 +        props.putAll(properties);
 +        saveClusterState();
 +      } finally {
 +        lock.unlock();
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Set collection property.
 +   * @param coll collection name
 +   * @param key property name
 +   * @param value property value
 +   */
 +  public void simSetCollectionProperty(String coll, String key, String value) throws Exception {
 +    Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
 +    if (value == null) {
 +      props.remove(key);
 +    } else {
 +      props.put(key, value);
 +    }
 +    saveClusterState();
 +  }
 +
 +  /**
 +   * Set slice properties.
 +   * @param coll collection name
 +   * @param slice slice name
 +   * @param properties slice properties
 +   */
 +  public void simSetSliceProperties(String coll, String slice, Map<String, Object> properties) throws Exception {
 +    Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll, c -> new HashMap<>()).computeIfAbsent(slice, s -> new HashMap<>());
 +    lock.lock();
 +    try {
 +      sliceProps.clear();
 +      if (properties != null) {
 +        sliceProps.putAll(properties);
 +      }
 +      saveClusterState();
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  /**
 +   * Set per-collection value (e.g. a metric). This value will be applied to each replica.
 +   * @param collection collection name
 +   * @param key property name
 +   * @param value property value
 +   */
 +  public void simSetCollectionValue(String collection, String key, Object value) throws Exception {
 +    simSetCollectionValue(collection, key, value, false);
 +  }
 +
 +  /**
 +   * Set per-collection value (e.g. a metric). This value will be applied to each replica.
 +   * @param collection collection name
 +   * @param key property name
 +   * @param value property value
 +   * @param divide if the value is a {@link Number} and this is true, then the value will be evenly
 +   *               divided by the number of replicas.
 +   */
 +  public void simSetCollectionValue(String collection, String key, Object value, boolean divide) throws Exception {
 +    simSetShardValue(collection, null, key, value, divide);
 +  }
 +
 +  /**
 +   * Set per-shard value (e.g. a metric). This value will be applied to each replica in the selected shard.
 +   * @param collection collection name
 +   * @param shard shard name. If null then all shards will be affected.
 +   * @param key property name
 +   * @param value property value
 +   */
 +  public void simSetShardValue(String collection, String shard, String key, Object value) throws Exception {
 +    simSetShardValue(collection, shard, key, value, false);
 +  }
 +
 +  /**
 +   * Set per-shard value (e.g. a metric). This value will be applied to each replica in the selected shard.
 +   * @param collection collection name
 +   * @param shard shard name. If null then all shards will be affected.
 +   * @param key property name
 +   * @param value property value
 +   * @param divide if the value is a {@link Number} and this is true, then the value will be evenly
 +   *               divided by the number of replicas.
 +   */
 +  public void simSetShardValue(String collection, String shard, String key, Object value, boolean divide) throws Exception {
 +    List<ReplicaInfo> infos = new ArrayList<>();
 +    nodeReplicaMap.forEach((n, replicas) -> {
 +      replicas.forEach(r -> {
 +        if (r.getCollection().equals(collection)) {
 +          if (shard != null && !shard.equals(r.getShard())) {
 +            return;
 +          }
 +          infos.add(r);
 +        }
 +      });
 +    });
 +    if (infos.isEmpty()) {
 +      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection " + collection + " doesn't exist.");
 +    }
 +    if (divide && value != null && (value instanceof Number)) {
 +      value = ((Number)value).doubleValue() / infos.size();
 +    }
 +    for (ReplicaInfo r : infos) {
 +      synchronized (r) {
 +        if (value == null) {
 +          r.getVariables().remove(key);
 +        } else {
 +          r.getVariables().put(key, value);
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Return all replica infos for a node.
 +   * @param node node id
 +   * @return list of replicas on that node
 +   */
 +  public List<ReplicaInfo> simGetReplicaInfos(String node) {
 +    return nodeReplicaMap.get(node);
 +  }
 +
 +  /**
 +   * List collections.
 +   * @return list of existing collections.
 +   */
 +  public List<String> simListCollections() {
 +    final Set<String> collections = new HashSet<>();
 +    lock.lock();
 +    try {
 +      nodeReplicaMap.forEach((n, replicas) -> {
 +        replicas.forEach(ri -> collections.add(ri.getCollection()));
 +      });
 +      return new ArrayList<>(collections);
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  // interface methods
 +
 +  @Override
 +  public ClusterState.CollectionRef getState(String collection) {
 +    try {
 +      return getClusterState().getCollectionRef(collection);
 +    } catch (IOException e) {
 +      return null;
 +    }
 +  }
 +
 +  @Override
 +  public Set<String> getLiveNodes() {
 +    return liveNodes;
 +  }
 +
 +  @Override
 +  public List<String> resolveAlias(String alias) {
 +    throw new UnsupportedOperationException("resolveAlias not implemented");
 +  }
 +
 +  @Override
 +  public ClusterState getClusterState() throws IOException {
 +    return new ClusterState(0, liveNodes, getCollectionStates());
 +  }
 +
 +  private Map<String, DocCollection> getCollectionStates() {
 +    Map<String, DocCollection> collectionStates = collectionsStatesRef.get();
 +    if (collectionStates != null) {
 +      return collectionStates;
 +    }
 +    lock.lock();
 +    try {
 +      Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
 +      nodeReplicaMap.forEach((n, replicas) -> {
 +        replicas.forEach(ri -> {
 +          Map<String, Object> props;
 +          synchronized (ri) {
 +            props = new HashMap<>(ri.getVariables());
 +          }
 +          props.put(ZkStateReader.NODE_NAME_PROP, n);
 +          //props.put(ZkStateReader.SHARD_ID_PROP, ri.getShard());
 +          props.put(ZkStateReader.CORE_NAME_PROP, ri.getCore());
 +          props.put(ZkStateReader.REPLICA_TYPE, ri.getType().toString());
 +          props.put(ZkStateReader.STATE_PROP, ri.getState().toString());
 +          Replica r = new Replica(ri.getName(), props);
 +          collMap.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
 +              .computeIfAbsent(ri.getShard(), s -> new HashMap<>())
 +              .put(ri.getName(), r);
 +        });
 +      });
 +
 +      // add empty slices
 +      sliceProperties.forEach((c, perSliceProps) -> {
 +        perSliceProps.forEach((slice, props) -> {
 +          collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>()).computeIfAbsent(slice, s -> new ConcurrentHashMap<>());
 +        });
 +      });
 +      // add empty collections
 +      collProperties.keySet().forEach(c -> {
 +        collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>());
 +      });
 +
 +      Map<String, DocCollection> res = new HashMap<>();
 +      collMap.forEach((coll, shards) -> {
 +        Map<String, Slice> slices = new HashMap<>();
 +        shards.forEach((s, replicas) -> {
 +          Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>()).computeIfAbsent(s, sl -> new ConcurrentHashMap<>());
 +          Slice slice = new Slice(s, replicas, sliceProps);
 +          slices.put(s, slice);
 +        });
 +        Map<String, Object> collProps = collProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>());
 +        DocCollection dc = new DocCollection(coll, slices, collProps, DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE);
 +        res.put(coll, dc);
 +      });
 +      collectionsStatesRef.set(res);
 +      return res;
 +    } finally {
 +      lock.unlock();
 +    }
 +  }
 +
 +  @Override
 +  public Map<String, Object> getClusterProperties() {
 +    return clusterProperties;
 +  }
 +
 +  @Override
 +  public String getPolicyNameByCollection(String coll) {
 +    Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
 +    return (String)props.get("policy");
 +  }
 +
 +  @Override
 +  public void connect() {
 +
 +  }
 +
 +  @Override
 +  public void close() throws IOException {
 +
 +  }
 +}
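
For orientation: in the tests this provider is not constructed directly; it is created and driven through SimCloudManager (see TestLargeCluster below). A rough, hypothetical sketch of that flow, assuming only the methods visible in this diff (SimCloudManager.createCluster, simGetSolrClient, getSimClusterStateProvider, simGetRandomNode, simRemoveNode), might look like:

    // build a simulated 10-node cluster whose clock runs 50x faster than real time
    SimCloudManager cluster = SimCloudManager.createCluster(10, TimeSource.get("simTime:50"));
    SolrClient solrClient = cluster.simGetSolrClient();
    // collection admin requests are routed to SimClusterStateProvider.simCreateCollection,
    // which computes replica positions via the policy framework
    CollectionAdminRequest.createCollection("demo", "conf", 2, 2).process(solrClient);
    // simulate losing a node: its replicas are marked DOWN and a throttled
    // leader election is submitted (see simRemoveNode above)
    String node = cluster.getSimClusterStateProvider().simGetRandomNode(new Random());
    cluster.simRemoveNode(node);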

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5ec9b64b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
----------------------------------------------------------------------
diff --cc solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
index 92e3a95,0000000..f1f0d01
mode 100644,000000..100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
@@@ -1,247 -1,0 +1,248 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.solr.cloud.autoscaling.sim;
 +
 +import java.lang.invoke.MethodHandles;
 +import java.nio.charset.StandardCharsets;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.solr.client.solrj.SolrClient;
 +import org.apache.solr.client.solrj.SolrRequest;
 +import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 +import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
 +import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
 +import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 +import org.apache.solr.cloud.autoscaling.ActionContext;
 +import org.apache.solr.cloud.autoscaling.ComputePlanAction;
 +import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
 +import org.apache.solr.cloud.autoscaling.TriggerActionBase;
 +import org.apache.solr.cloud.autoscaling.TriggerEvent;
 +import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
 +import org.apache.solr.cloud.autoscaling.CapturedEvent;
 +import org.apache.solr.common.cloud.ZkStateReader;
 +import org.apache.solr.common.util.NamedList;
 +import org.apache.solr.common.util.TimeSource;
 +import org.apache.solr.util.LogLevel;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
 +
 +/**
 + * Tests of autoscaling behavior in a simulated large cluster.
 + */
 +@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
 +public class TestLargeCluster extends SimSolrCloudTestCase {
 +  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 +
 +  public static final int SPEED = 50;
 +
 +  public static final int NUM_NODES = 100;
 +
 +  static Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
 +  static AtomicInteger triggerFiredCount = new AtomicInteger();
 +  static CountDownLatch triggerFiredLatch;
 +  static int waitForSeconds;
 +
 +  @BeforeClass
 +  public static void setupCluster() throws Exception {
 +    cluster = SimCloudManager.createCluster(NUM_NODES, TimeSource.get("simTime:" + SPEED));
 +  }
 +
 +  @Before
 +  public void setupTest() throws Exception {
 +    // clear any persisted auto scaling configuration
 +    cluster.getDistribStateManager().setData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH,
 +        "{}".getBytes(StandardCharsets.UTF_8), -1);
 +    cluster.simClearSystemCollection();
 +    cluster.getSimClusterStateProvider().simDeleteAllCollections();
 +
 +    waitForSeconds = 1 + random().nextInt(3);
 +    triggerFiredCount.set(0);
 +    triggerFiredLatch = new CountDownLatch(1);
 +    listenerEvents.clear();
 +    // clear any events or markers
 +    removeChildren(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
 +    removeChildren(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
 +    removeChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
 +    removeChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
 +    while (cluster.getClusterStateProvider().getLiveNodes().size() < NUM_NODES) {
 +      // perhaps a test stopped a node but didn't restart it;
 +      // let's start a node
 +      cluster.simAddNode();
 +    }
 +  }
 +
 +  public static class TestTriggerListener extends TriggerListenerBase {
 +    @Override
 +    public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
 +      super.init(cloudManager, config);
 +    }
 +
 +    @Override
 +    public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
 +                                     ActionContext context, Throwable error, String message) {
 +      List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
 +      lst.add(new CapturedEvent(cluster.getTimeSource().getTime(), context, config, stage, actionName, event, message));
 +    }
 +  }
 +
 +  public static class TestTriggerAction extends TriggerActionBase {
 +    @Override
 +    public void process(TriggerEvent event, ActionContext context) throws Exception {
 +      triggerFiredCount.incrementAndGet();
 +      triggerFiredLatch.countDown();
 +    }
 +  }
 +
 +  @Test
 +  public void testBasic() throws Exception {
 +    SolrClient solrClient = cluster.simGetSolrClient();
 +    String setTriggerCommand = "{" +
 +        "'set-trigger' : {" +
 +        "'name' : 'node_lost_trigger'," +
 +        "'event' : 'nodeLost'," +
 +        "'waitFor' : '" + waitForSeconds + "s'," +
 +        "'enabled' : true," +
 +        "'actions' : [" +
 +        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
 +        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
 +        "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
 +        "]" +
 +        "}}";
 +    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
 +    NamedList<Object> response = solrClient.request(req);
 +    assertEquals(response.get("result").toString(), "success");
 +
 +    String setListenerCommand = "{" +
 +        "'set-listener' : " +
 +        "{" +
 +        "'name' : 'foo'," +
 +        "'trigger' : 'node_lost_trigger'," +
 +        "'stage' : ['STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
 +        "'beforeAction' : ['compute', 'execute']," +
 +        "'afterAction' : ['compute', 'execute']," +
 +        "'class' : '" + TestTriggerListener.class.getName() + "'" +
 +        "}" +
 +        "}";
 +    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
 +    response = solrClient.request(req);
 +    assertEquals(response.get("result").toString(), "success");
 +
 +    cluster.getTimeSource().sleep(5000);
 +
 +    // pick a few random nodes
 +    List<String> nodes = new ArrayList<>();
 +    int limit = 30;
 +    for (String node : cluster.getClusterStateProvider().getLiveNodes()) {
 +      nodes.add(node);
 +      if (nodes.size() >= limit) {
 +        break;
 +      }
 +    }
 +    Collections.shuffle(nodes, random());
 +    String collectionName = "testBasic";
 +    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
 +        "conf", 2, 5, 5, 5);
 +    create.setMaxShardsPerNode(1);
 +    create.setCreateNodeSet(String.join(",", nodes));
 +    create.process(solrClient);
 +
 +    log.info("Ready after " + waitForState(collectionName, 30 * nodes.size(), TimeUnit.SECONDS, clusterShape(2, 15)) + "ms");
 +
 +    int KILL_NODES = 8;
 +    // kill off a number of nodes
 +    for (int i = 0; i < KILL_NODES; i++) {
 +      cluster.simRemoveNode(nodes.get(i));
 +    }
 +
 +    log.info("Ready after " + waitForState(collectionName, 90 * KILL_NODES, TimeUnit.SECONDS, clusterShape(2, 15)) + "ms");
 +
 +  }
 +
 +  @Test
++  @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11714")
 +  public void testSearchRate() throws Exception {
 +    SolrClient solrClient = cluster.simGetSolrClient();
 +    String setTriggerCommand = "{" +
 +        "'set-trigger' : {" +
 +        "'name' : 'search_rate_trigger'," +
 +        "'event' : 'searchRate'," +
 +        "'waitFor' : '" + waitForSeconds + "s'," +
 +        "'rate' : 1.0," +
 +        "'enabled' : true," +
 +        "'actions' : [" +
 +        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
 +        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
 +        "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
 +        "]" +
 +        "}}";
 +    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
 +    NamedList<Object> response = solrClient.request(req);
 +    assertEquals(response.get("result").toString(), "success");
 +    String setListenerCommand1 = "{" +
 +        "'set-listener' : " +
 +        "{" +
 +        "'name' : 'srt'," +
 +        "'trigger' : 'search_rate_trigger'," +
 +        "'stage' : ['FAILED','SUCCEEDED']," +
 +        "'class' : '" + TestTriggerListener.class.getName() + "'" +
 +        "}" +
 +        "}";
 +    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
 +    response = solrClient.request(req);
 +    assertEquals(response.get("result").toString(), "success");
 +
 +    String collectionName = "testSearchRate";
 +    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
 +        "conf", 2, 10);
 +    create.process(solrClient);
 +
 +    log.info("Ready after " + waitForState(collectionName, 300, TimeUnit.SECONDS, clusterShape(2, 10)) + " ms");
 +
 +    // collect the node names
 +    Set<String> nodes = new HashSet<>();
 +    cluster.getSimClusterStateProvider().getClusterState().getCollection(collectionName)
 +        .getReplicas()
 +        .forEach(r -> nodes.add(r.getNodeName()));
 +
 +    String metricName = "QUERY./select.requestTimes:1minRate";
 +    // simulate search traffic
 +    cluster.getSimClusterStateProvider().simSetShardValue(collectionName, "shard1", metricName, 40, true);
 +
 +    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
 +    assertTrue("The trigger did not fire at all", await);
 +    // wait for listener to capture the SUCCEEDED stage
 +    cluster.getTimeSource().sleep(2000);
 +    assertEquals(listenerEvents.toString(), 1, listenerEvents.get("srt").size());
 +    CapturedEvent ev = listenerEvents.get("srt").get(0);
 +  }
 +}
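
A note on timing in these simulated tests: the cluster clock created via
TimeSource.get("simTime:" + SPEED) advances SPEED times faster than wall-clock
time, so any real-time wait has to be scaled down by the speedup factor. A
minimal sketch of the waiting pattern, reusing the SPEED constant and
triggerFiredLatch from the test class above:

    // simulated time runs SPEED times faster than real time, so a 20-second
    // simulated window corresponds to 20000 / SPEED real milliseconds
    boolean fired = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
    assertTrue("The trigger did not fire at all", fired);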

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5ec9b64b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5ec9b64b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
----------------------------------------------------------------------
diff --cc solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
index 0000000,b47d1c8..2fea23b
mode 000000,100644..100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
@@@ -1,0 -1,91 +1,106 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.solr.client.solrj.cloud.autoscaling;
+ 
+ import java.io.IOException;
+ import java.util.List;
+ import java.util.NoSuchElementException;
+ 
+ import org.apache.zookeeper.CreateMode;
+ import org.apache.zookeeper.KeeperException;
+ import org.apache.zookeeper.Op;
+ import org.apache.zookeeper.OpResult;
+ import org.apache.zookeeper.Watcher;
+ 
+ public class DelegatingDistribStateManager implements DistribStateManager {
+   private final DistribStateManager delegate;
+ 
+   public DelegatingDistribStateManager(DistribStateManager delegate) {
+     this.delegate = delegate;
+   }
+ 
+   @Override
+   public boolean hasData(String path) throws IOException, KeeperException, InterruptedException {
+     return delegate.hasData(path);
+   }
+ 
+   @Override
+   public List<String> listData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+     return delegate.listData(path);
+   }
+ 
+   @Override
++  public List<String> listData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
++    return delegate.listData(path, watcher);
++  }
++
++  @Override
+   public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+     return delegate.getData(path, watcher);
+   }
+ 
+   @Override
+   public VersionedData getData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+     return delegate.getData(path);
+   }
+ 
+   @Override
+   public void makePath(String path) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
+     delegate.makePath(path);
+   }
+ 
+   @Override
++  public void makePath(String path, byte[] data, CreateMode createMode, boolean failOnExists) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
++    delegate.makePath(path, data, createMode, failOnExists);
++  }
++
++  @Override
+   public String createData(String path, byte[] data, CreateMode mode) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
+     return delegate.createData(path, data, mode);
+   }
+ 
+   @Override
 -  public void removeData(String path, int version) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
++  public void removeData(String path, int version) throws NoSuchElementException, IOException, BadVersionException, KeeperException, InterruptedException {
+     delegate.removeData(path, version);
+   }
+ 
+   @Override
+   public void setData(String path, byte[] data, int version) throws BadVersionException, NoSuchElementException, IOException, KeeperException, InterruptedException {
+     delegate.setData(path, data, version);
+   }
+ 
+   @Override
+   public List<OpResult> multi(Iterable<Op> ops) throws BadVersionException, NoSuchElementException, AlreadyExistsException, IOException, KeeperException, InterruptedException {
+     return delegate.multi(ops);
+   }
+ 
+   @Override
+   public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws InterruptedException, IOException {
+     return delegate.getAutoScalingConfig(watcher);
+   }
+ 
+   @Override
+   public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
+     return delegate.getAutoScalingConfig();
+   }
++
++  @Override
++  public void close() throws IOException {
++    delegate.close();
++  }
+ }
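
Because every method above simply forwards to the wrapped instance, the class
is a convenient base for test doubles that intercept a single call. A hedged
sketch (the subclass name and counter are illustrative, not part of this
commit):

    // Counts state-manager writes while delegating everything else unchanged.
    public class WriteCountingStateManager extends DelegatingDistribStateManager {
      private final java.util.concurrent.atomic.AtomicInteger writes =
          new java.util.concurrent.atomic.AtomicInteger();

      public WriteCountingStateManager(DistribStateManager delegate) {
        super(delegate);
      }

      @Override
      public void setData(String path, byte[] data, int version)
          throws BadVersionException, NoSuchElementException, IOException,
          KeeperException, InterruptedException {
        writes.incrementAndGet(); // record the write, then forward it
        super.setData(path, data, version);
      }

      public int getWriteCount() {
        return writes.get();
      }
    }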

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5ec9b64b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
----------------------------------------------------------------------
diff --cc solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index 5c31cd5,024c6c3..40ca619
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@@ -34,10 -36,15 +35,16 @@@ import org.apache.solr.common.SolrExcep
  import org.apache.solr.common.cloud.Replica;
  import org.apache.solr.common.cloud.ReplicaPosition;
  import org.apache.solr.common.util.Pair;
++import org.apache.solr.common.util.TimeSource;
  import org.apache.solr.common.util.Utils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
+ import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
  import static org.apache.solr.common.params.CoreAdminParams.NODE;
+ import static org.apache.solr.common.util.Utils.time;
+ import static org.apache.solr.common.util.Utils.timeElapsed;
  
  public class PolicyHelper {
    private static ThreadLocal<Map<String, String>> policyMapping = new ThreadLocal<>();
@@@ -107,9 -137,8 +137,8 @@@
  
  
  public static final int SESSION_EXPIRY = 180; // 3 minutes
-   public static ThreadLocal<Long> REF_VERSION = new ThreadLocal<>();
  
 -  public static MapWriter getDiagnostics(Policy policy, SolrClientCloudManager cloudManager) {
 +  public static MapWriter getDiagnostics(Policy policy, SolrCloudManager cloudManager) {
      Policy.Session session = policy.createSession(cloudManager);
      List<Row> sorted = session.getSorted();
      List<Violation> violations = session.getViolations();
@@@ -163,54 -207,100 +207,100 @@@
      public SessionRef() {
      }
  
-     public long getRefVersion(){
-       return myVersion.get();
+ 
+     //only for debugging
+     SessionWrapper getSessionWrapper() {
+       return sessionWrapper;
      }
  
+     /**
+      * All operations suggested by the current session object
+      * are complete; do not cache anything further.
+      */
+     private void release(SessionWrapper sessionWrapper) {
+       synchronized (lockObj) {
+         if (sessionWrapper.createTime == this.sessionWrapper.createTime && this.sessionWrapper.refCount.get() <= 0) {
+           log.debug("session set to NULL");
+           this.sessionWrapper = SessionWrapper.DEF_INST;
+         } // else somebody created a new session because of expiry, so there is nothing to do here
+       }
+     }
  
-     public void decref(long version) {
-       synchronized (SessionRef.class) {
-         if (session == null) return;
-         if(myVersion.get() != version) return;
-         if (refCount.decrementAndGet() <= 0) {
-           session = null;
-           lastUsedTime = 0;
+     /**
+      * Computing is over for this session, and it may now contain a new session with new state.
+      * The session can be used by others while the caller is executing the suggested operations.
+      */
+     private void returnSession(SessionWrapper sessionWrapper) {
++      TimeSource timeSource = sessionWrapper.session.cloudManager.getTimeSource();
+       synchronized (lockObj) {
+         sessionWrapper.status = Status.EXECUTING;
 -        log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(MILLISECONDS),
++        log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(timeSource, MILLISECONDS),
+             sessionWrapper.createTime,
+             this.sessionWrapper.createTime);
+         if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
+           // this session was used for computing new operations and can now
+           // be reused for other computing
+           this.sessionWrapper = sessionWrapper;
+ 
+           // one thread that is waiting for this needs to be notified
+           lockObj.notify();
+         } else {
+           log.info("create time NOT SAME {} ", SessionWrapper.DEF_INST.createTime);
+           //else just ignore it
          }
        }
-     }
  
-     public int getRefCount() {
-       return refCount.get();
      }
  
-     public Policy.Session get() {
-       synchronized (SessionRef.class) {
-         if (session == null) return null;
-         if (TimeUnit.SECONDS.convert(System.nanoTime() - lastUsedTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
-           session = null;
-           return null;
+ 
+     public SessionWrapper get(SolrCloudManager cloudManager) throws IOException, InterruptedException {
++      TimeSource timeSource = cloudManager.getTimeSource();
+       synchronized (lockObj) {
+         if (sessionWrapper.status == Status.NULL ||
 -            TimeUnit.SECONDS.convert(System.nanoTime() - sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
++            TimeUnit.SECONDS.convert(timeSource.getTime() - sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
+           //no session available or the session is expired
+           return createSession(cloudManager);
          } else {
-           REF_VERSION.set(myVersion.get());
-           refCount.incrementAndGet();
-           return session;
 -          long waitStart = time(MILLISECONDS);
++          long waitStart = time(timeSource, MILLISECONDS);
+           //the session is not expired
+           log.debug("reusing a session {}", this.sessionWrapper.createTime);
+           if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
+             this.sessionWrapper.status = Status.COMPUTING;
+             return sessionWrapper;
+           } else {
+             // status == COMPUTING: the session is being used for computing; wait for it
 -            log.debug("session being used. waiting... current time {} ", time(MILLISECONDS));
++            log.debug("session being used. waiting... current time {} ", time(timeSource, MILLISECONDS));
+             try {
+               lockObj.wait(10 * 1000);//wait for a max of 10 seconds
+             } catch (InterruptedException e) {
+               log.info("interrupted... ");
+             }
 -            log.debug("out of waiting curr-time:{} time-elapsed {}", time(MILLISECONDS), timeElapsed(waitStart, MILLISECONDS));
++            log.debug("out of waiting curr-time:{} time-elapsed {}", time(timeSource, MILLISECONDS), timeElapsed(timeSource, waitStart, MILLISECONDS));
+             // this thread has woken up either because it timed out after 10 seconds
+             // or because it was notified after the session was returned from another
+             // COMPUTING operation
+             if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
+               log.debug("Wait over. reusing the existing session ");
+               this.sessionWrapper.status = Status.COMPUTING;
+               return sessionWrapper;
+             } else {
+               //create a new Session
+               return createSession(cloudManager);
+             }
+           }
          }
        }
 -
 -
      }
  
-     public Policy.Session initOrGet(SolrCloudManager cloudManager, Policy policy) {
-       synchronized (SessionRef.class) {
-         Policy.Session session = get();
-         if (session != null) {
-           if (cloudManager.getClusterStateProvider().getLiveNodes().equals(session.nodes)) {
-             return session;
-           }
-         }
-         this.session = policy.createSession(cloudManager);
-         myVersion.incrementAndGet();
-         lastUsedTime = System.nanoTime();
-         REF_VERSION.set(myVersion.get());
-         refCount.set(1);
-         return this.session;
+     private SessionWrapper createSession(SolrCloudManager cloudManager) throws InterruptedException, IOException {
+       synchronized (lockObj) {
+         log.debug("Creating a new session");
+         Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
+         log.debug("New session created ");
+         this.sessionWrapper = new SessionWrapper(session, this);
+         this.sessionWrapper.status = Status.COMPUTING;
+         return sessionWrapper;
        }
      }
  
@@@ -231,7 -333,70 +333,74 @@@
    }
  
  
-   public static ThreadLocal<SessionRef> SESSION_REF = new ThreadLocal<>();
+   static ThreadLocal<SessionWrapper> SESSION_WRAPPPER_REF = new ThreadLocal<>();
+ 
+ 
+   public static class SessionWrapper {
+     public static final SessionWrapper DEF_INST = new SessionWrapper(null, null);
+ 
+     static {
+       DEF_INST.status = Status.NULL;
+       DEF_INST.createTime = -1L;
+       DEF_INST.lastUpdateTime = -1L;
+     }
+ 
+     private long createTime;
+     private long lastUpdateTime;
+     private Policy.Session session;
+     public Status status;
+     private final SessionRef ref;
+     private AtomicInteger refCount = new AtomicInteger();
+ 
+     public long getCreateTime() {
+       return createTime;
+     }
+ 
+     public long getLastUpdateTime() {
+       return lastUpdateTime;
+     }
+ 
+     public SessionWrapper(Policy.Session session, SessionRef ref) {
 -      lastUpdateTime = createTime = System.nanoTime();
++      lastUpdateTime = createTime = session != null ?
++          session.cloudManager.getTimeSource().getTime() :
++          TimeSource.NANO_TIME.getTime();
+       this.session = session;
+       this.status = Status.UNUSED;
+       this.ref = ref;
+     }
+ 
+     public Policy.Session get() {
+       return session;
+     }
+ 
+     public SessionWrapper update(Policy.Session session) {
 -      this.lastUpdateTime = System.nanoTime();
++      this.lastUpdateTime = session != null ?
++          session.cloudManager.getTimeSource().getTime() :
++          TimeSource.NANO_TIME.getTime();
+       this.session = session;
+       return this;
+     }
+ 
+     public int getRefCount() {
+       return refCount.get();
+     }
+ 
+     /**
+      * Return this for later use and update the session with the latest state.
+      * Ensure that this is called only after computing the suggestions
+      * (a lifecycle sketch follows this diff).
+      */
+     public void returnSession(Policy.Session session) {
+       this.update(session);
+       refCount.incrementAndGet();
+       ref.returnSession(this);
  
+     }
+ 
+     // all ops are executed; now the wrapper can be destroyed
+     public void release() {
+       refCount.decrementAndGet();
+       ref.release(this);
  
+     }
+   }
  }
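
Taken together, the new SessionRef/SessionWrapper code implements a simple
compute-then-execute lifecycle: a caller obtains the shared session for
computing, returns it once suggestions are computed so other threads can reuse
it, and releases it after the suggested operations have executed. A minimal
sketch under those assumptions (in practice the SessionRef would be a shared,
per-cluster instance rather than a fresh object):

    PolicyHelper.SessionRef ref = new PolicyHelper.SessionRef();
    // blocks for up to 10 seconds if another thread holds the session in COMPUTING
    PolicyHelper.SessionWrapper wrapper = ref.get(cloudManager);
    Policy.Session session = wrapper.get();
    // ... compute suggestions against 'session' ...
    wrapper.returnSession(session); // computing done; status becomes EXECUTING, session reusable
    // ... execute the suggested operations ...
    wrapper.release();              // refCount drops; the ref may reset to DEF_INST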

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5ec9b64b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5ec9b64b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
----------------------------------------------------------------------
diff --cc solr/solrj/src/java/org/apache/solr/common/util/Utils.java
index 6d5ed97,93af8c3..4ab24d2
--- a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
@@@ -445,17 -445,12 +447,25 @@@ public class Utils 
      }
    }
  
 -  public static long time(TimeUnit unit) {
 -    return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
 +  public static String getBaseUrlForNodeName(final String nodeName, String urlScheme) {
 +    final int _offset = nodeName.indexOf("_");
 +    if (_offset < 0) {
 +      throw new IllegalArgumentException("nodeName does not contain expected '_' separator: " + nodeName);
 +    }
 +    final String hostAndPort = nodeName.substring(0,_offset);
 +    try {
 +      final String path = URLDecoder.decode(nodeName.substring(1+_offset), "UTF-8");
 +      return urlScheme + "://" + hostAndPort + (path.isEmpty() ? "" : ("/" + path));
 +    } catch (UnsupportedEncodingException e) {
 +      throw new IllegalStateException("JVM Does not seem to support UTF-8", e);
 +    }
    }
+ 
 -  public static long timeElapsed(long start, TimeUnit unit) {
 -    return unit.convert(System.nanoTime() - NANOSECONDS.convert(start, unit), NANOSECONDS);
++  public static long time(TimeSource timeSource, TimeUnit unit) {
++    return unit.convert(timeSource.getTime(), TimeUnit.NANOSECONDS);
+   }
+ 
++  public static long timeElapsed(TimeSource timeSource, long start, TimeUnit unit) {
++    return unit.convert(timeSource.getTime() - NANOSECONDS.convert(start, unit), NANOSECONDS);
++  }
  }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5ec9b64b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5ec9b64b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
----------------------------------------------------------------------