You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2017/12/06 04:04:20 UTC

lucene-solr:master: SOLR-11669: Policy Session lifecycle cleanup

Repository: lucene-solr
Updated Branches:
  refs/heads/master e84cce8ea -> 071d9270d


SOLR-11669: Policy Session lifecycle cleanup


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/071d9270
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/071d9270
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/071d9270

Branch: refs/heads/master
Commit: 071d9270d5e02a40bc36833b15f1e738ca2c3e51
Parents: e84cce8
Author: Noble Paul <no...@apache.org>
Authored: Wed Dec 6 15:04:04 2017 +1100
Committer: Noble Paul <no...@apache.org>
Committed: Wed Dec 6 15:04:04 2017 +1100

----------------------------------------------------------------------
 .../org/apache/solr/cloud/AddReplicaCmd.java    |  15 +-
 .../src/java/org/apache/solr/cloud/Assign.java  |  11 +-
 .../apache/solr/cloud/CreateCollectionCmd.java  |   4 +-
 .../org/apache/solr/cloud/CreateShardCmd.java   |   4 +-
 .../java/org/apache/solr/cloud/RestoreCmd.java  |   4 +-
 .../org/apache/solr/cloud/SplitShardCmd.java    |   5 +-
 .../org/apache/solr/cloud/UtilizeNodeCmd.java   |  12 +-
 .../cloud/autoscaling/ComputePlanAction.java    |  51 ++--
 .../admin/AutoscalingHistoryHandlerTest.java    |   2 +-
 .../autoscaling/DelegatingCloudManager.java     |   3 +-
 .../DelegatingDistribStateManager.java          |  91 ++++++
 .../solrj/cloud/autoscaling/PolicyHelper.java   | 288 +++++++++++++++----
 .../java/org/apache/solr/common/util/Utils.java |  11 +
 .../solrj/cloud/autoscaling/TestPolicy.java     |  97 ++++---
 14 files changed, 457 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
index b5101f5..c785f9f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
@@ -101,8 +101,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
     boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
 
