You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2019/10/30 20:45:07 UTC

[lucene-solr] 02/11: @W-6635251 Refactor async pull code to avoid static initialized data structures from colliding in unit tests (#390)

This is an automated email from the ASF dual-hosted git repository.

yonik pushed a commit to branch jira/SOLR-13101
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit ce12e737e17225fed8d623a18fdf5ab52df825fe
Author: Andy Vuong <a....@salesforce.com>
AuthorDate: Thu Sep 26 11:58:12 2019 -0400

    @W-6635251 Refactor async pull code to avoid static initialized data structures from colliding in unit tests (#390)
    
    * Refactor async pulls to use non-static initialized pulling data structures
    * Update jenkin file to run tests on blob
---
 Jenkinsfile                                        |  0
 .../java/org/apache/solr/servlet/HttpSolrCall.java |  3 +-
 .../solr/store/blob/metadata/BlobCoreSyncer.java   | 30 ++++++-----
 .../solr/store/blob/process/BlobProcessUtil.java   |  6 ++-
 .../solr/store/blob/process/CorePullTask.java      | 54 ++++++++------------
 .../solr/store/blob/process/CorePullerFeeder.java  | 59 ++++++++++++++++++++--
 .../solr/store/blob/process/CoreSyncFeeder.java    |  6 +--
 .../solr/store/shared/SharedStoreManager.java      |  7 +++
 .../shared/SimpleSharedStoreEndToEndPullTest.java  | 52 ++++++++++---------
 9 files changed, 136 insertions(+), 81 deletions(-)

diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000..e69de29
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index adfbc5d..56235fa 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -305,7 +305,8 @@ public class HttpSolrCall {
               if (replica != null) {
                 String coreName = replica.getCoreName();
                 String shardName = getShardName(collectionName, coreName);
-                BlobCoreSyncer.pull(coreName, shardName, collectionName, cores, true, false);
+                BlobCoreSyncer syncer = cores.getSharedStoreManager().getBlobCoreSyncer();
+                syncer.pull(coreName, shardName, collectionName, cores, true, false);
                 core = cores.getCore(coreName);
                 if (!retry) {
                   action = RETRY;
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java
index 520a361..b281b27 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java
@@ -45,6 +45,8 @@ import java.lang.invoke.MethodHandles;
  */
 public class BlobCoreSyncer {
 
+    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
     /**
      * Threads wait for at most this duration before giving up on async pull to finish and returning with a PullInProgressException.
      */
@@ -70,20 +72,20 @@ public class BlobCoreSyncer {
 
     private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+    @GuardedBy("coreSyncsInFlight")
+    private int total_waiting_threads = 0;
+
     /** The shared store name for the core currently being pulled from blob. Value is collection of objects used for synchronization by all waiting threads.
      * If both the locks on this map and on a specific SyncOnPullWait in the map are needed, the lock on the map must be acquired first.
      */
     @GuardedBy("itself")
-    private static final Map<String, Collection<SyncOnPullWait>> coreSyncsInFlight = Maps.newHashMap();
-
-    @GuardedBy("coreSyncsInFlight")
-    private static int total_waiting_threads = 0;
+    private final Map<String, Collection<SyncOnPullWait>> coreSyncsInFlight = Maps.newHashMap();
 
     /**
      * @return Total number of threads across all cores waiting for their respective core to be pulled from blob store
      */
     @VisibleForTesting
-    protected static int getTotalWaitingThreads() {
+    protected int getTotalWaitingThreads() {
         synchronized (coreSyncsInFlight) {
             return total_waiting_threads;
         }
@@ -114,11 +116,11 @@ public class BlobCoreSyncer {
      * This is just a hint because as soon as the lock is released when the method returns, the status of the core could change.
      * Because of that, in method {@link #pull(PushPullData, boolean, boolean, CoreContainer)} we need to check again.
      */
-    public static boolean isEmptyCoreAwaitingPull(String coreName) {
-        return CorePullTask.isEmptyCoreAwaitingPull(coreName);
+    public boolean isEmptyCoreAwaitingPull(CoreContainer cores, String coreName) {
+      return CorePullerFeeder.isEmptyCoreAwaitingPull(cores, coreName);
     }
     
-    public static void pull(String coreName, String shardName, String collectionName, CoreContainer cores,
+    public void pull(String coreName, String shardName, String collectionName, CoreContainer cores,
         boolean waitForSearcher, boolean emptyCoreAwaitingPull) {
       // Initialize variables
       SharedShardMetadataController sharedShardMetadataController = cores.getSharedStoreManager().getSharedShardMetadataController();
@@ -183,7 +185,7 @@ public class BlobCoreSyncer {
      *
      * @throws PullInProgressException In case a thread does not wait or times out before the async pull is finished
      */
-    public static void pull(PushPullData pushPullData, boolean waitForSearcher, boolean emptyCoreAwaitingPull, CoreContainer cores) throws PullInProgressException {
+    public void pull(PushPullData pushPullData, boolean waitForSearcher, boolean emptyCoreAwaitingPull, CoreContainer cores) throws PullInProgressException {
         // Is there another thread already working on the async pull?
         final boolean pullAlreadyInProgress;
         // Indicates if thread waits for the pull to finish or too many waiters already
@@ -199,7 +201,7 @@ public class BlobCoreSyncer {
         // Only can have only one thread working on async pull of this core (and we do no logging while holding the lock)
         // Let's understand what our role and actions are while holding the global lock and then execute on them without the lock.
         synchronized (coreSyncsInFlight) {
-            if (emptyCoreAwaitingPull && !isEmptyCoreAwaitingPull(pushPullData.getCoreName())) {
+            if (emptyCoreAwaitingPull && !isEmptyCoreAwaitingPull(cores, pushPullData.getCoreName())) {
                 // Core was observed empty awaiting pull and is no longer awaiting pull. This means the pull happened.
                 return;
             }
@@ -336,7 +338,7 @@ public class BlobCoreSyncer {
      * This is called whenever core from {@link CorePullTracker} finish its async pull(successfully or unsuccessfully)
      * We use this to notify all waiting threads for a core that their wait has ended (if there are some waiting).
      */
-    public static void finishedPull(String sharedStoreName, CoreSyncStatus status, BlobCoreMetadata blobMetadata, String message) {
+    public void finishedPull(String sharedStoreName, CoreSyncStatus status, BlobCoreMetadata blobMetadata, String message) {
         Exception pullException = null;
         final boolean isPullSuccessful = (status.isSuccess() ||
                 // Following statuses are not considered success in strictest definition of pull but for BlobSyncer
@@ -351,7 +353,7 @@ public class BlobCoreSyncer {
         notifyEndOfPull(sharedStoreName, pullException);
     }
 
-    private static void throwPullInProgressException(String corename, String msgSuffix) throws PullInProgressException {
+    private void throwPullInProgressException(String corename, String msgSuffix) throws PullInProgressException {
         String msg = SKIPPING_PULLING_CORE + " " + corename + " from blob " + msgSuffix;
         log.info(msg);
         // Note that longer term, this is the place where we could decide that if the async
@@ -366,7 +368,7 @@ public class BlobCoreSyncer {
      * Also serves the purpose of being a memory barrier so that the waiting threads can check their SyncOnPullWait instances
      * for updates.
      */
-    private static void notifyEndOfPull(String sharedStoreName, Exception e) {
+    private void notifyEndOfPull(String sharedStoreName, Exception e) {
         final Collection<SyncOnPullWait> collectionOfWaiters = pullEnded(sharedStoreName);
         if (collectionOfWaiters != null) {
             for (SyncOnPullWait w : collectionOfWaiters) {
@@ -381,7 +383,7 @@ public class BlobCoreSyncer {
      * Cleans up the core from bookkeeping related to in-progress pulls and returns the collection of waiters for that core.
      * Collection of returned waiters could be null as well.
      */
-    private static Collection<SyncOnPullWait> pullEnded(String sharedStoreName) {
+    private Collection<SyncOnPullWait> pullEnded(String sharedStoreName) {
         final Collection<SyncOnPullWait> collectionOfWaiters;
         synchronized (coreSyncsInFlight) {
             // Note that threads waiting for the pull to finish have references on their individual SyncOnPullWait instances,
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/BlobProcessUtil.java b/solr/core/src/java/org/apache/solr/store/blob/process/BlobProcessUtil.java
index 164818d..7df8aa4 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/BlobProcessUtil.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/BlobProcessUtil.java
@@ -44,6 +44,10 @@ public class BlobProcessUtil {
     runningFeeder = initializeCorePullerFeeder(cpf);
   }
   
+  public CorePullerFeeder getCorePullerFeeder() {
+    return runningFeeder;
+  }
+  
   /**
    * Shutdown background blob puller process
    */
@@ -71,10 +75,10 @@ public class BlobProcessUtil {
    */
   private void shutdownCorePullerFeeder() {
     final CoreSyncFeeder rf = runningFeeder;
-    runningFeeder = null;
     if (rf != null) {
       log.info("Shutting down CorePullerFeeder");
       rf.close();
     }
+    runningFeeder = null;
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
index e8af082..6450135 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
@@ -43,8 +43,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 /**
  * Code for pulling updates on a specific core to the Blob store. see {@CorePushTask} for the push version of this.
@@ -60,52 +58,42 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
    */
   private static final long MIN_RETRY_DELAY_MS = 20000;
 
-  /** Cores currently being pulled and timestamp of pull start (to identify stuck ones in logs) */
-  private static final HashMap<String, Long> pullsInFlight = Maps.newHashMap();
-
-  /** Cores unknown locally that got created as part of the pull process but for which no data has been pulled yet
-   * from Blob store. If we ignore this transitory state, these cores can be accessed locally and simply look empty.
-   * We'd rather treat threads attempting to access such cores like threads attempting to access an unknown core and
-   * do a pull (or more likely wait for an ongoing pull to finish).<p>
-   *
-   * When this lock has to be taken as well as {@link #pullsInFlight}, then {@link #pullsInFlight} has to be taken first.
-   * Reading this set implies acquiring the monitor of the set (as if @GuardedBy("itself")), but writing to the set
-   * additionally implies holding the {@link #pullsInFlight}. This guarantees that while {@link #pullsInFlight}
-   * is held, no element in the set is changing.
-   */
-  private static final Set<String> coresCreatedNotPulledYet = Sets.newHashSet();
-
   private final CoreContainer coreContainer;
   private final PullCoreInfo pullCoreInfo;
+  
+  /**
+   * Data structures injected as dependencies that track the core pulls occurring
+   * in flight and the cores that have been created and not pulled. These should
+   * be passed in via a constructor from CorePullerFeeder where they are defined
+   * and are unique per CorePullerFeeder (itself a singleton).
+   */
+  private final HashMap<String, Long> pullsInFlight;
+  private final Set<String> coresCreatedNotPulledYet;
+  
   private final long queuedTimeMs;
   private int attempts;
   private long lastAttemptTimestamp;
   private final PullCoreCallback callback;
 
-  CorePullTask(CoreContainer coreContainer, PullCoreInfo pullCoreInfo, PullCoreCallback callback) {
-    this(coreContainer, pullCoreInfo, System.currentTimeMillis(), 0, 0L, callback);
+  CorePullTask(CoreContainer coreContainer, PullCoreInfo pullCoreInfo, PullCoreCallback callback,
+      HashMap<String, Long> pullsInFlight, Set<String> coresCreatedNotPulledYet) {
+    this(coreContainer, pullCoreInfo, System.currentTimeMillis(), 0, 0L, 
+        callback, pullsInFlight, coresCreatedNotPulledYet);
   }
 
   private CorePullTask(CoreContainer coreContainer, PullCoreInfo pullCoreInfo, long queuedTimeMs, int attempts,
-      long lastAttemptTimestamp, PullCoreCallback callback) {
+      long lastAttemptTimestamp, PullCoreCallback callback, HashMap<String, Long> pullsInFlight, 
+      Set<String> coresCreatedNotPulledYet) {
     this.coreContainer = coreContainer;
     this.pullCoreInfo = pullCoreInfo;
     this.queuedTimeMs = queuedTimeMs;
     this.attempts = attempts;
     this.lastAttemptTimestamp = lastAttemptTimestamp;
     this.callback = callback;
+    this.pullsInFlight = pullsInFlight;
+    this.coresCreatedNotPulledYet = coresCreatedNotPulledYet;
   }
-
-  /**
-   * Returns a _hint_ that the given core might be locally empty because it is awaiting pull from Blob store.
-   * This is just a hint because as soon as the lock is released when the method returns, the status of the core could change.
-   */
-  public static boolean isEmptyCoreAwaitingPull(String corename) {
-    synchronized (coresCreatedNotPulledYet) {
-      return coresCreatedNotPulledYet.contains(corename);
-    }
-  }
-
+  
   /**
    * Needed for the {@link CorePullTask} to be used in a {@link DeduplicatingList}.
    */
@@ -164,7 +152,7 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
       // We merge the tasks.
       return new CorePullTask(task1.coreContainer, mergedPullCoreInfo,
           Math.min(task1.queuedTimeMs, task2.queuedTimeMs), mergedAttempts, mergedLatAttemptsTimestamp,
-          task1.callback);
+          task1.callback, task1.pullsInFlight, task1.coresCreatedNotPulledYet);
     }
   }
 
@@ -303,7 +291,7 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
 
       // The following call can fail if blob is corrupt (in non trivial ways, trivial ways are identified by other cases)
       // pull was successful
-      if(isEmptyCoreAwaitingPull(pullCoreInfo.getCoreName())){
+      if (CorePullerFeeder.isEmptyCoreAwaitingPull(coreContainer, pullCoreInfo.getCoreName())) {
         // the javadoc for pulledBlob suggests that it is only meant to be called if we pulled from scratch
         // therefore only limiting this call when we created the local core for this pull ourselves
         // BlobTransientLog.get().getCorruptCoreTracker().pulledBlob(pullCoreInfo.coreName, blobMetadata);
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerFeeder.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerFeeder.java
index b950d67..50bdff2 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerFeeder.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerFeeder.java
@@ -17,6 +17,8 @@
 package org.apache.solr.store.blob.process;
 
 import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Set;
 
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
@@ -26,6 +28,9 @@ import org.apache.solr.store.blob.util.DeduplicatingList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 /**
  * A pull version of {@link CoreSyncFeeder} then will continually ({@link #feedTheMonsters()}) to load up a work queue (
  * {@link #pullTaskQueue}) with such tasks {@link CorePullTask} to keep the created threads busy :) The tasks will be
@@ -35,19 +40,54 @@ import org.slf4j.LoggerFactory;
 public class CorePullerFeeder extends CoreSyncFeeder {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final CorePullTask.PullCoreCallback callback;
 
-  protected final DeduplicatingList<String, CorePullTask> pullTaskQueue;
   protected static final String PULLER_THREAD_PREFIX = "puller";
 
   private static final int numPullerThreads = 5; // TODO : make configurable
 
+  private final CorePullTask.PullCoreCallback callback;
+
+  protected final DeduplicatingList<String, CorePullTask> pullTaskQueue;
+
+  /**
+   * Cores currently being pulled and timestamp of pull start (to identify stuck ones in logs)
+   *
+   * Note, it is the client's responsibility to synchronize accesses
+   */
+  private final HashMap<String, Long> pullsInFlight = Maps.newHashMap();
+
+  /** Cores unknown locally that got created as part of the pull process but for which no data has been pulled yet
+   * from Blob store. If we ignore this transitory state, these cores can be accessed locally and simply look empty.
+   * We'd rather treat threads attempting to access such cores like threads attempting to access an unknown core and
+   * do a pull (or more likely wait for an ongoing pull to finish).<p>
+   *
+   * When this lock has to be taken as well as {@link #pullsInFlight}, then {@link #pullsInFlight} has to be taken first.
+   * Reading this set implies acquiring the monitor of the set (as if @GuardedBy("itself")), but writing to the set
+   * additionally implies holding the {@link #pullsInFlight}. This guarantees that while {@link #pullsInFlight}
+   * is held, no element in the set is changing.
+   *
+   * Note, it is the client's responsibility to synchronize accesses
+   */
+  private final Set<String> coresCreatedNotPulledYet = Sets.newHashSet();
+
   protected CorePullerFeeder(CoreContainer cores) {
     super(cores, numPullerThreads);
     this.pullTaskQueue = new DeduplicatingList<>(ALMOST_MAX_WORKER_QUEUE_SIZE, new CorePullTask.PullTaskMerger());
     this.callback = new CorePullResult();
   }
 
+  /**
+   * Returns a _hint_ that the given core might be locally empty because it is awaiting pull from Blob store.
+   * This is just a hint because as soon as the lock is released when the method returns, the status of the core could change.
+   */
+  public static boolean isEmptyCoreAwaitingPull(CoreContainer cores, String corename) {
+    CorePullerFeeder cpf = cores.getSharedStoreManager().getBlobProcessManager().getCorePullerFeeder();
+    Set<String> coresCreatedNotPulledYet = cpf.getCoresCreatedNotPulledYet();
+    synchronized (coresCreatedNotPulledYet) {
+      return coresCreatedNotPulledYet.contains(corename);
+    }
+  }
+
   @Override
   public Runnable getSyncer() {
     return new CorePullerThread(this, pullTaskQueue);
@@ -62,6 +102,14 @@ public class CorePullerFeeder extends CoreSyncFeeder {
     return callback;
   }
 
+  protected HashMap<String, Long> getPullsInFlight() {
+    return pullsInFlight;
+  }
+
+  protected Set<String> getCoresCreatedNotPulledYet() {
+    return coresCreatedNotPulledYet;
+  }
+
   @Override
   void feedTheMonsters() throws InterruptedException {
     CorePullTracker tracker = cores.getSharedStoreManager().getCorePullTracker();
@@ -73,7 +121,7 @@ public class CorePullerFeeder extends CoreSyncFeeder {
       PullCoreInfo pci = tracker.getCoreToPull();
 
       // Add the core to the list consumed by the thread doing the actual work
-      CorePullTask pt = new CorePullTask(cores, pci, getCorePullTaskCallback());
+      CorePullTask pt = new CorePullTask(cores, pci, getCorePullTaskCallback(), pullsInFlight, coresCreatedNotPulledYet);
       pullTaskQueue.addDeduplicated(pt, /* isReenqueue */ false);
       syncsEnqueuedSinceLastLog++;
 
@@ -95,7 +143,7 @@ public class CorePullerFeeder extends CoreSyncFeeder {
    * deduplicated on core name (the same core requiring two pulls from Blob will only be recorded one if the first
    * pull has not been processed yet).
    */
-  static class PullCoreInfo extends PushPullData implements DeduplicatingList.Deduplicatable<String> {
+  public static class PullCoreInfo extends PushPullData implements DeduplicatingList.Deduplicatable<String> {
 
     private final boolean waitForSearcher;
     private final boolean createCoreIfAbsent;
@@ -182,7 +230,8 @@ public class CorePullerFeeder extends CoreSyncFeeder {
           log.warn(String.format("Pulling core %s failed. Giving up. Last status=%s attempts=%s . %s",
               pullCoreInfo.getSharedStoreName(), status, pullTask.getAttempts(), message == null ? "" : message));
         }
-        BlobCoreSyncer.finishedPull(pullCoreInfo.getSharedStoreName(), status, blobMetadata, message);
+        BlobCoreSyncer syncer = cores.getSharedStoreManager().getBlobCoreSyncer();
+        syncer.finishedPull(pullCoreInfo.getSharedStoreName(), status, blobMetadata, message);
       } catch (InterruptedException ie) {
         close();
         throw ie;
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java b/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java
index 820b4ee..c9cef36 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java
@@ -17,17 +17,16 @@
 package org.apache.solr.store.blob.process;
 
 import java.io.Closeable;
+import java.lang.invoke.MethodHandles;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.solr.core.CoreContainer;
-
 //import com.force.commons.util.concurrent.NamedThreadFactory; difference?
 import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.solr.core.CoreContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
 
 /**
  * A {@link Runnable} that will start a set of threads {@link CorePullerThread} to process tasks
@@ -131,6 +130,7 @@ public abstract class CoreSyncFeeder implements Runnable, Closeable {
             this.executionThread = null; // race to set to null but ok to try to interrupt twice
             log.info(String.format("Closing CoreSyncFeeder; interrupting execution thread %s.", thread.getName()));
             thread.interrupt();
+            
         } else {
             log.warn("Closing CoreSyncFeeder before any syncer thread was started. Weird.");
         }
diff --git a/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java b/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
index 3dd525a..8a17f2b 100644
--- a/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
+++ b/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
@@ -17,6 +17,7 @@
 package org.apache.solr.store.shared;
 
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.store.blob.metadata.BlobCoreSyncer;
 import org.apache.solr.store.blob.process.BlobDeleteManager;
 import org.apache.solr.store.blob.process.BlobProcessUtil;
 import org.apache.solr.store.blob.process.CorePullTracker;
@@ -38,11 +39,13 @@ public class SharedStoreManager {
   private BlobDeleteManager blobDeleteManager;
   private BlobProcessUtil blobProcessUtil;
   private CorePullTracker corePullTracker;
+  private BlobCoreSyncer blobCoreSyncer;
   
   public SharedStoreManager(ZkController controller) {
     zkController = controller;
     // initialize BlobProcessUtil with the SharedStoreManager for background processes to be ready
     blobProcessUtil = new BlobProcessUtil(zkController.getCoreContainer());
+    blobCoreSyncer = new BlobCoreSyncer();
   }
   
   @VisibleForTesting
@@ -106,4 +109,8 @@ public class SharedStoreManager {
     corePullTracker = new CorePullTracker();
     return corePullTracker ;
   }
+  
+  public BlobCoreSyncer getBlobCoreSyncer() {
+    return blobCoreSyncer;
+  }
 }
\ No newline at end of file
diff --git a/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPullTest.java b/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPullTest.java
index 4a7921e..2c1bd7a 100644
--- a/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPullTest.java
+++ b/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPullTest.java
@@ -75,23 +75,21 @@ public class SimpleSharedStoreEndToEndPullTest extends SolrCloudSharedStoreTestC
     setupCluster(2);
     CloudSolrClient cloudClient = cluster.getSolrClient();
     
-    // setup the test harnesses
-    Map<String, CountDownLatch> cdlMap = new HashMap<>();
+    // this map tracks the async pull queues per solr process
+    Map<String, Map<String, CountDownLatch>> solrProcessesTaskTracker = new HashMap<>();
     
-    CountDownLatch latch1 = new CountDownLatch(1);
     JettySolrRunner solrProcess1 = cluster.getJettySolrRunner(0);
     CoreStorageClient storageClient1 = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
     setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient1), solrProcess1);
-    configureTestBlobProcessForNode(solrProcess1, setupCallback(latch1));
+    Map<String, CountDownLatch> asyncPullLatches1 = configureTestBlobProcessForNode(solrProcess1);
     
-    CountDownLatch latch2 = new CountDownLatch(1);
     JettySolrRunner solrProcess2 = cluster.getJettySolrRunner(1);
     CoreStorageClient storageClient2 = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
     setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient2), solrProcess2);
-    configureTestBlobProcessForNode(solrProcess2, setupCallback(latch2));
+    Map<String, CountDownLatch> asyncPullLatches2 = configureTestBlobProcessForNode(solrProcess2);
     
-    cdlMap.put(solrProcess1.getNodeName(), latch1);
-    cdlMap.put(solrProcess2.getNodeName(), latch2);
+    solrProcessesTaskTracker.put(solrProcess1.getNodeName(), asyncPullLatches1);
+    solrProcessesTaskTracker.put(solrProcess2.getNodeName(), asyncPullLatches2);
     
     String collectionName = "sharedCollection";
     int maxShardsPerNode = 1;
@@ -138,14 +136,20 @@ public class SimpleSharedStoreEndToEndPullTest extends SolrCloudSharedStoreTestC
       assertEquals(1, resp.getResults().getNumFound());
       assertEquals("1", (String) resp.getResults().get(0).getFieldValue("id"));
       
+      // we want to wait until the pull completes so set up a count down latch for the follower's
+      // core that we'll wait until pull finishes for
+      CountDownLatch latch = new CountDownLatch(1);
+      Map<String, CountDownLatch> asyncPullTasks = solrProcessesTaskTracker.get(followerReplica.getNodeName());
+      asyncPullTasks.put(followerReplica.getCoreName(), latch);
+      
       // query the follower directly to trigger the pull, this query should yield no results
-      // as we don't wait
+      // as it returns immediately 
       followerDirectClient = getHttpSolrClient(followerReplica.getBaseUrl() + "/" + followerReplica.getCoreName());
       resp = followerDirectClient.query(params);
       assertEquals(0, resp.getResults().getNumFound());
       
       // wait until pull is finished
-      CountDownLatch latch = cdlMap.get(followerReplica.getNodeName());
+      //CountDownLatch latch = asyncPullTasks.get(followerReplica.getCoreName());
       assertTrue(latch.await(120, TimeUnit.SECONDS));
       
       // do another query to verify we've pulled everything
@@ -164,26 +168,26 @@ public class SimpleSharedStoreEndToEndPullTest extends SolrCloudSharedStoreTestC
     }
   }
   
-  private BlobProcessUtil configureTestBlobProcessForNode(JettySolrRunner runner, PullCoreCallback callback) {
-    CorePullerFeeder cpf = new CorePullerFeeder(runner.getCoreContainer()) {
+  private Map<String, CountDownLatch> configureTestBlobProcessForNode(JettySolrRunner runner) {
+    Map<String, CountDownLatch> asyncPullTracker = new HashMap<>();
+    
+    CorePullerFeeder cpf = new CorePullerFeeder(runner.getCoreContainer()) {  
       @Override
       protected CorePullTask.PullCoreCallback getCorePullTaskCallback() {
-        return callback;
+        return new PullCoreCallback() {
+          @Override
+          public void finishedPull(CorePullTask pullTask, BlobCoreMetadata blobMetadata, CoreSyncStatus status,
+              String message) throws InterruptedException {
+            CountDownLatch latch = asyncPullTracker.get(pullTask.getPullCoreInfo().getCoreName());
+            latch.countDown();
+          }
+        };
       }
     };
+    
     BlobProcessUtil testUtil = new BlobProcessUtil(runner.getCoreContainer(), cpf);
     setupTestBlobProcessUtilForNode(testUtil, runner);
-    return testUtil;
+    return asyncPullTracker;
   }
   
-  private PullCoreCallback setupCallback(CountDownLatch latch) {
-    return new PullCoreCallback() {
-      @Override
-      public void finishedPull(CorePullTask pullTask, BlobCoreMetadata blobMetadata, CoreSyncStatus status,
-          String message) throws InterruptedException {
-        assertTrue(status.isSuccess());
-        latch.countDown();
-      }
-    };
-  }
 }