You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by il...@apache.org on 2020/06/29 18:20:43 UTC

[lucene-solr] branch branch_8x updated: SOLR-14462: cache more than one autoscaling session (#1630)

This is an automated email from the ASF dual-hosted git repository.

ilan pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new 7815287  SOLR-14462: cache more than one autoscaling session (#1630)
7815287 is described below

commit 78152876fda92c61d1c6bcdf5e8953042a592b4f
Author: Ilan Ginzburg <ig...@salesforce.com>
AuthorDate: Mon Jun 29 20:20:30 2020 +0200

    SOLR-14462: cache more than one autoscaling session (#1630)
    
    Cherry picked from 25428013fb0ed8f8fdbebdef3f1d65dea77129c2
---
 .../solrj/cloud/autoscaling/PolicyHelper.java      | 310 ++++++++++++++-------
 .../client/solrj/cloud/autoscaling/TestPolicy.java | 134 ++++++++-
 2 files changed, 334 insertions(+), 110 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index 52ad540..4066eab 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -396,33 +397,59 @@ public class PolicyHelper {
   }
 
   public enum Status {
-    NULL,
-    //it is just created and not yet used or all operations on it has been completed fully
-    UNUSED,
-    COMPUTING, EXECUTING
+    /**
+     * A command is actively using and modifying the session to compute placements
+     */
+    COMPUTING,
+    /**
+     * A command is not yet done processing its changes but no longer updates or even uses the session
+     */
+    EXECUTING
   }
 
   /**
-   * This class stores a session for sharing purpose. If a process creates a session to
-   * compute operations,
-   * 1) see if there is a session that is available in the cache,
-   * 2) if yes, check if it is expired
-   * 3) if it is expired, create a new session
-   * 4) if it is not expired, borrow it
-   * 5) after computing operations put it back in the cache
+   * This class stores sessions for sharing purposes. If a process requires a session to
+   * compute operations:
+   * <ol>
+   * <li>see if there is an available non expired session in the cache,</li>
+   * <li>if yes, borrow it.</li>
+   * <li>if no, create a new one and borrow it.</li>
+   * <li>after computing (update) operations are done, {@link #returnSession(SessionWrapper)} it back to the cache so
+   * it's again available for borrowing.</li>
+   * <li>after all borrowers are done computing then executing with the session, {@link #release(SessionWrapper)} it,
+   * which removes it from the cache.</li>
+   * </ol>
    */
   static class SessionRef {
+    /**
+     * Lock protecting access to {@link #sessionWrapperSet} and to {@link #creationsInProgress}
+     */
     private final Object lockObj = new Object();
-    private SessionWrapper sessionWrapper = SessionWrapper.DEFAULT_INSTANCE;
 
+    /**
+     * Sessions currently in use in {@link Status#COMPUTING} or {@link Status#EXECUTING} states. As soon as all
+     * uses of a session are over, that session is removed from this set. Sessions not actively in use are NOT kept around.
+     *
+     * <p>Access should only be done under the protection of {@link #lockObj}</p>
+     */
+    private Set<SessionWrapper> sessionWrapperSet = Collections.newSetFromMap(new IdentityHashMap<>());
+
+
+    /**
+     * Number of sessions currently being created but not yet present in {@link #sessionWrapperSet}.
+     *
+     * <p>Access should only be done under the protection of {@link #lockObj}</p>
+     */
+    private int creationsInProgress = 0;
 
     public SessionRef() {
     }
 
-
-    //only for debugging
-    SessionWrapper getSessionWrapper() {
-      return sessionWrapper;
+    // used only by tests
+    boolean isEmpty() {
+      synchronized (lockObj) {
+        return sessionWrapperSet.isEmpty();
+      }
     }
 
     /**
@@ -430,11 +457,19 @@ public class PolicyHelper {
      * is complete. Do not even cache anything
      */
     private void release(SessionWrapper sessionWrapper) {
+      boolean present;
       synchronized (lockObj) {
-        if (sessionWrapper.createTime == this.sessionWrapper.createTime && this.sessionWrapper.refCount.get() <= 0) {
-          log.debug("session set to NULL");
-          this.sessionWrapper = SessionWrapper.DEFAULT_INSTANCE;
-        } // else somebody created a new session b/c of expiry . So no need to do anything about it
+        present = sessionWrapperSet.remove(sessionWrapper);
+      }
+      if (!present) {
+        log.warn("released session {} not found in session set", sessionWrapper.getCreateTime());
+      } else {
+        if (log.isDebugEnabled()) {
+          TimeSource timeSource = sessionWrapper.session.cloudManager.getTimeSource();
+          log.debug("final release, session {} lived a total of {}ms, ", sessionWrapper.getCreateTime(),
+              timeElapsed(timeSource, TimeUnit.MILLISECONDS.convert(sessionWrapper.getCreateTime(),
+                  TimeUnit.NANOSECONDS), MILLISECONDS)); // logOk
+        }
       }
     }
 
@@ -443,87 +478,149 @@ public class PolicyHelper {
      * The session can be used by others while the caller is performing operations
      */
     private void returnSession(SessionWrapper sessionWrapper) {
-      TimeSource timeSource = sessionWrapper.session != null ? sessionWrapper.session.cloudManager.getTimeSource() : TimeSource.NANO_TIME;
+      boolean present;
       synchronized (lockObj) {
         sessionWrapper.status = Status.EXECUTING;
-        if (log.isDebugEnabled()) {
-          log.debug("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} "
-              , time(timeSource, MILLISECONDS),
-              sessionWrapper.createTime,
-              this.sessionWrapper.createTime);
-        }
-        if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
-          //this session was used for computing new operations and this can now be used for other
-          // computing
-          this.sessionWrapper = sessionWrapper;
+        present = sessionWrapperSet.contains(sessionWrapper);
 
-          //one thread who is waiting for this need to be notified.
-          lockObj.notify();
-        } else {
-          log.debug("create time NOT SAME {} ", SessionWrapper.DEFAULT_INSTANCE.createTime);
-          //else just ignore it
-        }
+        // Wake up a single thread waiting for a session return (ok if none is woken up, the wait is short).
+        // Important to wake up only one: if several waiting threads were woken, all but one would immediately create new sessions
+        lockObj.notify();
       }
 
+      // Logging
+      if (present) {
+        if (log.isDebugEnabled()) {
+          log.debug("returnSession {}", sessionWrapper.getCreateTime());
+        }
+      } else {
+        log.warn("returning unknown session {} ", sessionWrapper.getCreateTime());
+      }
     }
 
-
-    public SessionWrapper get(SolrCloudManager cloudManager) throws IOException, InterruptedException {
+    /**
+     * <p>Method returning an available session that can be used for {@link Status#COMPUTING}, either from the
+     * {@link #sessionWrapperSet} cache or by creating a new one. The status of the returned session is set to {@link Status#COMPUTING}.</p>
+     *
+     * Some waiting is done in two cases:
+     * <ul>
+     *   <li>A candidate session is present in {@link #sessionWrapperSet} but is still {@link Status#COMPUTING}, a random wait
+     *   is observed to see if the session gets freed to save a session creation and allow session reuse,</li>
+     *   <li>It is necessary to create a new session but other sessions are already in the process of being created: a
+     *   random wait is observed (unless a wait already occurred for a session to become free) before creation takes
+     *   place, in case one of those sessions gets used and then returned ({@link #returnSession(SessionWrapper)}) in the meantime.</li>
+     * </ul>
+     *
+     * The random wait prevents the "thundering herd" effect, where all threads needing a session at the same time each create
+     * a new one even though differentiated waits could have led to better reuse and fewer session creations.
+     *
+     * @param allowWait usually <code>true</code> except in tests that know there's no point in waiting because nothing
+     *                  will happen...
+     */
+    public SessionWrapper get(SolrCloudManager cloudManager, boolean allowWait) throws IOException, InterruptedException {
       TimeSource timeSource = cloudManager.getTimeSource();
+      long oldestUpdateTimeNs = TimeUnit.SECONDS.convert(timeSource.getTimeNs(), TimeUnit.NANOSECONDS) - SESSION_EXPIRY;
+      int zkVersion = cloudManager.getDistribStateManager().getAutoScalingConfig().getZkVersion();
+
       synchronized (lockObj) {
-        if (sessionWrapper.status == Status.NULL ||
-            sessionWrapper.zkVersion != cloudManager.getDistribStateManager().getAutoScalingConfig().getZkVersion() ||
-            TimeUnit.SECONDS.convert(timeSource.getTimeNs() - sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
-          //no session available or the session is expired
-          return createSession(cloudManager);
-        } else {
+        SessionWrapper sw = getAvailableSession(zkVersion, oldestUpdateTimeNs);
+
+        // Best case scenario: an available session
+        if (sw != null) {
+          if (log.isDebugEnabled()) {
+            log.debug("reusing session {}", sw.getCreateTime());
+          }
+          return sw;
+        }
+
+        // Wait for a while before deciding what to do if waiting could help...
+        if ((creationsInProgress != 0 || hasCandidateSession(zkVersion, oldestUpdateTimeNs)) && allowWait) {
+          // Either an existing session might be returned and become usable while we wait, or a session in the process of being
+          // created might finish creation, be used then returned and become usable. So we wait.
+          // wait 1 to 10 secs. Random to help spread wakeups.
+          long waitForMs = (long) (Math.random() * 9 * 1000) + 1000;
+
+          if (log.isDebugEnabled()) {
+            log.debug("No sessions are available, all busy COMPUTING (or {} creations in progress). starting wait of {}ms",
+                creationsInProgress, waitForMs);
+          }
           long waitStart = time(timeSource, MILLISECONDS);
-          //the session is not expired
-          log.debug("reusing a session {}", this.sessionWrapper.createTime);
-          if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
-            this.sessionWrapper.status = Status.COMPUTING;
-            return sessionWrapper;
-          } else {
-            //status= COMPUTING it's being used for computing. computing is
-            if (log.isDebugEnabled()) {
-              log.debug("session being used. waiting... current time {} ", time(timeSource, MILLISECONDS));
-            }
-            try {
-              lockObj.wait(10 * 1000);//wait for a max of 10 seconds
-            } catch (InterruptedException e) {
-              log.info("interrupted... ");
-            }
+          try {
+            lockObj.wait(waitForMs);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+          if (log.isDebugEnabled()) {
+            log.debug("out of waiting. wait of {}ms, actual time elapsed {}ms", waitForMs, timeElapsed(timeSource, waitStart, MILLISECONDS));
+          }
+
+          // We've waited; now we either immediately reuse an available session or immediately create a new one
+          sw = getAvailableSession(zkVersion, oldestUpdateTimeNs);
+
+          // Second best case scenario: an available session
+          if (sw != null) {
             if (log.isDebugEnabled()) {
-              log.debug("out of waiting curr-time:{} time-elapsed {}"
-                  , time(timeSource, MILLISECONDS), timeElapsed(timeSource, waitStart, MILLISECONDS));
-            }
-            // now this thread has woken up because it got timed out after 10 seconds or it is notified after
-            // the session was returned from another COMPUTING operation
-            if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
-              log.debug("Wait over. reusing the existing session ");
-              this.sessionWrapper.status = Status.COMPUTING;
-              return sessionWrapper;
-            } else {
-              //create a new Session
-              return createSession(cloudManager);
+              log.debug("reusing session {} after wait", sw.getCreateTime());
             }
+            return sw;
           }
         }
+
+        // We're going to create a new Session OUTSIDE of the critical section because session creation can take quite some time
+        creationsInProgress++;
       }
-    }
 
-    private SessionWrapper createSession(SolrCloudManager cloudManager) throws InterruptedException, IOException {
-      synchronized (lockObj) {
-        log.debug("Creating a new session");
+      SessionWrapper newSessionWrapper = null;
+      try {
+        if (log.isDebugEnabled()) {
+          log.debug("Creating a new session");
+        }
         Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
-        log.debug("New session created ");
-        this.sessionWrapper = new SessionWrapper(session, this);
-        this.sessionWrapper.status = Status.COMPUTING;
-        return sessionWrapper;
+        newSessionWrapper = new SessionWrapper(session, this);
+        if (log.isDebugEnabled()) {
+          log.debug("New session created, {}", newSessionWrapper.getCreateTime());
+        }
+        return newSessionWrapper;
+      } finally {
+        synchronized (lockObj) {
+          creationsInProgress--;
+
+          if (newSessionWrapper != null) {
+            // Session created successfully
+            sessionWrapperSet.add(newSessionWrapper);
+          }
+        }
       }
     }
 
+    /**
+     * Returns an available session from the cache (the best one once cache strategies are defined), or null if no session
+     * from the cache is available (i.e. all are still COMPUTING, are too old, have the wrong zk version, or the cache is empty).<p>
+     * This method must be called while holding the monitor on {@link #lockObj}.<p>
+     * The method updates the status of the returned session to {@link Status#COMPUTING}.
+     */
+    private SessionWrapper getAvailableSession(int zkVersion, long oldestUpdateTimeNs) {
+      for (SessionWrapper sw : sessionWrapperSet) {
+        if (sw.status == Status.EXECUTING && sw.getLastUpdateTime() >= oldestUpdateTimeNs && sw.zkVersion == zkVersion) {
+          sw.status = Status.COMPUTING;
+          return sw;
+        }
+      }
+      return null;
+    }
 
+    /**
+     * Returns true if there's a session in the cache that could be returned (if it was free). This is required to
+     * know if there's any point in waiting or if a new session should better be created right away.
+     */
+    private boolean hasCandidateSession(int zkVersion, long oldestUpdateTimeNs) {
+      for (SessionWrapper sw : sessionWrapperSet) {
+        if (sw.getLastUpdateTime() >= oldestUpdateTimeNs && sw.zkVersion == zkVersion) {
+          return true;
+        }
+      }
+      return false;
+    }
   }
 
   /**
@@ -535,8 +632,12 @@ public class PolicyHelper {
    * 5) call {@link  SessionWrapper#release()}
    */
   public static SessionWrapper getSession(SolrCloudManager cloudManager) throws IOException, InterruptedException {
+    return getSession(cloudManager, true);
+  }
+
+  static SessionWrapper getSession(SolrCloudManager cloudManager, boolean allowWait) throws IOException, InterruptedException {
     SessionRef sessionRef = (SessionRef) cloudManager.getObjectCache().computeIfAbsent(SessionRef.class.getName(), s -> new SessionRef());
-    return sessionRef.get(cloudManager);
+    return sessionRef.get(cloudManager, allowWait);
   }
 
   /**
@@ -556,22 +657,21 @@ public class PolicyHelper {
 
 
   public static class SessionWrapper {
-    public static final SessionWrapper DEFAULT_INSTANCE = new SessionWrapper(null, null);
-
-    static {
-      DEFAULT_INSTANCE.status = Status.NULL;
-      DEFAULT_INSTANCE.createTime = -1L;
-      DEFAULT_INSTANCE.lastUpdateTime = -1L;
-    }
-
-    private long createTime;
+    private final long createTime;
     private long lastUpdateTime;
     private Policy.Session session;
     public Status status;
     private final SessionRef ref;
-    private AtomicInteger refCount = new AtomicInteger();
+    /**
+     * Number of commands currently using the session in {@link Status#EXECUTING}. There is one <b>additional</b> command
+     * using the session and updating it if {@link #status} is {@link Status#COMPUTING}
+     */
+    private final AtomicInteger refCount = new AtomicInteger();
     public final long zkVersion;
 
+    /**
+     * Nanoseconds (since/to some arbitrary time) when the session got created. Also used in logs (only in logs!) to identify the session.
+     */
     public long getCreateTime() {
       return createTime;
     }
@@ -581,27 +681,22 @@ public class PolicyHelper {
     }
 
     public SessionWrapper(Policy.Session session, SessionRef ref) {
-      lastUpdateTime = createTime = session != null ?
-          session.cloudManager.getTimeSource().getTimeNs() :
-          TimeSource.NANO_TIME.getTimeNs();
+      createTime = session.cloudManager.getTimeSource().getTimeNs();
+      lastUpdateTime = createTime;
       this.session = session;
-      this.status = Status.UNUSED;
+      this.status = Status.COMPUTING; // Created for being used, so COMPUTING right away
       this.ref = ref;
-      this.zkVersion = session == null ?
-          0 :
-          session.getPolicy().getZkVersion();
+      this.zkVersion = session.getPolicy().getZkVersion();
     }
 
     public Policy.Session get() {
       return session;
     }
 
-    public SessionWrapper update(Policy.Session session) {
-      this.lastUpdateTime = session != null ?
-          session.cloudManager.getTimeSource().getTimeNs() :
-          TimeSource.NANO_TIME.getTimeNs();
+    public void update(Policy.Session session) {
+      // NOTE: JMM visibility issue - lastUpdateTime is written here without synchronization but may be read by other threads.
+      this.lastUpdateTime = session.cloudManager.getTimeSource().getTimeNs();
       this.session = session;
-      return this;
     }
 
     public int getRefCount() {
@@ -613,6 +708,10 @@ public class PolicyHelper {
      * ensure that this is done after computing the suggestions
      */
     public void returnSession(Policy.Session session) {
+      if (this.status != Status.COMPUTING) {
+        log.warn("returning session {} not in state COMPUTING", this.getCreateTime());
+      }
+
       this.update(session);
       this.returnSession();
     }
@@ -627,8 +726,9 @@ public class PolicyHelper {
 
     //all ops are executed now it can be destroyed
     public void release() {
-      refCount.decrementAndGet();
-      ref.release(this);
+      if (refCount.decrementAndGet() <= 0) {
+        ref.release(this);
+      }
     }
   }
 }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
index fc8eefa..a47f612 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
@@ -37,6 +37,7 @@ import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrResponse;
@@ -1730,9 +1731,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
     assertTrue(session.getPolicy() == config.getPolicy());
     assertEquals(sessionWrapper.status, PolicyHelper.Status.EXECUTING);
     sessionWrapper.release();
-    assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
+    assertTrue(sessionRef.isEmpty());
     PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager);
-    assertEquals(sessionRef.getSessionWrapper().getCreateTime(), s1.getCreateTime());
     PolicyHelper.SessionWrapper[] s2 = new PolicyHelper.SessionWrapper[1];
     AtomicLong secondTime = new AtomicLong();
     Thread thread = new Thread(() -> {
@@ -1746,7 +1746,6 @@ public class TestPolicy extends SolrTestCaseJ4 {
     thread.start();
     Thread.sleep(50);
     long beforeReturn = System.nanoTime();
-    assertEquals(s1.getCreateTime(), sessionRef.getSessionWrapper().getCreateTime());
     s1.returnSession(s1.get());
     assertEquals(1, s1.getRefCount());
     thread.join();
@@ -1758,13 +1757,138 @@ public class TestPolicy extends SolrTestCaseJ4 {
     assertEquals(2, s1.getRefCount());
 
     s2[0].release();
-    assertFalse(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
+    assertFalse(sessionRef.isEmpty());
     s1.release();
-    assertTrue(sessionRef.getSessionWrapper() == PolicyHelper.SessionWrapper.DEFAULT_INSTANCE);
+    assertTrue(sessionRef.isEmpty());
 
 
   }
 
+  @Test
+  public void testMultiSessionsCache() throws IOException, InterruptedException {
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString(" {" +
+        "    'node1':{ 'node':'10.0.0.4:8987_solr', 'cores':1 }," +
+        "    'node2':{ 'node':'10.0.0.4:8989_solr', 'cores':1 }," +
+        "    'node3':{ 'node':'10.0.0.4:7574_solr', 'cores':1 }" +
+        "}");
+
+    @SuppressWarnings({"rawtypes"})
+    Map policies = (Map) Utils.fromJSONString("{ 'cluster-preferences': [{ 'minimize': 'cores', 'precision': 1}]}");
+
+    @SuppressWarnings({"unchecked"})
+    AutoScalingConfig config = new AutoScalingConfig(policies);
+    final SolrCloudManager solrCloudManager = new DelegatingCloudManager(getSolrCloudManager(nodeValues, clusterState)) {
+      @Override
+      public DistribStateManager getDistribStateManager() {
+        return delegatingDistribStateManager(config);
+      }
+    };
+
+    PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager);
+    // Must skip the wait time otherwise test takes a few seconds to run (and s1 is not returned now anyway so no point waiting).
+    PolicyHelper.SessionWrapper s2 = PolicyHelper.getSession(solrCloudManager, false);
+    // Got two sessions, they are different
+    assertNotSame(s1, s2);
+
+    // Done COMPUTING with first session, it can be reused
+    s1.returnSession(s1.get());
+
+    PolicyHelper.SessionWrapper s3 = PolicyHelper.getSession(solrCloudManager);
+    // First session indeed reused when a new session is requested
+    assertSame(s3, s1);
+
+    // Done COMPUTING with second session, it can be reused
+    s2.returnSession(s2.get());
+
+    PolicyHelper.SessionWrapper s4 = PolicyHelper.getSession(solrCloudManager);
+    // Second session indeed reused when a new session is requested
+    assertSame(s4, s2);
+
+    s4.returnSession(s4.get());
+    s4.release();
+
+    s2.release();
+
+    s3.returnSession(s3.get());
+    s3.release();
+
+    PolicyHelper.SessionRef sessionRef = (PolicyHelper.SessionRef) solrCloudManager.getObjectCache().get(PolicyHelper.SessionRef.class.getName());
+
+    // First session not yet released so is still in the cache
+    assertFalse(sessionRef.isEmpty());
+
+    s1.release();
+
+    assertTrue(sessionRef.isEmpty());
+  }
+
+  /**
+   * Verify number of sessions allocated when parallel session requests arrive is reasonable.
+   * Test takes about 3 seconds to run.
+   */
+  @Test
+  @Slow
+  public void testMultiThreadedSessionsCache() throws IOException, InterruptedException {
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString(" {" +
+        "    'node1':{ 'node':'10.0.0.4:8987_solr', 'cores':1 }," +
+        "    'node2':{ 'node':'10.0.0.4:8989_solr', 'cores':1 }," +
+        "    'node3':{ 'node':'10.0.0.4:7574_solr', 'cores':1 }" +
+        "}");
+
+    @SuppressWarnings({"rawtypes"})
+    Map policies = (Map) Utils.fromJSONString("{ 'cluster-preferences': [{ 'minimize': 'cores', 'precision': 1}]}");
+
+    @SuppressWarnings({"unchecked"})
+    AutoScalingConfig config = new AutoScalingConfig(policies);
+    final SolrCloudManager solrCloudManager = new DelegatingCloudManager(getSolrCloudManager(nodeValues, clusterState)) {
+      @Override
+      public DistribStateManager getDistribStateManager() {
+        return delegatingDistribStateManager(config);
+      }
+    };
+
+    final Set<PolicyHelper.SessionWrapper> seenSessions = Sets.newHashSet();
+    final AtomicInteger completedThreads = new AtomicInteger(0);
+
+    final int COUNT_THREADS = 100;
+    Thread[] threads = new Thread[COUNT_THREADS];
+
+    for (int i = 0; i < COUNT_THREADS; i++) {
+      threads[i] = new Thread(() -> {
+        try {
+          // This thread requests a session, computes using it for 50ms then returns it, executes for 1000ms more,
+          // releases the session and finishes.
+          PolicyHelper.SessionWrapper session = PolicyHelper.getSession(solrCloudManager);
+          seenSessions.add(session);
+          Thread.sleep(50);
+          session.returnSession(session.get());
+          Thread.sleep(1000);
+          session.release();
+
+          completedThreads.incrementAndGet();
+        } catch (InterruptedException | IOException ignored) {
+        }
+      });
+      threads[i].start();
+    }
+
+    for (int i = 0; i < COUNT_THREADS; i++) {
+      threads[i].join(12000);
+    }
+
+    assertEquals(COUNT_THREADS, completedThreads.get());
+    // The value asserted below is somewhat arbitrary. Running locally max seen is 10, so hopefully 30 is safe.
+    // Idea is to verify we do not allocate a high number of sessions even if many concurrent session
+    // requests arrive at the same time. The session computing time is short on purpose. If it were long, it would be
+    // expected for more sessions to be allocated.
+    assertTrue("Too many sessions created: " + seenSessions.size(), seenSessions.size() < 30);
+
+    PolicyHelper.SessionRef sessionRef = (PolicyHelper.SessionRef) solrCloudManager.getObjectCache().get(PolicyHelper.SessionRef.class.getName());
+    assertTrue(sessionRef.isEmpty());
+  }
+
   private DistribStateManager delegatingDistribStateManager(AutoScalingConfig config) {
     return new DelegatingDistribStateManager(null) {
       @Override