-    final Long policyVersionBefore = PolicyHelper.REF_VERSION.get();
-    AtomicLong policyVersionAfter  = new AtomicLong(-1);
+
+    AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
     // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
     if (!skipCreateReplicaInClusterState) {
       if (CreateShardCmd.usePolicyFramework(coll, ocmh)) {
@@ -118,9 +118,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
               replicaType == Replica.Type.TLOG ? 0 : 1,
               replicaType == Replica.Type.PULL ? 0 : 1
           ).get(0).node;
-          if (policyVersionBefore == null && PolicyHelper.REF_VERSION.get() != null) {
-            policyVersionAfter.set(PolicyHelper.REF_VERSION.get());
-          }
+          sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
         }
       } else {
         node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
@@ -220,9 +218,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     Runnable runnable = () -> {
       ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
       ocmh.waitForCoreNodeName(collection, fnode, fcoreName);
-      if (policyVersionAfter.get() > -1) {
-        PolicyHelper.REF_VERSION.remove();
-        PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()).decref(policyVersionAfter.get());
+      if (sessionWrapper.get() != null) {
+        sessionWrapper.get().release();
       }
       if (onComplete != null) onComplete.run();
     };

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java
index 9967b23..fd0738f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java
@@ -293,15 +293,8 @@ public class Assign {
     } else  {
       if (message.getStr(CREATE_NODE_SET) == null)
         nodeList = Collections.emptyList();// unless explicitly specified do not pass node list to Policy
-      synchronized (PolicyHelper.class) {
-        PolicyHelper.SESSION_REF.set(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
-        try {
-          return getPositionsUsingPolicy(collectionName,
-              shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, ocmh.overseer.getSolrCloudManager(), nodeList);
-        } finally {
-          PolicyHelper.SESSION_REF.remove();
-        }
-      }
+      return getPositionsUsingPolicy(collectionName,
+          shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, ocmh.overseer.getSolrCloudManager(), nodeList);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index 94d7d9f..2c4f01e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -103,6 +103,7 @@ public class CreateCollectionCmd implements Cmd {
     }
 
     ocmh.validateConfigOrThrowSolrException(configName);
+    PolicyHelper.SessionWrapper sessionWrapper = null;
 
     try {
       // look at the replication factor and see if it matches reality
@@ -184,6 +185,7 @@ public class CreateCollectionCmd implements Cmd {
         }
         replicaPositions = Assign.identifyNodes(ocmh
             , clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+        sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
       }
 
       ZkStateReader zkStateReader = ocmh.zkStateReader;
@@ -318,7 +320,7 @@ public class CreateCollectionCmd implements Cmd {
     } catch (Exception ex) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
     } finally {
-      PolicyHelper.clearFlagAndDecref(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
+      if(sessionWrapper != null) sessionWrapper.release();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
index 73ac7ff..18b0b63 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
@@ -88,6 +88,7 @@ public class CreateShardCmd implements Cmd {
     Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
 
     ZkStateReader zkStateReader = ocmh.zkStateReader;
+    PolicyHelper.SessionWrapper sessionWrapper = null;
     boolean usePolicyFramework = usePolicyFramework(collection,ocmh);
     List<ReplicaPosition> positions = null;
     SolrCloseableLatch countDownLatch;
@@ -103,6 +104,7 @@ public class CreateShardCmd implements Cmd {
             numNrtReplicas,
             numTlogReplicas,
             numPullReplicas);
+        sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
       } else {
         List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, totalReplicas,
             createNodeSetStr, ocmh.overseer.getSolrCloudManager());
@@ -164,7 +166,7 @@ public class CreateShardCmd implements Cmd {
         });
       }
     } finally {
-      PolicyHelper.clearFlagAndDecref(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
+      if(sessionWrapper != null) sessionWrapper.release();
     }
 
     log.debug("Waiting for create shard action to complete");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
index 1f9d64a..039ab5c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
@@ -219,6 +219,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
 
     List<String> sliceNames = new ArrayList<>();
     restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
+    PolicyHelper.SessionWrapper sessionWrapper = null;
 
     try {
       List<ReplicaPosition> replicaPositions = Assign.identifyNodes(
@@ -226,6 +227,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
           nodeList, restoreCollectionName,
           message, sliceNames,
           numNrtReplicas, numTlogReplicas, numPullReplicas);
+      sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
       //Create one replica per shard and copy backed up data to it
       for (Slice slice : restoreCollection.getSlices()) {
         log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
@@ -350,7 +352,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
 
       log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
     } finally {
-      PolicyHelper.clearFlagAndDecref(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
+      if (sessionWrapper != null) sessionWrapper.release();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
index 2c1e5d6..8f65255 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
@@ -92,6 +92,8 @@ public class SplitShardCmd implements Cmd {
 
     DocCollection collection = clusterState.getCollection(collectionName);
     DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
+    PolicyHelper.SessionWrapper sessionWrapper = null;
+
 
     Slice parentSlice;
 
@@ -392,6 +394,7 @@ public class SplitShardCmd implements Cmd {
           collectionName,
           new ZkNodeProps(collection.getProperties()),
           subSlices, repFactor - 1, 0, 0);
+      sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
 
       List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
 
@@ -513,7 +516,7 @@ public class SplitShardCmd implements Cmd {
       log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e);
     } finally {
-      PolicyHelper.clearFlagAndDecref(PolicyHelper.getPolicySessionRef(ocmh.overseer.getSolrCloudManager()));
+      if (sessionWrapper != null) sessionWrapper.release();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
index 54044eb..6a55cfd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
@@ -84,7 +84,8 @@ public class UtilizeNodeCmd implements OverseerCollectionMessageHandler.Cmd {
       }
     }
     executeAll(requests);
-    Policy.Session session = autoScalingConfig.getPolicy().createSession(ocmh.overseer.getSolrCloudManager());
+    PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(ocmh.overseer.getSolrCloudManager());
+    Policy.Session session =  sessionWrapper.get();
     for (; ; ) {
       Suggester suggester = session.getSuggester(MOVEREPLICA)
           .hint(Suggester.Hint.TARGET_NODE, nodeName);
@@ -96,9 +97,12 @@ public class UtilizeNodeCmd implements OverseerCollectionMessageHandler.Cmd {
           REPLICA_PROP, request.getParams().get(REPLICA_PROP),
           ASYNC, request.getParams().get(ASYNC)));
     }
-
-
-    executeAll(requests);
+    sessionWrapper.returnSession(session);
+    try {
+      executeAll(requests);
+    } finally {
+      sessionWrapper.release();
+    }
   }
 
   private void executeAll(List<ZkNodeProps> requests) throws Exception {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index 34c0705..ccffea7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -28,11 +28,12 @@ import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.params.AutoScalingParams;
 import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.AutoScalingParams;
 import org.apache.solr.common.params.CollectionParams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,28 +57,42 @@ public class ComputePlanAction extends TriggerActionBase {
       if (autoScalingConf.isEmpty()) {
         throw new Exception("Action: " + getName() + " executed but no policy is configured");
       }
-      Policy policy = autoScalingConf.getPolicy();
-      Policy.Session session = policy.createSession(cloudManager);
-      Suggester suggester = getSuggester(session, event, cloudManager);
-      while (true) {
-        SolrRequest operation = suggester.getSuggestion();
-        if (operation == null) break;
-        log.info("Computed Plan: {}", operation.getParams());
-        Map<String, Object> props = context.getProperties();
-        props.compute("operations", (k, v) -> {
-          List<SolrRequest> operations = (List<SolrRequest>) v;
-          if (operations == null) operations = new ArrayList<>();
-          operations.add(operation);
-          return operations;
-        });
-        session = suggester.getSession();
-        suggester = getSuggester(session, event, cloudManager);
+
+      //    Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
+//    return new PolicyHelper.SessionWrapper(session, null);
+      PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(cloudManager);
+      Policy.Session session = sessionWrapper.get();
+//      Policy policy = autoScalingConf.getPolicy();
+      try {
+        Suggester suggester = getSuggester(session, event, cloudManager);
+        while (true) {
+          SolrRequest operation = suggester.getSuggestion();
+          if (operation == null) break;
+          log.info("Computed Plan: {}", operation.getParams());
+          Map<String, Object> props = context.getProperties();
+          props.compute("operations", (k, v) -> {
+            List<SolrRequest> operations = (List<SolrRequest>) v;
+            if (operations == null) operations = new ArrayList<>();
+            operations.add(operation);
+            return operations;
+          });
+          session = suggester.getSession();
+          suggester = getSuggester(session, event, cloudManager);
+        }
+      } finally {
+        releasePolicySession(sessionWrapper, session);
       }
     } catch (Exception e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
           "Unexpected exception while processing event: " + event, e);    }
   }
 
+  private void releasePolicySession(PolicyHelper.SessionWrapper sessionWrapper, Policy.Session session) {
+    sessionWrapper.returnSession(session);
+    sessionWrapper.release();
+
+  }
+
   protected Suggester getSuggester(Policy.Session session, TriggerEvent event, SolrCloudManager cloudManager) {
     Suggester suggester;
     switch (event.getEventType()) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/core/src/test/org/apache/solr/handler/admin/AutoscalingHistoryHandlerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/AutoscalingHistoryHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/AutoscalingHistoryHandlerTest.java
index 0480426..1122952 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/AutoscalingHistoryHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/AutoscalingHistoryHandlerTest.java
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
 
-@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;")
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper=DEBUG")
 public class AutoscalingHistoryHandlerTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
index 0987db4..09b6193 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
@@ -30,6 +30,7 @@ import org.apache.solr.common.util.ObjectCache;
  */
 public class DelegatingCloudManager implements SolrCloudManager {
   private final SolrCloudManager delegate;
+  private ObjectCache objectCache = new ObjectCache();
 
   public DelegatingCloudManager(SolrCloudManager delegate) {
     this.delegate = delegate;
@@ -57,7 +58,7 @@ public class DelegatingCloudManager implements SolrCloudManager {
 
   @Override
   public ObjectCache getObjectCache() {
-    return delegate.getObjectCache();
+    return delegate == null ? objectCache : delegate.getObjectCache();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
new file mode 100644
index 0000000..b47d1c8
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.cloud.autoscaling;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.Watcher;
+
+public class DelegatingDistribStateManager implements DistribStateManager {
+  private final DistribStateManager delegate;
+
+  public DelegatingDistribStateManager(DistribStateManager delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public boolean hasData(String path) throws IOException, KeeperException, InterruptedException {
+    return delegate.hasData(path);
+  }
+
+  @Override
+  public List<String> listData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+    return delegate.listData(path);
+  }
+
+  @Override
+  public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+    return delegate.getData(path, watcher);
+  }
+
+  @Override
+  public VersionedData getData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+    return delegate.getData(path);
+  }
+
+  @Override
+  public void makePath(String path) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
+    delegate.makePath(path);
+  }
+
+  @Override
+  public String createData(String path, byte[] data, CreateMode mode) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
+    return delegate.createData(path, data, mode);
+  }
+
+  @Override
+  public void removeData(String path, int version) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+    delegate.removeData(path, version);
+  }
+
+  @Override
+  public void setData(String path, byte[] data, int version) throws BadVersionException, NoSuchElementException, IOException, KeeperException, InterruptedException {
+    delegate.setData(path, data, version);
+  }
+
+  @Override
+  public List<OpResult> multi(Iterable<Op> ops) throws BadVersionException, NoSuchElementException, AlreadyExistsException, IOException, KeeperException, InterruptedException {
+    return delegate.multi(ops);
+  }
+
+  @Override
+  public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws InterruptedException, IOException {
+    return delegate.getAutoScalingConfig(watcher);
+  }
+
+  @Override
+  public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
+    return delegate.getAutoScalingConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index 5d179b7..024c6c3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -18,13 +18,14 @@
 package org.apache.solr.client.solrj.cloud.autoscaling;
 
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
@@ -36,12 +37,19 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 import static org.apache.solr.common.params.CoreAdminParams.NODE;
+import static org.apache.solr.common.util.Utils.time;
+import static org.apache.solr.common.util.Utils.timeElapsed;
 
 public class PolicyHelper {
   private static ThreadLocal<Map<String, String>> policyMapping = new ThreadLocal<>();
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   public static List<ReplicaPosition> getReplicaLocations(String collName, AutoScalingConfig autoScalingConfig,
                                                           SolrCloudManager cloudManager,
                                                           Map<String, String> optionalPolicyMapping,
@@ -52,26 +60,45 @@ public class PolicyHelper {
                                                           List<String> nodesList) {
     List<ReplicaPosition> positions = new ArrayList<>();
     ClusterStateProvider stateProvider = new DelegatingClusterStateProvider(cloudManager.getClusterStateProvider()) {
-        @Override
-        public String getPolicyNameByCollection(String coll) {
-          return policyMapping.get() != null && policyMapping.get().containsKey(coll) ?
-              optionalPolicyMapping.get(coll) :
-              delegate.getPolicyNameByCollection(coll);
-        }
-      };
+      @Override
+      public String getPolicyNameByCollection(String coll) {
+        return policyMapping.get() != null && policyMapping.get().containsKey(coll) ?
+            optionalPolicyMapping.get(coll) :
+            delegate.getPolicyNameByCollection(coll);
+      }
+    };
     SolrCloudManager delegatingManager = new DelegatingCloudManager(cloudManager) {
       @Override
       public ClusterStateProvider getClusterStateProvider() {
         return stateProvider;
       }
+
+      @Override
+      public DistribStateManager getDistribStateManager() {
+        if (autoScalingConfig != null) {
+          return new DelegatingDistribStateManager(null) {
+            @Override
+            public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
+              return autoScalingConfig;
+            }
+          };
+        } else {
+          return super.getDistribStateManager();
+        }
+      }
     };
 
     policyMapping.set(optionalPolicyMapping);
+    SessionWrapper sessionWrapper = null;
     Policy.Session session = null;
     try {
-      session = SESSION_REF.get() != null ?
-          SESSION_REF.get().initOrGet(delegatingManager, autoScalingConfig.getPolicy()) :
-          autoScalingConfig.getPolicy().createSession(delegatingManager);
+      try {
+        SESSION_WRAPPPER_REF.set(sessionWrapper = getSession(delegatingManager));
+      } catch (Exception e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "unable to get autoscaling policy session", e);
+
+      }
+      session = sessionWrapper.session;
 
       Map<Replica.Type, Integer> typeVsCount = new EnumMap<>(Replica.Type.class);
       typeVsCount.put(Replica.Type.NRT, nrtReplicas);
@@ -100,15 +127,16 @@ public class PolicyHelper {
         }
       }
     } finally {
-      if (session != null && SESSION_REF.get() != null) SESSION_REF.get().updateSession(session);
       policyMapping.remove();
+      if (sessionWrapper != null) {
+        sessionWrapper.returnSession(session);
+      }
     }
     return positions;
   }
 
 
   public static final int SESSION_EXPIRY = 180;//3 minutes (value is in seconds)
-  public static ThreadLocal<Long> REF_VERSION = new ThreadLocal<>();
 
   public static MapWriter getDiagnostics(Policy policy, SolrClientCloudManager cloudManager) {
     Policy.Session session = policy.createSession(cloudManager);
@@ -155,80 +183,220 @@ public class PolicyHelper {
     return suggestionCtx.getSuggestions();
   }
 
-  public static class SessionRef {
-    private final AtomicLong myVersion = new AtomicLong(0);
-    AtomicInteger refCount = new AtomicInteger();
-    private Policy.Session session;
-    long lastUsedTime;
+  public enum Status {
+    NULL,
+    //it is just created and not yet used, or all operations on it have been completed fully
+    UNUSED,
+    COMPUTING, EXECUTING;
+  }
+
+  /**
+   * This class stores a session for sharing purpose. If a process creates a session to
+   * compute operations,
+   * 1) see if there is a session that is available in the cache,
+   * 2) if yes, check if it is expired
+   * 3) if it is expired, create a new session
+   * 4) if it is not expired, borrow it
+   * 5) after computing operations put it back in the cache
+   */
+  static class SessionRef {
+    private final Object lockObj = new Object();
+    private SessionWrapper sessionWrapper = SessionWrapper.DEF_INST;
+
 
     public SessionRef() {
     }
 
-    public long getRefVersion(){
-      return myVersion.get();
+
+    //only for debugging
+    SessionWrapper getSessionWrapper() {
+      return sessionWrapper;
     }
 
+    /**
+     * All operations suggested by the current session object
+     * are complete. Do not even cache anything
+     *
+     */
+    private void release(SessionWrapper sessionWrapper) {
+      synchronized (lockObj) {
+        if (sessionWrapper.createTime == this.sessionWrapper.createTime && this.sessionWrapper.refCount.get() <= 0) {
+          log.debug("session set to NULL");
+          this.sessionWrapper = SessionWrapper.DEF_INST;
+        } // else the session is still referenced, or somebody created a new session b/c of expiry. So no need to do anything about it
+      }
+    }
 
-    public void decref(long version) {
-      synchronized (SessionRef.class) {
-        if (session == null) return;
-        if(myVersion.get() != version) return;
-        if (refCount.decrementAndGet() <= 0) {
-          session = null;
-          lastUsedTime = 0;
+    /**
+     * Computing is over for this session and it may contain a new session with new state
+     * The session can be used by others while the caller is performing operations
+     *
+     */
+    private void returnSession(SessionWrapper sessionWrapper) {
+      synchronized (lockObj) {
+        sessionWrapper.status = Status.EXECUTING;
+        log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(MILLISECONDS),
+            sessionWrapper.createTime,
+            this.sessionWrapper.createTime);
+        if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
+          //this session was used for computing new operations and this can now be used for other
+          // computing
+          this.sessionWrapper = sessionWrapper;
+
+          //one thread that is waiting for this session needs to be notified.
+          lockObj.notify();
+        } else {
+          log.info("create time NOT SAME {} ", SessionWrapper.DEF_INST.createTime);
+          //else just ignore it
         }
       }
-    }
 
-    public int getRefCount() {
-      return refCount.get();
     }
 
-    public Policy.Session get() {
-      synchronized (SessionRef.class) {
-        if (session == null) return null;
-        if (TimeUnit.SECONDS.convert(System.nanoTime() - lastUsedTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
-          session = null;
-          return null;
+
+    public SessionWrapper get(SolrCloudManager cloudManager) throws IOException, InterruptedException {
+      synchronized (lockObj) {
+        if (sessionWrapper.status == Status.NULL ||
+            TimeUnit.SECONDS.convert(System.nanoTime() - sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
+          //no session available or the session is expired
+          return createSession(cloudManager);
         } else {
-          REF_VERSION.set(myVersion.get());
-          refCount.incrementAndGet();
-          return session;
+          long waitStart = time(MILLISECONDS);
+          //the session is not expired
+          log.debug("reusing a session {}", this.sessionWrapper.createTime);
+          if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
+            this.sessionWrapper.status = Status.COMPUTING;
+            return sessionWrapper;
+          } else {
+            //status == COMPUTING: the session is being used for computing by another caller, so wait for it to be returned
+            log.debug("session being used. waiting... current time {} ", time(MILLISECONDS));
+            try {
+              lockObj.wait(10 * 1000);//wait for a max of 10 seconds
+            } catch (InterruptedException e) {
+              log.info("interrupted... ");
+            }
+            log.debug("out of waiting curr-time:{} time-elapsed {}", time(MILLISECONDS), timeElapsed(waitStart, MILLISECONDS));
+            // now this thread has woken up because it timed out after 10 seconds, was interrupted, or was notified after
+            // the session was returned from another COMPUTING operation
+            if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
+              log.debug("Wait over. reusing the existing session ");
+              this.sessionWrapper.status = Status.COMPUTING;
+              return sessionWrapper;
+            } else {
+              //create a new Session
+              return createSession(cloudManager);
+            }
+          }
         }
       }
+
+
     }
 
-    public Policy.Session initOrGet(SolrCloudManager cloudManager, Policy policy) {
-      synchronized (SessionRef.class) {
-        Policy.Session session = get();
-        if (session != null) return session;
-        this.session = policy.createSession(cloudManager);
-        myVersion.incrementAndGet();
-        lastUsedTime = System.nanoTime();
-        REF_VERSION.set(myVersion.get());
-        refCount.set(1);
-        return this.session;
+    private SessionWrapper createSession(SolrCloudManager cloudManager) throws InterruptedException, IOException {
+      synchronized (lockObj) {
+        log.debug("Creating a new session");
+        Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
+        log.debug("New session created ");
+        this.sessionWrapper = new SessionWrapper(session, this);
+        this.sessionWrapper.status = Status.COMPUTING;
+        return sessionWrapper;
       }
     }
 
 
-    private void updateSession(Policy.Session session) {
-      this.session = session;
-      lastUsedTime = System.nanoTime();
-    }
   }
 
-  public static void clearFlagAndDecref(SessionRef policySessionRef) {
-    Long refVersion =  REF_VERSION.get();
-    if (refVersion != null) policySessionRef.decref(refVersion);
-    REF_VERSION.remove();
+  /**
+   * How to get a shared Policy Session
+   * 1) call {@link #getSession(SolrCloudManager)}
+   * 2) compute all suggestions
+   * 3) call {@link  SessionWrapper#returnSession(Policy.Session)}
+   * 4) perform all suggestions
+   * 5) call {@link  SessionWrapper#release()}
+   */
+  public static SessionWrapper getSession(SolrCloudManager cloudManager) throws IOException, InterruptedException {
+    SessionRef sessionRef = (SessionRef) cloudManager.getObjectCache().computeIfAbsent(SessionRef.class.getName(), s -> new SessionRef());
+    return sessionRef.get(cloudManager);
   }
-  public static PolicyHelper.SessionRef getPolicySessionRef(SolrCloudManager cloudManager){
-    return (SessionRef) cloudManager.getObjectCache().computeIfAbsent(SessionRef.class.getName(), s -> new SessionRef());
+
+  /**
+   * Use this to get the last used session wrapper in this thread
+   *
+   * @param clear whether to unset the threadlocal or not
+   */
+  public static SessionWrapper getLastSessionWrapper(boolean clear) {
+    SessionWrapper wrapper = SESSION_WRAPPPER_REF.get();
+    if (clear) SESSION_WRAPPPER_REF.remove();
+    return wrapper;
+
   }
 
 
-  public static ThreadLocal<SessionRef> SESSION_REF = new ThreadLocal<>();
+  static ThreadLocal<SessionWrapper> SESSION_WRAPPPER_REF = new ThreadLocal<>();
+
+
+  public static class SessionWrapper {
+    public static final SessionWrapper DEF_INST = new SessionWrapper(null, null);
+
+    static {
+      DEF_INST.status = Status.NULL;
+      DEF_INST.createTime = -1l;
+      DEF_INST.lastUpdateTime = -1l;
+    }
+
+    private long createTime;
+    private long lastUpdateTime;
+    private Policy.Session session;
+    public Status status;
+    private final SessionRef ref;
+    private AtomicInteger refCount = new AtomicInteger();
+
+    public long getCreateTime() {
+      return createTime;
+    }
+
+    public long getLastUpdateTime() {
+      return lastUpdateTime;
+    }
+
+    public SessionWrapper(Policy.Session session, SessionRef ref) {
+      lastUpdateTime = createTime = System.nanoTime();
+      this.session = session;
+      this.status = Status.UNUSED;
+      this.ref = ref;
+    }
+
+    public Policy.Session get() {
+      return session;
+    }
+
+    public SessionWrapper update(Policy.Session session) {
+      this.lastUpdateTime = System.nanoTime();
+      this.session = session;
+      return this;
+    }
+
+    public int getRefCount() {
+      return refCount.get();
+    }
+
+    /**
+     * return this for later use and update the session with the latest state
+     * ensure that this is done after computing the suggestions
+     */
+    public void returnSession(Policy.Session session) {
+      this.update(session);
+      refCount.incrementAndGet();
+      ref.returnSession(this);
 
+    }
+
+    //all ops are executed now, so it can be destroyed
+    public void release() {
+      refCount.decrementAndGet();
+      ref.release(this);
 
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
index 8dd431b..93af8c3 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
 
 import static java.util.Collections.unmodifiableList;
 import static java.util.Collections.unmodifiableSet;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class Utils {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -442,4 +444,13 @@ public class Utils {
       }
     }
   }
+
+  public static long time(TimeUnit unit) {
+    return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+  }
+
+  public static long timeElapsed(long start, TimeUnit unit) {
+    return unit.convert(System.nanoTime() - NANOSECONDS.convert(start, unit), NANOSECONDS);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/071d9270/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
index 31e316e..396234b 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.solr.SolrTestCaseJ4;
@@ -846,15 +847,15 @@ public class TestPolicy extends SolrTestCaseJ4 {
   }
 
   @Test
-  public void testSessionCaching() {
-    PolicyHelper.SessionRef ref1 = new PolicyHelper.SessionRef();
+  public void testSessionCaching() throws IOException, InterruptedException {
+//    PolicyHelper.SessionRef ref1 = new PolicyHelper.SessionRef();
     String autoScalingjson = "  '{cluster-policy':[" +
         "    {      'cores':'<10',      'node':'#ANY'}," +
         "    {      'replica':'<2',      'shard':'#EACH',      'node':'#ANY'}," +
         "    {      'nodeRole':'overseer','replica':0}]," +
         "  'cluster-preferences':[{'minimize':'cores'}]}";
     Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
-    PolicyHelper.SESSION_REF.set(ref1);
+//    PolicyHelper.SESSION_REF.set(ref1);
     String nodeValues = " {" +
         "    'node4':{" +
         "      'node':'10.0.0.4:8987_solr'," +
@@ -870,7 +871,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
         "      'freedisk':884.7097854614258}," +
         "}";
 
-    SolrCloudManager provider = getSolrCloudManager((Map<String, Map>) Utils.fromJSONString(nodeValues), clusterState);
+
     Map policies = (Map) Utils.fromJSONString("{" +
         "  'cluster-preferences': [" +
         "    { 'maximize': 'freedisk', 'precision': 50}," +
@@ -882,44 +883,70 @@ public class TestPolicy extends SolrTestCaseJ4 {
         "  ]" +
         "}");
     AutoScalingConfig config = new AutoScalingConfig(policies);
+    final SolrCloudManager solrCloudManager = new DelegatingCloudManager(getSolrCloudManager((Map<String, Map>) Utils.fromJSONString(nodeValues),
+        clusterState)) {
+      @Override
+      public DistribStateManager getDistribStateManager() {
+        return delegatingDistribStateManager(config);
+      }
+    };
 
-    List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations("c", config, provider, null,
+    List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations("c", config, solrCloudManager, null,
         Arrays.asList("s1", "s2"), 1, 0, 0,
         null);
 
-    long sessionRefVersion =  PolicyHelper.REF_VERSION.get();
-    PolicyHelper.SessionRef ref1Copy = PolicyHelper.SESSION_REF.get();
-    PolicyHelper.SESSION_REF.remove();
-    Policy.Session session = ref1Copy.get();
+    PolicyHelper.SessionRef sessionRef = (PolicyHelper.SessionRef) solrCloudManager.getObjectCache().get(PolicyHelper.SessionRef.class.getName());
+    assertNotNull(sessionRef);
+    PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
+
+
+    Policy.Session session = sessionWrapper.get();
     assertNotNull(session);
-    assertEquals(ref1, ref1Copy);
-    assertTrue(session.getPolicy() == config.getPolicy());
-    ref1Copy.decref(sessionRefVersion);
-    PolicyHelper.SESSION_REF.set(ref1);
-    AutoScalingConfig config2 = new AutoScalingConfig(policies);
-    locations = PolicyHelper.getReplicaLocations("c2", config2, provider, null, Arrays.asList("s1", "s2"), 1, 0, 0,
-        null);
-    sessionRefVersion =  PolicyHelper.REF_VERSION.get();
-    ref1Copy = PolicyHelper.SESSION_REF.get();
-    PolicyHelper.SESSION_REF.remove();
-    session = ref1Copy.get();
-    ref1Copy.decref(sessionRefVersion);
-    assertEquals(ref1, ref1Copy);
-    assertFalse(session.getPolicy() == config2.getPolicy());
     assertTrue(session.getPolicy() == config.getPolicy());
-    assertEquals(2, ref1Copy.getRefCount());
-    ref1.decref(sessionRefVersion);//decref 1
-    ref1.decref(sessionRefVersion);//decref 2
-    PolicyHelper.SESSION_REF.set(ref1);
-    locations = PolicyHelper.getReplicaLocations("c3", config2, provider, null, Arrays.asList("s1", "s2"), 1, 0, 0,
-        null);
-    sessionRefVersion =  PolicyHelper.REF_VERSION.get();
-    ref1Copy = PolicyHelper.SESSION_REF.get();
-    PolicyHelper.SESSION_REF.remove();
-    session = ref1Copy.get();
-    ref1Copy.decref(sessionRefVersion);
-    assertTrue(session.getPolicy() == config2.getPolicy());
+    assertEquals(sessionWrapper.status, PolicyHelper.Status.EXECUTING);
+    sessionWrapper.release();
+    assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
+    PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager);
+    assertEquals(sessionRef.getSessionWrapper().getCreateTime(), s1.getCreateTime());
+    PolicyHelper.SessionWrapper[] s2 = new PolicyHelper.SessionWrapper[1];
+    AtomicLong secondTime = new AtomicLong();
+    Thread thread = new Thread(() -> {
+      try {
+        s2[0] = PolicyHelper.getSession(solrCloudManager);
+        secondTime.set(System.nanoTime());
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    thread.start();
+    Thread.sleep(50);
+    long beforeReturn = System.nanoTime();
+    assertEquals(s1.getCreateTime(), sessionRef.getSessionWrapper().getCreateTime());
+    s1.returnSession(s1.get());
+    assertEquals(1, s1.getRefCount());
+    thread.join();
+    assertNotNull(s2[0]);
+    assertTrue(secondTime.get() > beforeReturn);
+    assertTrue(s1.getCreateTime() == s2[0].getCreateTime());
+
+    s2[0].returnSession(s2[0].get());
+    assertEquals(2, s1.getRefCount());
+
+    s2[0].release();
+    assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
+    s1.release();
+    assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEF_INST);
 
+
+  }
+
+  private DistribStateManager delegatingDistribStateManager(AutoScalingConfig config) {
+    return new DelegatingDistribStateManager(null) {
+      @Override
+      public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
+        return config;
+      }
+    };
   }
 
   public void testNegativeConditions() {