Posted to commits@lucene.apache.org by yo...@apache.org on 2019/10/30 20:45:14 UTC

[lucene-solr] 09/11: @W-6587412 Concurrent indexing, pull and pushes (#403)

This is an automated email from the ASF dual-hosted git repository.

yonik pushed a commit to branch jira/SOLR-13101
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 581f468f9914ce2488201efbed42fd43dc44b481
Author: Bilal Waheed <mw...@salesforce.com>
AuthorDate: Mon Oct 21 17:34:39 2019 -0400

    @W-6587412 Concurrent indexing, pull and pushes (#403)
    
    * @W-6587412 Concurrent indexing, pull and pushes
    
    -Pulls are protected from each other, from local indexing, and from pushes.
    -Local indexing and pushes are protected from pulls.
    -Pushes are protected from each other. (A sketch of this locking scheme follows this list.)
    -Implicit hard commit on each indexing batch.
    -Unlike before, we ensure the existence of the ZooKeeper metadata node only once.
    -In steady state we only push to the shared store and update ZooKeeper; we do not read from either. We rely on cached values with a soft guarantee of being in sync.
    -We revoke that soft guarantee if the conditional update to ZooKeeper fails; the next indexing request then syncs with ZooKeeper and the shared store first. (A sketch of this conditional-update flow follows the file list below.)
    -Sub-shard push is handled for the happy path only. Non-happy paths are not understood yet; if they occur they should be detected and an exception thrown.
    
    -Core pulls are deduplicated on core name instead of shared shard name, since at the local level we are dealing with a core, not a shard.
    -CorePullTask no longer needs pullsInFlight because of the new pull write lock.
    -PushPullData is stripped of version/metadata info; the accompanying task-merge logic is gone (no longer needed).
    -The SharedShardMetadataController cache is removed (it was insufficient and is no longer needed).
    -SharedMetadataResolutionResult has a separate collection of files to be deleted. The files-to-pull collection cannot play that role (with conflict handling, the pull collection can be everything the blob store has to offer).
    
    -Take a snapshot of the active segment files in one go, so that at push time they are not removed from underneath us. This is important to avoid failing indexing attempts unnecessarily.
    -Reserve a commit point before capturing it.
    -Added guards and comments around lock contention and fairness.
    -Added an optimization: if we have nothing to push we can conclude without creating an expensive ServerSideMetadata.
    -Removed the readFromSharedStoreIfNecessary call from commit processing, since it is not needed there.
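
The protection scheme in the first three bullets can be read as a per-core read-write lock plus a per-core push mutex: a pull takes the write lock (excluding other pulls, indexing and pushes), while indexing batches and pushes take the read lock (excluding pulls but allowing each other), and pushes additionally serialize among themselves. The sketch below is a minimal illustration of that idea, not the SharedCoreConcurrencyController code added by this patch; all class and method names here are hypothetical.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Hypothetical illustration of the locking scheme described above:
 * - a pull takes the write lock, so it excludes other pulls, indexing and pushes;
 * - indexing and pushes take the read lock, so they exclude pulls but can run
 *   concurrently with each other;
 * - pushes additionally serialize on a dedicated push lock.
 */
class CoreConcurrencySketch {
  private static final class CoreLocks {
    // fair locks so a waiting pull is not starved by a stream of indexing requests
    final ReentrantReadWriteLock pullIndexingLock = new ReentrantReadWriteLock(true);
    final ReentrantLock pushLock = new ReentrantLock(true);
  }

  private final Map<String, CoreLocks> locksPerCore = new ConcurrentHashMap<>();

  private CoreLocks locksFor(String coreName) {
    return locksPerCore.computeIfAbsent(coreName, k -> new CoreLocks());
  }

  /** A pull is exclusive: no other pull, indexing batch or push may run on this core. */
  void runPull(String coreName, Runnable pull) {
    ReentrantReadWriteLock lock = locksFor(coreName).pullIndexingLock;
    lock.writeLock().lock();
    try {
      pull.run();
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** An indexing batch only needs to exclude pulls, not other indexing batches. */
  void runIndexingBatch(String coreName, Runnable indexingBatch) {
    ReentrantReadWriteLock lock = locksFor(coreName).pullIndexingLock;
    lock.readLock().lock();
    try {
      indexingBatch.run();
    } finally {
      lock.readLock().unlock();
    }
  }

  /** Pushes exclude each other; callers are assumed to already hold the read lock. */
  void runPush(String coreName, Runnable push) {
    ReentrantLock pushLock = locksFor(coreName).pushLock;
    pushLock.lock();
    try {
      push.run();
    } finally {
      pushLock.unlock();
    }
  }
}
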
---
 .../solr/handler/admin/RequestApplyUpdatesOp.java  |  35 +-
 .../java/org/apache/solr/servlet/HttpSolrCall.java |  21 +-
 .../solr/store/blob/metadata/BlobCoreSyncer.java   |  35 +-
 .../solr/store/blob/metadata/CorePushPull.java     |  92 +---
 .../solr/store/blob/metadata/PushPullData.java     |  61 +--
 .../store/blob/metadata/ServerSideMetadata.java    | 197 ++++++---
 .../blob/metadata/SharedStoreResolutionUtil.java   |  22 +-
 .../solr/store/blob/process/CorePullTask.java      | 149 ++++---
 .../solr/store/blob/process/CorePullTracker.java   |  44 +-
 .../solr/store/blob/process/CorePullerFeeder.java  |  88 +---
 .../solr/store/blob/process/CorePullerThread.java  |  91 +++-
 .../apache/solr/store/blob/process/CorePusher.java | 187 ++++++--
 .../solr/store/blob/process/CoreSyncStatus.java    |   2 -
 .../solr/store/blob/process/CoreUpdateTracker.java |  20 -
 .../solr/store/blob/util/BlobStoreUtils.java       | 118 ++---
 .../shared/SharedCoreConcurrencyController.java    | 351 +++++++++++++++
 .../solr/store/shared/SharedStoreManager.java      |  17 +-
 .../metadata/SharedShardMetadataController.java    | 117 ++---
 .../processor/DistributedZkUpdateProcessor.java    | 146 +++++-
 .../solr/store/blob/SharedStorageSplitTest.java    |   2 +-
 .../solr/store/blob/metadata/CorePushPullTest.java |  31 +-
 .../metadata/SharedStoreResolutionUtilTest.java    |  25 +-
 .../blob/process/PullMergeDeduplicationTest.java   |  96 +---
 .../solr/store/blob/util/BlobStoreUtilsTest.java   |  26 +-
 .../store/shared/SharedCoreConcurrencyTest.java    | 488 +++++++++++++++++++++
 .../shared/SimpleSharedStoreEndToEndPullTest.java  |  12 +-
 .../shared/SimpleSharedStoreEndToEndPushTest.java  |   9 +-
 .../store/shared/SolrCloudSharedStoreTestCase.java |  10 +-
 .../SharedShardMetadataControllerTest.java         | 159 +------
 29 files changed, 1758 insertions(+), 893 deletions(-)
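
The steady-state flow described in the commit message (push new files to the shared store, then conditionally update ZooKeeper, and fall back to re-reading ZooKeeper and the shared store only after a failed conditional update) can be sketched roughly as follows. This is an illustration only: it uses the plain ZooKeeper client and hypothetical helper names rather than the SharedShardMetadataController / SharedCoreConcurrencyController code in this patch.

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * Hypothetical sketch of the steady-state flow: keep a cached (suffix, zk version)
 * per shard, push new segment files to the shared store, then conditionally update
 * the ZooKeeper metadata node. Only on a BadVersion failure do we drop the cache,
 * forcing the next indexing request to re-sync from ZooKeeper and the shared store.
 * Assumes pushes for a core are already serialized (see the lock sketch above).
 */
class SteadyStatePushSketch {
  /** Cached view of the shard's metadata node; null means the soft guarantee is gone. */
  private volatile CachedVersion cached;

  private static final class CachedVersion {
    final String metadataSuffix;
    final int zkVersion;
    CachedVersion(String metadataSuffix, int zkVersion) {
      this.metadataSuffix = metadataSuffix;
      this.zkVersion = zkVersion;
    }
  }

  boolean pushBatch(ZooKeeper zk, String metadataNodePath, String newSuffix) throws Exception {
    CachedVersion current = cached;
    if (current == null) {
      // soft guarantee was lost earlier: re-read ZooKeeper (and pull from the
      // shared store if needed) before this indexing request can proceed
      Stat stat = new Stat();
      byte[] data = zk.getData(metadataNodePath, false, stat);
      current = new CachedVersion(new String(data, StandardCharsets.UTF_8), stat.getVersion());
      cached = current;
    }

    pushSegmentsToSharedStore(newSuffix); // hypothetical: write files + core.metadata.<newSuffix>

    try {
      // conditional update: succeeds only if nobody touched the node since our cached version
      Stat newStat = zk.setData(metadataNodePath,
          newSuffix.getBytes(StandardCharsets.UTF_8), current.zkVersion);
      cached = new CachedVersion(newSuffix, newStat.getVersion());
      return true;
    } catch (KeeperException.BadVersionException e) {
      // someone else (e.g. a new leader) won the race; drop the soft guarantee
      cached = null;
      return false;
    }
  }

  private void pushSegmentsToSharedStore(String newSuffix) {
    // placeholder for the actual blob-store push
  }
}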

diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RequestApplyUpdatesOp.java b/solr/core/src/java/org/apache/solr/handler/admin/RequestApplyUpdatesOp.java
index 9277c08..3658ed0 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/RequestApplyUpdatesOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/RequestApplyUpdatesOp.java
@@ -17,16 +17,22 @@
 
 package org.apache.solr.handler.admin;
 
+import java.io.IOException;
 import java.util.concurrent.Future;
 
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.api.collections.Assign;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.store.blob.client.BlobCoreMetadataBuilder;
 import org.apache.solr.store.blob.process.CoreUpdateTracker;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.apache.solr.update.UpdateLog;
 
 class RequestApplyUpdatesOp implements CoreAdminHandler.CoreAdminOp {
@@ -76,18 +82,39 @@ class RequestApplyUpdatesOp implements CoreAdminHandler.CoreAdminOp {
   }
 
 
-  private void pushToSharedStore(SolrCore core) {
+  private void pushToSharedStore(SolrCore core) throws IOException {
     // Push the index to blob storage before we set our state to ACTIVE
     CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
     if (cloudDesc.getReplicaType().equals(Replica.Type.SHARED)) {
       CoreContainer cc = core.getCoreContainer();
       CoreUpdateTracker sharedCoreTracker = new CoreUpdateTracker(cc);
 
+      String collectionName = cloudDesc.getCollectionName();
+      String shardName = cloudDesc.getShardId();
+      String coreName = core.getName();
+      SharedShardMetadataController metadataController = cc.getSharedStoreManager().getSharedShardMetadataController();
+      // creates the metadata node
+      metadataController.ensureMetadataNodeExists(collectionName, shardName);
+      SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
+      // TODO: We should just be initialized to a default value since this is a new shard.  
+      //       As of now we are only taking care of basic happy path. We still need to evaluate what will happen
+      //       if a split is abandoned because of failure(e.g. long GC pause) and is re-tried?
+      //       How to make sure our re-attempt wins even when the ghost of previous attempt resumes and intervenes?
+      if (!SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE.equals(shardVersionMetadata.getMetadataSuffix())) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "New sub shard has zk information that is not default");
+      }
+
+      // sync local cache with zk's default information i.e. equivalent of no-op pull 
+      SharedCoreConcurrencyController concurrencyController = cc.getSharedStoreManager().getSharedCoreConcurrencyController();
+      String sharedBlobName = Assign.buildSharedShardName(collectionName, shardName);
+      concurrencyController.updateCoreVersionMetadata(collectionName, shardName, coreName,
+          shardVersionMetadata, BlobCoreMetadataBuilder.buildEmptyCoreMetadata(sharedBlobName));
+
       sharedCoreTracker.persistShardIndexToSharedStore(
           cc.getZkController().zkStateReader.getClusterState(),
-          cloudDesc.getCollectionName(),
-          cloudDesc.getShardId(),
-          core.getName());
+          collectionName,
+          shardName,
+          coreName);
     }
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index c03a743..f4d495a 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -71,6 +71,7 @@ import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.CommandOperation;
 import org.apache.solr.common.util.ContentStream;
 import org.apache.solr.common.util.JsonSchemaValidator;
@@ -349,14 +350,19 @@ public class HttpSolrCall {
           solrReq = parser.parse(core, path, req);
         }
 
-        // don't enqueue a pull on updates as those will already trigger their own synchronous pulls
-        if (cores.isZooKeeperAware() && !doesPathContainUpdate()) {
+        if (cores.isZooKeeperAware()) {
           String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
           DocCollection collection = getCollection(collectionName);
           boolean belongsToSharedCollection =
               (collection != null) ? collection.getSharedIndex() : false;
           if (belongsToSharedCollection) {
-            enqueuePullFromSharedStore(core);
+            if (doesPathContainUpdate()) {
+              // Shared collection only supports hard commits therefore we always ensure one 
+              addCommitIfAbsent();
+              // don't enqueue a pull on updates as those will already trigger their own synchronous pulls
+            } else {
+              enqueuePullFromSharedStore(core);
+            }
           }
         }
 
@@ -373,6 +379,15 @@ public class HttpSolrCall {
     action = PASSTHROUGH;
   }
 
+  private void addCommitIfAbsent() {
+    Boolean currentValue = solrReq.getParams().getBool(UpdateParams.COMMIT);
+    if (currentValue == null || currentValue == false) {
+      ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams());
+      params.set(UpdateParams.COMMIT, "true");
+      solrReq.setParams(params);
+    }
+  }
+
   /**
    * Attempt to initiate a pull from the shared store. It's the client responsibility to ensure
    * only Shared replicas use this method.
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java
index c352e50..a97c864 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java
@@ -16,29 +16,29 @@
  */
 package org.apache.solr.store.blob.metadata;
 
-import java.util.*;
-import java.util.concurrent.*;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.annotation.GuardedBy;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
-
-import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
-//import net.jcip.annotations.GuardedBy;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
-import org.apache.solr.store.blob.client.BlobCoreMetadata;
 import org.apache.solr.store.blob.PullInProgressException;
-import org.apache.solr.store.blob.process.*;
-import org.apache.solr.store.blob.util.BlobStoreUtils;
+import org.apache.solr.store.blob.client.BlobCoreMetadata;
+import org.apache.solr.store.blob.process.CorePullTracker;
+import org.apache.solr.store.blob.process.CorePullerFeeder;
+import org.apache.solr.store.blob.process.CoreSyncStatus;
 import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
 
 /**
  * Class to sync between local and blob copies of a core using
@@ -133,18 +133,6 @@ public class BlobCoreSyncer {
           log.warn("Pulling shard " + shardName + " that is inactive!");
         }
         log.info("Pulling for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
-        // creates the metadata node if it doesn't exist
-        sharedShardMetadataController.ensureMetadataNodeExists(collectionName, shardName);
-
-        /*
-         * Get the metadataSuffix value from ZooKeeper or from a cache if an entry exists for the
-         * given collection and shardName. If the leader has already changed, the conditional update
-         * later will fail and invalidate the cache entry if it exists.
-         */
-        VersionedData data = sharedShardMetadataController.readMetadataValue(collectionName, shardName,
-            /* readFromCache */ false);
-        Map<String, String> nodeUserData = (Map<String, String>) Utils.fromJSON(data.getData());
-        String metadataSuffix = nodeUserData.get(SharedShardMetadataController.SUFFIX_NODE_NAME);
 
         String sharedShardName = (String) shard.get(ZkStateReader.SHARED_SHARD_NAME);
 
@@ -153,9 +141,6 @@ public class BlobCoreSyncer {
           .setShardName(shardName)
           .setCoreName(coreName)
           .setSharedStoreName(sharedShardName)
-          .setLastReadMetadataSuffix(metadataSuffix)
-          .setNewMetadataSuffix(BlobStoreUtils.generateMetadataSuffix())
-          .setZkVersion(data.getVersion())
           .build();
         pull(pushPullData, waitForSearcher, emptyCoreAwaitingPull, cores);
       } catch (Exception ex) {
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java
index a96c5a5..817f230 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java
@@ -12,6 +12,7 @@ import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.IOUtils;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.store.Directory;
@@ -31,11 +32,10 @@ import org.apache.solr.store.blob.metadata.ServerSideMetadata.CoreFileData;
 import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil.SharedMetadataResolutionResult;
 import org.apache.solr.store.blob.process.BlobDeleteManager;
 import org.apache.solr.store.blob.util.BlobStoreUtils;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Class pushing updates from the local core to the Blob Store and pulling updates from Blob store to local core.
  * This class knows about the Solr core directory structure and can translate it into file system abstractions since that's
@@ -80,13 +80,6 @@ public class CorePushPull {
     }
 
     /**
-     * Calls {@link #pushToBlobStore(long, int)}  with current epoch time and attempt no 0.  
-     */
-    public BlobCoreMetadata pushToBlobStore() throws Exception {
-       return pushToBlobStore(System.currentTimeMillis(), 0);
-    }
-    
-    /**
      * Writes to the Blob store all the files that should be written to it, then updates and writes the {@link BlobCoreMetadata}.
      * After that call, the Blob store is fully updated for the core.<p>
      * In case of exception, it means that the new {@link BlobCoreMetadata} has not been written which means that the old
@@ -96,12 +89,12 @@ public class CorePushPull {
      * This method (need to verify this is indeed the case!) can be used either to push small updates from the local core
      * to Blob or to completely overwrite the Blob content for the core with the local content.
      * 
-     * @param requestQueuedTimeMs epoch time in milliseconds when the push request was queued(meaningful in case of async pushing)
-     *                            only used for logging purposes
-     * @param attempt 0 based attempt number (meaningful in case of pushing with retry mechanism)
-     *                only used for logging purposes 
+     * @param currentMetadataSuffix suffix of the core.metadata file corresponding to {@link CorePushPull#blobMetadata}
+     *                              TODO: there is an existing todo with delete logic where this parameter is consumed  
+     *                                    with that add a metadataSuffix field to {@link BlobCoreMetadata} 
+     * @param newMetadataSuffix suffix of the new core.metadata file to be created as part of this push
      */
-    public BlobCoreMetadata pushToBlobStore(long requestQueuedTimeMs, int attempt) throws Exception {
+    public BlobCoreMetadata pushToBlobStore(String currentMetadataSuffix, String newMetadataSuffix) throws Exception {
       long startTimeMs = System.currentTimeMillis();
       try {
         SolrCore solrCore = container.getCore(pushPullData.getCoreName());
@@ -113,39 +106,9 @@ public class CorePushPull {
           // Creating the new BlobCoreMetadata as a modified clone of the existing one
           BlobCoreMetadataBuilder bcmBuilder = new BlobCoreMetadataBuilder(blobMetadata, solrServerMetadata.getGeneration());
 
-          // First copy index files over to a temp directory and then push to blob store from there. 
-          // This is to avoid holding a lock over index directory involving network operation.
-          //
-          // Ideally, we don't need to lock source index directory to make temp copy because...:
-          // -all index files are write-once (http://mail-archives.apache.org/mod_mbox/lucene-java-user/201509.mbox/%3C00c001d0ed85$ee839b20$cb8ad160$@thetaphi.de%3E)
-          // -there is a possibility of a missing segment file because of merge but that could have happened even before reaching this point.
-          //  Whatever the timing maybe that will result in a failure (exception) when copying to temp and that will abort the push
-          // -segment file deletion(because of merge) and copying to temp happening concurrently should be ok as well since delete will wait 
-          //  for copy to finish (https://stackoverflow.com/questions/2028874/what-happens-to-an-open-file-handle-on-linux-if-the-pointed-file-gets-moved-del)
-          // ...But SfdcFSDirectoryFactory, that is based off CachingDirectoryFactory, can return a cached instance of Directory and
-          // that cache is only based off path and is rawLockType agnostic. Therefore passing "none" as rawLockType will not be honored if same path
-          // was accessed before with some other rawLockType until CachingDirectoryFactory#doneWithDirectory is called. 
-          // There is an overload that takes forceNew boolean and supposed to be returning new instance but CachingDirectoryFactory does not seem to honor that. 
-          // It is not worth fighting that therefore we lock the source index directory before copying to temp directory. If this much locking turns out
-          // to be problematic we can revisit this.
-          // 
-          // And without source locking we really don't need temp directory, one reason to still might have it is to avoid starting a push to blob store 
-          // that can potentially be stopped in the middle because of a concurrent merge deleting the segment files being pushed. 
-
-          // create a temp directory (within the core local folder).
-          String tempIndexDirPath = solrCore.getDataDir() + "index.push." + System.nanoTime();
-          Directory tempIndexDir = solrCore.getDirectoryFactory().get(tempIndexDirPath, DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
+          Directory snapshotIndexDir = solrCore.getDirectoryFactory().get(solrServerMetadata.getSnapshotDirPath(), DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
           try {
-            Directory indexDir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
-            try {
-              // copy index files to the temp directory
-              for (CoreFileData cfd : resolvedMetadataResult.getFilesToPush()) {
-                copyFileToDirectory(indexDir, cfd.getFileName(), tempIndexDir);
-              }
-            } finally {
-              solrCore.getDirectoryFactory().release(indexDir);
-            }
-            
+
             /*
              * Removing from the core metadata the files that should no longer be there.
              * 
@@ -162,20 +125,21 @@ public class CorePushPull {
              * 
              * The deletion logic will move out of this class in the future and make this less confusing. 
              */
-            for (BlobCoreMetadata.BlobFile d : resolvedMetadataResult.getFilesToPull()) {
+            for (BlobCoreMetadata.BlobFile d : resolvedMetadataResult.getFilesToDelete()) {
                 bcmBuilder.removeFile(d);
                 BlobCoreMetadata.BlobFileToDelete bftd = new BlobCoreMetadata.BlobFileToDelete(d, System.currentTimeMillis());
                 bcmBuilder.addFileToDelete(bftd);
             }
             
             // add the old core.metadata file to delete
-            if (!pushPullData.getLastReadMetadataSuffix().equals("-1")) {
+            if (!currentMetadataSuffix.equals(SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE)) {
               // TODO This may be inefficient but we'll likely remove this when CorePushPull is refactored to have deletion elsewhere
+              //      could be added to resolvedMetadataResult#getFilesToDelete()
               ToFromJson<BlobCoreMetadata> converter = new ToFromJson<>();
               String json = converter.toJson(blobMetadata);
               int bcmSize = json.getBytes().length;
               
-              String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(pushPullData.getLastReadMetadataSuffix());
+              String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(currentMetadataSuffix);
               String coreMetadataPath = blobMetadata.getSharedBlobName() + "/" + blobCoreMetadataName;
               // so far checksum is not used for metadata file
               BlobCoreMetadata.BlobFileToDelete bftd = new BlobCoreMetadata.BlobFileToDelete("", coreMetadataPath, bcmSize, BlobCoreMetadataBuilder.UNDEFINED_VALUE, System.currentTimeMillis());
@@ -186,21 +150,13 @@ public class CorePushPull {
             // But this is untrue/totally false/misleading. SnapPuller has File all over.
             for (CoreFileData cfd : resolvedMetadataResult.getFilesToPush()) {
               // Sanity check that we're talking about the same file (just sanity, Solr doesn't update files so should never be different)
-              assert cfd.getFileSize() == tempIndexDir.fileLength(cfd.getFileName());
+              assert cfd.getFileSize() == snapshotIndexDir.fileLength(cfd.getFileName());
 
-              String blobPath = pushFileToBlobStore(coreStorageClient, tempIndexDir, cfd.getFileName(), cfd.getFileSize());
+              String blobPath = pushFileToBlobStore(coreStorageClient, snapshotIndexDir, cfd.getFileName(), cfd.getFileSize());
               bcmBuilder.addFile(new BlobCoreMetadata.BlobFile(cfd.getFileName(), blobPath, cfd.getFileSize(), cfd.getChecksum()));
             }
           } finally {
-            try {
-              // Remove temp directory
-              solrCore.getDirectoryFactory().doneWithDirectory(tempIndexDir);
-              solrCore.getDirectoryFactory().remove(tempIndexDir);
-            } catch (Exception e) {
-              log.warn("Cannot remove temp directory " + tempIndexDirPath, e);
-            } finally {
-              solrCore.getDirectoryFactory().release(tempIndexDir);
-            }
+            solrCore.getDirectoryFactory().release(snapshotIndexDir);
           }
           
           // delete what we need
@@ -208,7 +164,7 @@ public class CorePushPull {
           
           BlobCoreMetadata newBcm = bcmBuilder.build();
 
-          String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(pushPullData.getNewMetadataSuffix());          
+          String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(newMetadataSuffix);          
           coreStorageClient.pushCoreMetadata(blobMetadata.getSharedBlobName(), blobCoreMetadataName, newBcm);
           return newBcm;
         } finally {
@@ -219,7 +175,7 @@ public class CorePushPull {
         long bytesTransferred = resolvedMetadataResult.getFilesToPush().stream().mapToLong(cfd -> cfd.getFileSize()).sum();
         
         // todo correctness stuff
-        logBlobAction("PUSH", filesAffected, bytesTransferred, requestQueuedTimeMs, attempt, startTimeMs);
+        logBlobAction("PUSH", filesAffected, bytesTransferred, startTimeMs, 0, startTimeMs);
       }
     }
 
@@ -444,18 +400,6 @@ public class CorePushPull {
         return blobPath;
     }
 
-    private void removeTempDirectory(SolrCore solrCore, String tempDirPath, Directory tempDir) throws IOException {
-        try {
-            // Remove temp directory
-            solrCore.getDirectoryFactory().doneWithDirectory(tempDir);
-            solrCore.getDirectoryFactory().remove(tempDir);
-        } catch (Exception e) {
-            log.warn("Cannot remove temp directory " + tempDirPath, e);
-        } finally {
-            solrCore.getDirectoryFactory().release(tempDir);
-        }
-    }
-    
     /**
      * Logs soblb line for push or pull action 
      * TODO: This is for callers of this method.
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/PushPullData.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/PushPullData.java
index d79316f..1723488 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/PushPullData.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/PushPullData.java
@@ -16,8 +16,6 @@
  */
 package org.apache.solr.store.blob.metadata;
 
-import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
-
 /*
  * Class containing information needed to complete a push or a pull to a shared store in 
  * a Shared Collection.
@@ -33,36 +31,13 @@ public class PushPullData {
    */
   protected String sharedStoreName;
   
-  /**
-   * Unique value of the metadataSuffix for the last persisted shard index in the shared store.
-   * If this value is equal to {@link SharedShardMetadataController#METADATA_NODE_DEFAULT_VALUE} then 
-   * the shard index this instance is associated with has not yet been successfully persisted
-   */
-  protected String lastReadMetadataSuffix;
-  
-  /*
-   * Unique value of the metadataSuffix that will be appended to the core metadata file name if this 
-   * PushPullData instance is association with a push operation to the shared store.
-   */
-  protected String newMetadataSuffix;
-  
-  /*
-   * Value originating from a ZooKeeper node used to handle conditionally and safely update the 
-   * core.metadata file written to the shared store.
-   */
-  protected int version;
-  
   public PushPullData() {}
   
-  public PushPullData(String collectionName, String shardName, String coreName, String sharedStoreName, String lastReadMetadataSuffix,
-      String newMetadataSuffix, int version) {
+  public PushPullData(String collectionName, String shardName, String coreName, String sharedStoreName) {
     this.collectionName = collectionName;
     this.shardName = shardName;
     this.coreName = coreName;
     this.sharedStoreName = sharedStoreName;
-    this.lastReadMetadataSuffix = lastReadMetadataSuffix;
-    this.newMetadataSuffix = newMetadataSuffix;
-    this.version = version;
   }
 
   public String getCoreName() {
@@ -81,23 +56,10 @@ public class PushPullData {
     return sharedStoreName;
   }
 
-  public String getLastReadMetadataSuffix() {
-    return lastReadMetadataSuffix;
-  }
-  
-  public String getNewMetadataSuffix() {
-    return newMetadataSuffix;
-  }
-  
-  public int getZkVersion() {
-    return version;
-  }
-  
   @Override
   public String toString() {
-    return "collectionName=" + collectionName + " shardName=" + shardName + " coreName=" + 
-        sharedStoreName + " coreName=" + coreName + " lastReadMetadataSuffix=" + lastReadMetadataSuffix +
-        " newMetadataSuffix=" + newMetadataSuffix + " lastReadZkVersion=" + version;
+    return "collectionName=" + collectionName + " shardName=" + shardName + " sharedStoreName=" + 
+        sharedStoreName + " coreName=" + coreName;
   }
 
   public static class Builder {
@@ -124,24 +86,9 @@ public class PushPullData {
       return this;
     }
     
-    public Builder setLastReadMetadataSuffix(String lastReadMetadataSuffix) {
-      data.lastReadMetadataSuffix = lastReadMetadataSuffix;
-      return this;
-    }
-    
-    public Builder setNewMetadataSuffix(String newMetadataSuffix) {
-      data.newMetadataSuffix = newMetadataSuffix;
-      return this;
-    }
-    
-    public Builder setZkVersion(int version) {
-      data.version = version;
-      return this;
-    }
-    
     public PushPullData build() {
       return new PushPullData(data.collectionName, data.shardName, data.coreName, 
-          data.sharedStoreName, data.lastReadMetadataSuffix, data.newMetadataSuffix, data.version);
+          data.sharedStoreName);
     }    
   }
 
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/ServerSideMetadata.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/ServerSideMetadata.java
index 88d76ad..241cad7 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/ServerSideMetadata.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/ServerSideMetadata.java
@@ -2,39 +2,46 @@ package org.apache.solr.store.blob.metadata;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.NoSuchFileException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Objects;
 
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableCollection.Builder;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.lucene.codecs.CodecUtil;
-import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.IndexDeletionPolicyWrapper;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
 import org.apache.solr.store.blob.client.BlobException;
-
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Object capturing the metadata of a shard index on a Solr node. 
- * 
+ *
  * This works in conjunction with {@link BlobCoreMetadata} to find the differences between 
  * local (Solr node) and remote (Blob store) commit point for a core.<p>
- * 
+ *
  * This object is somewhere between {@link org.apache.lucene.index.IndexCommit} and {@link org.apache.lucene.index.SegmentInfos}
  * and by implementing it separately we can add additional metadata to it as needed.
  */
 public class ServerSideMetadata {
-  
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int MAX_ATTEMPTS_TO_CAPTURE_COMMIT_POINT = 5;
   /**
    * Files composing the core. They are are referenced from the core's current commit point's segments_N file
    * which is ALSO included in this collection.
@@ -63,83 +70,155 @@ public class ServerSideMetadata {
   private final SolrCore core;
   private final String coreName;
   private final CoreContainer container;
+  /**
+   * path of snapshot directory if we are supposed to take snapshot of the active segment files, otherwise, null
+   */
+  private final String snapshotDirPath;
+
+  public ServerSideMetadata(String coreName, CoreContainer container) throws Exception {
+    this(coreName, container, false);
+  }
 
   /**
    * Given a core name, builds the local metadata
-   * 
-   * 
+   *
+   * @param takeSnapshot whether to take snapshot of active segments or not. If true then the snapshot directory path can be 
+   *                     found through {@link #getSnapshotDirPath()}.
+   *
    * @throws Exception if core corresponding to <code>coreName</code> can't be found.
    */
-  public ServerSideMetadata(String coreName, CoreContainer container) throws Exception {
+  public ServerSideMetadata(String coreName, CoreContainer container, boolean takeSnapshot) throws Exception {
     this.coreName = coreName;
     this.container = container;
     this.core = container.getCore(coreName);
 
     if (core == null) {
-      throw new Exception("Can't find core " + coreName);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't find core " + coreName);
     }
 
     try {
-      IndexCommit latestCommit = core.getDeletionPolicy().getLatestCommit();
-      if (latestCommit == null) {
-        throw new BlobException("Core " + coreName + " has no available commit point");
-      }
+      Directory coreDir = core.getDirectoryFactory().get(core.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
+      try {
 
-      generation = latestCommit.getGeneration();
+        if (takeSnapshot) {
+          snapshotDirPath = core.getDataDir() + "index.snapshot." + System.nanoTime();
+        } else {
+          snapshotDirPath = null;
+        }
 
-      // Work around possible bug returning same file multiple times by using a set here
-      // See org.apache.solr.handler.ReplicationHandler.getFileList()
-      ImmutableCollection.Builder<CoreFileData> latestCommitBuilder = new ImmutableSet.Builder<>();
-      ImmutableCollection.Builder<CoreFileData> allCommitsBuilder;
+        ImmutableCollection.Builder<CoreFileData> latestCommitBuilder;
+        IndexCommit latestCommit;
+        int attempt = 1;
+        // we don't have an atomic way of capturing a commit point i.e. there is a slight chance of losing files between 
+        // getting a latest commit and reserving it. Therefore, we try to capture commit point in a loop with maximum 
+        // number of attempts. 
+        while (true) {
+          try {
+            // Work around possible bug returning same file multiple times by using a set here
+            // See org.apache.solr.handler.ReplicationHandler.getFileList()
+            latestCommitBuilder = new ImmutableSet.Builder<>();
+            latestCommit = tryCapturingLatestCommit(coreDir, latestCommitBuilder);
+            break;
+          } catch (FileNotFoundException | NoSuchFileException ex) {
+            attempt++;
+            if (attempt > MAX_ATTEMPTS_TO_CAPTURE_COMMIT_POINT) {
+              throw ex;
+            }
+            log.info(String.format("Failed to capture commit point: core=%s attempt=%s reason=%s",
+                coreName, attempt, ex.getMessage()));
+          }
+        }
+
+        generation = latestCommit.getGeneration();
+        latestCommitFiles = latestCommitBuilder.build();
 
-      Directory coreDir = core.getDirectoryFactory().get(core.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
-      try {
         // Capture now the hash and verify again if we need to pull content from the Blob store into this directory,
         // to make sure there are no local changes at the same time that might lead to a corruption in case of interaction
         // with the download.
+        // TODO: revise with "design assumptions around pull pipeline" mentioned in allCommits TODO below
         directoryHash = getSolrDirectoryHash(coreDir);
 
-        buildCommitFiles(coreDir, latestCommit, latestCommitBuilder);
-
-        // A note on listCommits says that it does not guarantee consistent results if a commit is in progress.
-        // But in blob context we serialize commits and pulls by proper locking therefore we should be good here.
-        List<IndexCommit> allCommits = DirectoryReader.listCommits(coreDir);
-
-        // optimization:  normally we would only be dealing with one commit point. In that case just reuse latest commit files builder.
-        if (allCommits.size() > 1 ){
-          allCommitsBuilder = new ImmutableSet.Builder<>();
-          for (IndexCommit commit: allCommits) {
-            buildCommitFiles(coreDir, commit, allCommitsBuilder);
-          }
-        } else {
-          // we should always have a commit point as verified in the beginning of this method.
-          assert allCommits.size() == 1 && allCommits.get(0).equals(latestCommit);
-          allCommitsBuilder = latestCommitBuilder;
-        }
+        allCommitsFiles = latestCommitFiles;
+        // TODO: allCommits was added to detect special cases where inactive file segments can potentially conflict
+        //       with whats in shared store. But given the recent understanding of semantics around index directory locks
+        //       we need to revise our design assumptions around pull pipeline, including this one.
+        //       Disabling this for now so that unreliability around introspection of older commits 
+        //       might not get in the way of steady state indexing.
+//        // A note on listCommits says that it does not guarantee consistent results if a commit is in progress.
+//        // But in blob context we serialize commits and pulls by proper locking therefore we should be good here.
+//        List<IndexCommit> allCommits = DirectoryReader.listCommits(coreDir);
+//
+//        // we should always have a commit point as verified in the beginning of this method.
+//        assert (allCommits.size() > 1) || (allCommits.size() == 1 && allCommits.get(0).equals(latestCommit));
+//
+//        // optimization:  normally we would only be dealing with one commit point. In that case just reuse latest commit files builder.
+//        ImmutableCollection.Builder<CoreFileData> allCommitsBuilder = latestCommitBuilder;
+//        if (allCommits.size() > 1) {
+//          allCommitsBuilder = new ImmutableSet.Builder<>();
+//          for (IndexCommit commit : allCommits) {
+//            // no snapshot for inactive segments files
+//            buildCommitFiles(coreDir, commit, allCommitsBuilder, /* snapshotDir */ null);
+//          }
+//        }
+//        allCommitsFiles = allCommitsBuilder.build();
       } finally {
         core.getDirectoryFactory().release(coreDir);
       }
-      latestCommitFiles = latestCommitBuilder.build();
-      allCommitsFiles = allCommitsBuilder.build();
     } finally {
       core.close();
     }
   }
 
+  private IndexCommit tryCapturingLatestCommit(Directory coreDir, Builder<CoreFileData> latestCommitBuilder) throws BlobException, IOException {
+    IndexDeletionPolicyWrapper deletionPolicy = core.getDeletionPolicy();
+    IndexCommit latestCommit = deletionPolicy.getLatestCommit();
+    if (latestCommit == null) {
+      throw new BlobException("Core " + core.getName() + " has no available commit point");
+    }
+
+    deletionPolicy.saveCommitPoint(latestCommit.getGeneration());
+    try {
+      buildCommitFiles(coreDir, latestCommit, latestCommitBuilder);
+      return latestCommit;
+    } finally {
+      deletionPolicy.releaseCommitPoint(latestCommit.getGeneration());
+    }
+  }
+
   private void buildCommitFiles(Directory coreDir, IndexCommit commit, ImmutableCollection.Builder<CoreFileData> builder) throws IOException {
-    for (String fileName : commit.getFileNames()) {
-      // Note we add here all segment related files as well as the commit point's segments_N file
-      // Note commit points do not contain lock (write.lock) files.
-      try (final IndexInput indexInput = coreDir.openInput(fileName, IOContext.READONCE)) {
-        long length = indexInput.length();
-        long checksum = CodecUtil.retrieveChecksum(indexInput);
-        builder.add(new CoreFileData(fileName, length, checksum));
+    Directory snapshotDir = null;
+    try {
+      if (snapshotDirPath != null) {
+        snapshotDir = core.getDirectoryFactory().get(snapshotDirPath, DirectoryFactory.DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
+      }
+      for (String fileName : commit.getFileNames()) {
+        // Note we add here all segment related files as well as the commit point's segments_N file
+        // Note commit points do not contain lock (write.lock) files.
+        try (final IndexInput indexInput = coreDir.openInput(fileName, IOContext.READONCE)) {
+          long length = indexInput.length();
+          long checksum = CodecUtil.retrieveChecksum(indexInput);
+          builder.add(new CoreFileData(fileName, length, checksum));
+        }
+        if (snapshotDir != null) {
+          // take snapshot of the file
+          snapshotDir.copyFrom(coreDir, fileName, fileName, DirectoryFactory.IOCONTEXT_NO_CACHE);
+        }
+      }
+    } catch (Exception ex) {
+      if (snapshotDir != null) {
+        core.getDirectoryFactory().doneWithDirectory(snapshotDir);
+        core.getDirectoryFactory().remove(snapshotDir);
+      }
+      throw ex;
+    } finally {
+      if (snapshotDir != null) {
+        core.getDirectoryFactory().release(snapshotDir);
       }
     }
   }
 
   public String getCoreName() {
-      return this.coreName;
+    return this.coreName;
   }
 
   public CoreContainer getCoreContainer() {
@@ -162,6 +241,10 @@ public class ServerSideMetadata {
     return this.allCommitsFiles;
   }
 
+  public String getSnapshotDirPath() {
+    return snapshotDirPath;
+  }
+
   /**
    * Returns <code>true</code> if the contents of the directory passed into this method is identical to the contents of
    * the directory of the Solr core of this instance, taken at instance creation time.<p>
@@ -206,9 +289,9 @@ public class ServerSideMetadata {
   @Override
   public String toString() {
     return "collectionName=" + core.getCoreDescriptor().getCollectionName() +
-      " shardName=" + core.getCoreDescriptor().getCloudDescriptor().getShardId() +
-      " coreName=" + core.getName() +
-      " generation=" + generation;
+        " shardName=" + core.getCoreDescriptor().getCloudDescriptor().getShardId() +
+        " coreName=" + core.getName() +
+        " generation=" + generation;
   }
 
   /**
@@ -231,12 +314,12 @@ public class ServerSideMetadata {
     public boolean equals(Object o) {
       if (this == o) return true;
       if (o == null || getClass() != o.getClass()) return false;
-  
+
       CoreFileData other = (CoreFileData) o;
-  
+
       return Objects.equals(fileName, other.fileName) &&
-        Objects.equals(fileSize, other.fileSize) &&
-        Objects.equals(checksum, other.checksum);
+          Objects.equals(fileSize, other.fileSize) &&
+          Objects.equals(checksum, other.checksum);
     }
 
     public String getFileName() {
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtil.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtil.java
index b01ee94..50a2c5d 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtil.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtil.java
@@ -30,6 +30,8 @@ public class SharedStoreResolutionUtil {
     private final Collection<CoreFileData> filesToPush;
     // blob files needed to be pulled
     private final Collection<BlobFile> filesToPull;
+    // blob files needed to be deleted
+    private final Collection<BlobFile> filesToDelete;
     // Whether the local index contents conflict with contents to be pulled from blob. If they do we will move the
     // core to new index dir when pulling blob contents
     // Two cases:
@@ -39,7 +41,7 @@ public class SharedStoreResolutionUtil {
     
     
     public SharedMetadataResolutionResult(Collection<CoreFileData> filesToPush, 
-        Collection<BlobFile> filesToPull, boolean localConflictingWithBlob) {
+        Collection<BlobFile> filesToPull, Collection<BlobFile> filesToDelete, boolean localConflictingWithBlob) {
       if (filesToPush == null) {
         this.filesToPush = Collections.emptySet();
       } else {
@@ -52,6 +54,12 @@ public class SharedStoreResolutionUtil {
         this.filesToPull = filesToPull;
       }
 
+      if (filesToDelete == null) {
+        this.filesToDelete = Collections.emptySet();
+      } else {
+        this.filesToDelete = filesToDelete;
+      }
+
       this.localConflictingWithBlob = localConflictingWithBlob;
     }
     
@@ -62,7 +70,11 @@ public class SharedStoreResolutionUtil {
     public Collection<BlobFile> getFilesToPull() {
       return filesToPull;
     }
-    
+
+    public Collection<BlobFile> getFilesToDelete() {
+      return filesToDelete;
+    }
+
     public boolean isLocalConflictingWithBlob(){
       return localConflictingWithBlob;
     }
@@ -108,7 +120,7 @@ public class SharedStoreResolutionUtil {
         || distant.getBlobFiles().length == 0) {
       // The shard index data does not exist on the shared store. All we can do is push. 
       // We've computed localFilesMissingOnBlob above, and blobFilesMissingLocally is empty as it should be.
-      return new SharedMetadataResolutionResult(localFilesMissingOnBlob.values(), blobFilesMissingLocally.values(), false);
+      return new SharedMetadataResolutionResult(localFilesMissingOnBlob.values(), blobFilesMissingLocally.values(), blobFilesMissingLocally.values(), false);
     }
     
     // Verify we find one and only one segments_N file to download from Blob.
@@ -136,7 +148,7 @@ public class SharedStoreResolutionUtil {
     if (local == null) {
       // The shard index data does not exist locally. All we can do is pull.  
       // We've computed blobFilesMissingLocally and localFilesMissingOnBlob is empty as it should be.
-      return new SharedMetadataResolutionResult(localFilesMissingOnBlob.values(), blobFilesMissingLocally.values(), false);
+      return new SharedMetadataResolutionResult(localFilesMissingOnBlob.values(), blobFilesMissingLocally.values(), blobFilesMissingLocally.values(), false);
     }
 
     boolean localConflictingWithBlob = false;
@@ -170,7 +182,7 @@ public class SharedStoreResolutionUtil {
     // resolver to produce list of files to be pulled from blob and list of files to be pulled(read copied) from local index directory.
     // But that would have unnecessarily convoluted the design of this resolver.
     Collection<BlobFile> filesToPull = localConflictingWithBlob ? Arrays.asList(distant.getBlobFiles()) : blobFilesMissingLocally.values();
-    return new SharedMetadataResolutionResult(localFilesMissingOnBlob.values(), filesToPull, localConflictingWithBlob);
+    return new SharedMetadataResolutionResult(localFilesMissingOnBlob.values(), filesToPull, blobFilesMissingLocally.values(), localConflictingWithBlob);
   }
   
   /** Identify the segments_N file in Blob files. */
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
index 0e1260d..05dcd84 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
@@ -30,6 +32,7 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
+import org.apache.solr.store.blob.client.BlobCoreMetadataBuilder;
 import org.apache.solr.store.blob.client.CoreStorageClient;
 import org.apache.solr.store.blob.metadata.CorePushPull;
 import org.apache.solr.store.blob.metadata.ServerSideMetadata;
@@ -39,12 +42,14 @@ import org.apache.solr.store.blob.process.CorePullerFeeder.PullCoreInfo;
 import org.apache.solr.store.blob.provider.BlobStorageProvider;
 import org.apache.solr.store.blob.util.BlobStoreUtils;
 import org.apache.solr.store.blob.util.DeduplicatingList;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreStage;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreVersionMetadata;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-
 /**
  * Code for pulling updates on a specific core to the Blob store. see {@CorePushTask} for the push version of this.
  */
@@ -63,12 +68,10 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
   private final PullCoreInfo pullCoreInfo;
   
   /**
-   * Data structures injected as dependencies that track the core pulls occurring
-   * in flight and the cores that have been created and not pulled. These should
-   * be passed in via a constructor from CorePullerFeeder where they are defined
+   * Data structure injected as dependencies that track the cores that have been created and not pulled. 
+   * This should be passed in via a constructor from CorePullerFeeder where it is defined
    * and are unique per CorePullerFeeder (itself a singleton).
    */
-  private final HashMap<String, Long> pullsInFlight;
   private final Set<String> coresCreatedNotPulledYet;
   
   private final long queuedTimeMs;
@@ -76,23 +79,20 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
   private long lastAttemptTimestamp;
   private final PullCoreCallback callback;
 
-  CorePullTask(CoreContainer coreContainer, PullCoreInfo pullCoreInfo, PullCoreCallback callback,
-      HashMap<String, Long> pullsInFlight, Set<String> coresCreatedNotPulledYet) {
+  CorePullTask(CoreContainer coreContainer, PullCoreInfo pullCoreInfo, PullCoreCallback callback, Set<String> coresCreatedNotPulledYet) {
     this(coreContainer, pullCoreInfo, System.currentTimeMillis(), 0, 0L, 
-        callback, pullsInFlight, coresCreatedNotPulledYet);
+        callback, coresCreatedNotPulledYet);
   }
 
   @VisibleForTesting
   CorePullTask(CoreContainer coreContainer, PullCoreInfo pullCoreInfo, long queuedTimeMs, int attempts,
-      long lastAttemptTimestamp, PullCoreCallback callback, HashMap<String, Long> pullsInFlight, 
-      Set<String> coresCreatedNotPulledYet) {
+      long lastAttemptTimestamp, PullCoreCallback callback, Set<String> coresCreatedNotPulledYet) {
     this.coreContainer = coreContainer;
     this.pullCoreInfo = pullCoreInfo;
     this.queuedTimeMs = queuedTimeMs;
     this.attempts = attempts;
     this.lastAttemptTimestamp = lastAttemptTimestamp;
     this.callback = callback;
-    this.pullsInFlight = pullsInFlight;
     this.coresCreatedNotPulledYet = coresCreatedNotPulledYet;
   }
   
@@ -101,7 +101,7 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
    */
   @Override
   public String getDedupeKey() {
-    return this.pullCoreInfo.getSharedStoreName();
+    return this.pullCoreInfo.getCoreName();
   }
 
   /**
@@ -153,7 +153,7 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
       // We merge the tasks.
       return new CorePullTask(task1.coreContainer, mergedPullCoreInfo,
           Math.min(task1.queuedTimeMs, task2.queuedTimeMs), mergedAttempts, mergedLatAttemptsTimestamp,
-          task1.callback, task1.pullsInFlight, task1.coresCreatedNotPulledYet);
+          task1.callback, task1.coresCreatedNotPulledYet);
     }
   }
 
@@ -185,12 +185,16 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
     return this.queuedTimeMs;
   }
 
+  public CoreContainer getCoreContainer() {
+    return coreContainer;
+  }
+
   /**
    * Pulls the local core updates from the Blob store then calls the task callback to notify the
    * {@link CorePullerFeeder} of success or failure of the operation, give an indication of the reason the periodic
    * puller can decide to retry or not.
    */
-  void pullCoreFromBlob() throws InterruptedException {
+  void pullCoreFromBlob(boolean isLeaderPulling) throws InterruptedException {
     BlobCoreMetadata blobMetadata = null;
     if (coreContainer.isShutDown()) {
       this.callback.finishedPull(this, blobMetadata, CoreSyncStatus.SHUTTING_DOWN, null);
@@ -199,23 +203,6 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
       return;
     }
 
-    synchronized (pullsInFlight) {
-      Long pullInFlightTimestamp = pullsInFlight.get(pullCoreInfo.getSharedStoreName());
-      if (pullInFlightTimestamp != null) {
-        // Another pull is in progress, we'll retry later.
-        // Note we can't just cancel this pull, because the other pull might be working on a previous commit
-        // point.
-        long prevPullMs = System.currentTimeMillis() - pullInFlightTimestamp;
-        this.callback.finishedPull(this, blobMetadata, CoreSyncStatus.CONCURRENT_SYNC,
-            "Skipping core pull for " + pullCoreInfo.getSharedStoreName()
-            + " because another thread is currently pulling it (started " + prevPullMs
-            + " ms ago).");
-        return;
-      } else {
-        pullsInFlight.put(pullCoreInfo.getSharedStoreName(), System.currentTimeMillis());
-      }
-    }
-
     // Copying the non final variables so we're clean wrt the Java memory model and values do not change as we go
     // (even though we know that no other thread can be working on this CorePullTask when we handle it here).
     final int attemptsCopy = getAttempts();
@@ -228,6 +215,7 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
       }
     }
 
+    SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController();
     CoreSyncStatus syncStatus = CoreSyncStatus.FAILURE;
     // Auxiliary information related to pull outcome. It can be metadata resolver message which can be null or exception detail in case of failure 
     String message = null;
@@ -236,8 +224,30 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
       BlobStorageProvider blobProvider = coreContainer.getSharedStoreManager().getBlobStorageProvider(); 
       CoreStorageClient blobClient = blobProvider.getClient();
 
+      SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(pullCoreInfo.getCollectionName(),
+          pullCoreInfo.getShardName(),
+          pullCoreInfo.getCoreName());
+
+      SharedShardMetadataController metadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController();
+      SharedShardVersionMetadata shardVersionMetadata =  metadataController.readMetadataValue(pullCoreInfo.getCollectionName(), pullCoreInfo.getShardName());
+      
+      if (concurrencyController.areVersionsEqual(coreVersionMetadata, shardVersionMetadata)) {
+        // already in sync
+        this.callback.finishedPull(this, coreVersionMetadata.getBlobCoreMetadata(), CoreSyncStatus.SUCCESS_EQUIVALENT, null);
+        return;
+      } 
+      if (SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE.equals(shardVersionMetadata.getMetadataSuffix())) {
+        // no-op pull
+        BlobCoreMetadata emptyBlobCoreMetadata = BlobCoreMetadataBuilder.buildEmptyCoreMetadata(pullCoreInfo.getSharedStoreName());
+        concurrencyController.updateCoreVersionMetadata(pullCoreInfo.getCollectionName(), pullCoreInfo.getShardName(), pullCoreInfo.getCoreName(), 
+            shardVersionMetadata, emptyBlobCoreMetadata, isLeaderPulling);
+        this.callback.finishedPull(this, emptyBlobCoreMetadata, CoreSyncStatus.SUCCESS_EQUIVALENT, null);
+        return;
+      }
+
+      concurrencyController.recordState(pullCoreInfo.getCollectionName(), pullCoreInfo.getShardName(), pullCoreInfo.getCoreName(), SharedCoreStage.BlobPullStarted);
       // Get blob metadata
-      String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(pullCoreInfo.getLastReadMetadataSuffix());
+      String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(shardVersionMetadata.getMetadataSuffix());
       blobMetadata = blobClient.pullCoreMetadata(pullCoreInfo.getSharedStoreName(), blobCoreMetadataName);
       
       // Handle callback
@@ -264,10 +274,8 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
           // If we get to this point, we're setting the "created not pulled yet" status of the core here (only place
           // in the code where this happens) and we're clearing it in the finally below.
           // We're not leaking entries in coresCreatedNotPulledYet that might stay there forever...
-          synchronized (pullsInFlight) {
-            synchronized (coresCreatedNotPulledYet) {
-              coresCreatedNotPulledYet.add(pullCoreInfo.getSharedStoreName());
-            }
+          synchronized (coresCreatedNotPulledYet) {
+            coresCreatedNotPulledYet.add(pullCoreInfo.getSharedStoreName());
           }
           createCore(pullCoreInfo);
         } else {
@@ -288,9 +296,14 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
       if (resolutionResult.getFilesToPull().size() > 0) {
         BlobDeleteManager deleteManager = coreContainer.getSharedStoreManager().getBlobDeleteManager();
         CorePushPull cp = new CorePushPull(blobClient, deleteManager, pullCoreInfo, resolutionResult, serverMetadata, blobMetadata);
+        // TODO: we are computing/tracking attempts but we are not passing them along
         cp.pullUpdateFromBlob(/* waitForSearcher */ true);
+        concurrencyController.updateCoreVersionMetadata(pullCoreInfo.getCollectionName(), pullCoreInfo.getShardName(), pullCoreInfo.getCoreName(), 
+            shardVersionMetadata, blobMetadata, isLeaderPulling);
         syncStatus = CoreSyncStatus.SUCCESS;
       } else {
+        log.warn(String.format("Why there are no files to pull even when we do not match with the version in zk? collection=%s shard=%s core=%s",
+            pullCoreInfo.getCollectionName(), pullCoreInfo.getShardName(), pullCoreInfo.getCoreName()));
         syncStatus = CoreSyncStatus.SUCCESS_EQUIVALENT;
       }
 
@@ -308,43 +321,43 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
       message = Throwables.getStackTraceAsString(e);
       log.warn("Failed (attempt=" + attemptsCopy + ") to pull core " + pullCoreInfo.getSharedStoreName(), e);
     } finally {
-      // Remove ourselves from the in flight set before calling the callback method (just in case it takes
-      // forever)
-      synchronized (pullsInFlight) {
-        // No matter how the pull ends (success or any kind of error), we don't want to consider the core as awaiting pull,
-        // since it doesn't anymore (code is inline here rather than in a method or in notifyEndOfPull() to make
-        // it clear how coresCreatedNotPulledYet is managed).
-        synchronized (coresCreatedNotPulledYet) {
-          // TODO: Can we move this business of core creation and deletion outside of this task so that
-          //       we may not sub-optimally repeatedly create/delete core in case of reattempt of a transient pull error?
-          //       or get whether a reattempt will be made or not, and if there is a guaranteed reattempt then do not delete it
-          if (coresCreatedNotPulledYet.remove(pullCoreInfo.getSharedStoreName())) {
-            if (!syncStatus.isSuccess()) {
-              // If we created the core and we could not pull successfully then we should cleanup after ourselves by deleting it
-              // otherwise queries can incorrectly return 0 results from that core.
-              if(coreExists(pullCoreInfo.getCoreName())) {
-                try {
-                  // try to delete core within 3 minutes. In future when we time box our pull task then we 
-                  // need to make sure this value is within that bound. 
-                  // CoreDeleter.deleteCoreByName(coreContainer, pullCoreInfo.coreName, 3, TimeUnit.MINUTES);
-                  // TODO: need to migrate deleter
-                } catch (Exception ex) {
-                  // TODO: should we gack?
-                  //       can we do anything more here since we are unable to delete and we are leaving an empty core behind
-                  //       when we should not. Should we keep the core in coresCreatedNotPulledYet and try few more times
-                  //       but at some point we would have to let it go
-                  //       So may be, few more attempts here and then gack
-                  log.warn("CorePullTask successfully created local core but failed to pull it" +
-                      " and now is unable to delete that local core " + pullCoreInfo.getCoreName(), ex);
-                }
+      // No matter how the pull ends (success or any kind of error), we don't want to consider the core as awaiting pull,
+      // since it doesn't anymore (code is inline here rather than in a method or in notifyEndOfPull() to make
+      // it clear how coresCreatedNotPulledYet is managed).
+      synchronized (coresCreatedNotPulledYet) {
+        // TODO: Can we move this business of core creation and deletion outside of this task so that
+        //       we may not sub-optimally repeatedly create/delete core in case of reattempt of a transient pull error?
+        //       or get whether a reattempt will be made or not, and if there is a guaranteed reattempt then do not delete it
+        if (coresCreatedNotPulledYet.remove(pullCoreInfo.getSharedStoreName())) {
+          if (!syncStatus.isSuccess()) {
+            // If we created the core and we could not pull successfully then we should cleanup after ourselves by deleting it
+            // otherwise queries can incorrectly return 0 results from that core.
+            if (coreExists(pullCoreInfo.getCoreName())) {
+              try {
+                // try to delete core within 3 minutes. In future when we time box our pull task then we 
+                // need to make sure this value is within that bound. 
+                // CoreDeleter.deleteCoreByName(coreContainer, pullCoreInfo.coreName, 3, TimeUnit.MINUTES);
+                // TODO: need to migrate deleter
+              } catch (Exception ex) {
+                // TODO: should we gack?
+                //       can we do anything more here since we are unable to delete and we are leaving an empty core behind
+                //       when we should not. Should we keep the core in coresCreatedNotPulledYet and try few more times
+                //       but at some point we would have to let it go
+                //       So may be, few more attempts here and then gack
+                log.warn("CorePullTask successfully created local core but failed to pull it" +
+                    " and now is unable to delete that local core " + pullCoreInfo.getCoreName(), ex);
               }
             }
           }
         }
-        pullsInFlight.remove(pullCoreInfo.getSharedStoreName());
       }
     }
     this.callback.finishedPull(this, blobMetadata, syncStatus, message);
+    concurrencyController.recordState(pullCoreInfo.getCollectionName(), pullCoreInfo.getShardName(), pullCoreInfo.getCoreName(), SharedCoreStage.BlobPullFinished);
+  }
+
+  void finishedPull(BlobCoreMetadata blobCoreMetadata, CoreSyncStatus syncStatus, String message) throws InterruptedException {
+    this.callback.finishedPull(this, blobCoreMetadata, syncStatus, message);
   }
 
   /**
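
The pull path above now boils down to a three-way decision: skip the pull when the locally cached core version already matches the shard version recorded in ZooKeeper, record an empty core metadata when the shard has never been pushed (the metadata node still holds its default value), and otherwise resolve differences and pull the missing files. The following is a minimal, self-contained sketch of that decision under those assumptions; the names and types are illustrative stand-ins, not the actual Solr classes.

import java.util.Objects;

class PullDecisionSketch {
  // stand-in for SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE
  static final String METADATA_NODE_DEFAULT_VALUE = "default";

  enum Decision { ALREADY_IN_SYNC, NO_OP_EMPTY_CORE, PULL_FROM_SHARED_STORE }

  static Decision decide(String cachedSuffix, int cachedVersion,
                         String zkSuffix, int zkVersion) {
    // cached core version matches what ZooKeeper records for the shard: nothing to pull
    if (cachedVersion == zkVersion && Objects.equals(cachedSuffix, zkSuffix)) {
      return Decision.ALREADY_IN_SYNC;
    }
    // shard has never been pushed: a no-op pull against empty core metadata is enough
    if (METADATA_NODE_DEFAULT_VALUE.equals(zkSuffix)) {
      return Decision.NO_OP_EMPTY_CORE;
    }
    // otherwise resolve local vs. shared-store metadata and pull the missing files
    return Decision.PULL_FROM_SHARED_STORE;
  }

  public static void main(String[] args) {
    System.out.println(decide("suffix-1", 3, "suffix-1", 3)); // ALREADY_IN_SYNC
    System.out.println(decide("suffix-1", 3, "suffix-2", 4)); // PULL_FROM_SHARED_STORE
  }
}
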
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTracker.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTracker.java
index 6d53bc6..ecebc83 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTracker.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTracker.java
@@ -16,28 +16,24 @@
  */
 package org.apache.solr.store.blob.process;
 
+import javax.servlet.http.HttpServletRequest;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Map;
 
-import javax.servlet.http.HttpServletRequest;
-
-import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.servlet.SolrRequestParsers;
 import org.apache.solr.store.blob.metadata.PushPullData;
 import org.apache.solr.store.blob.process.CorePullerFeeder.PullCoreInfo;
-import org.apache.solr.store.blob.util.BlobStoreUtils;
 import org.apache.solr.store.blob.util.DeduplicatingList;
-import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
-
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreVersionMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,15 +66,21 @@ public class CorePullTracker {
   /**
    * If the local core is stale, enqueues it to be pulled in from blob
    * TODO: add stricter checks so that we don't pull on every request
+   *       one suggestion: 
+   *       For leaders we only need to do this once, when they become leader, so that 
+   *       they can pull without waiting for indexing to refresh a stale core.
+   *       In other words, we should make use of 
+   *       {@link SharedCoreVersionMetadata#isSoftGuaranteeOfEquality()}
+   *       {@link SharedCoreConcurrencyController#updateCoreVersionMetadata(String, String, String, boolean)}
+   *       and knowledge of leader/follower and when they change.
+   *       
    */
   public void enqueueForPullIfNecessary(String requestPath, SolrCore core, String collectionName,
-      CoreContainer cores) throws IOException, SolrException {
+      CoreContainer cores) throws SolrException {
     // Initialize variables
     String coreName = core.getName();
     String shardName = core.getCoreDescriptor().getCloudDescriptor().getShardId();
-    SharedShardMetadataController sharedShardMetadataController = cores.getSharedStoreManager().getSharedShardMetadataController();
     DocCollection collection = cores.getZkController().getClusterState().getCollection(collectionName);
-
     Slice shard = collection.getSlicesMap().get(shardName);
     if (shard != null) {
       try {
@@ -87,18 +89,6 @@ public class CorePullTracker {
           log.warn("Enqueueing a pull for shard " + shardName + " that is inactive!");
         }
         log.info("Enqueue a pull for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
-        // creates the metadata node if it doesn't exist
-        sharedShardMetadataController.ensureMetadataNodeExists(collectionName, shardName);
-
-        /*
-         * Get the metadataSuffix value from ZooKeeper or from a cache if an entry exists for the 
-         * given collection and shardName. If the leader has already changed, the conditional update
-         * later will fail and invalidate the cache entry if it exists. 
-         */
-        VersionedData data = sharedShardMetadataController.readMetadataValue(collectionName, shardName, 
-            /* readFromCache */ true);
-        Map<String, String> nodeUserData = (Map<String, String>) Utils.fromJSON(data.getData());
-        String metadataSuffix = nodeUserData.get(SharedShardMetadataController.SUFFIX_NODE_NAME);
 
         String sharedShardName = (String) shard.get(ZkStateReader.SHARED_SHARD_NAME);
 
@@ -107,12 +97,9 @@ public class CorePullTracker {
             .setShardName(shardName)
             .setCoreName(coreName)
             .setSharedStoreName(sharedShardName)
-            .setLastReadMetadataSuffix(metadataSuffix)
-            .setNewMetadataSuffix(BlobStoreUtils.generateMetadataSuffix())
-            .setZkVersion(data.getVersion())
             .build();
 
-        enqueueForPullIfNecessary(requestPath, pushPullData, cores);
+        enqueueForPullIfNecessary(requestPath, pushPullData);
 
       } catch (Exception ex) {
         // wrap every thrown exception in a solr exception
@@ -125,8 +112,7 @@ public class CorePullTracker {
    * If the local core is stale, enqueues it to be pulled in from blob Note : If there is no coreName available in the
    * requestPath we simply ignore the request.
    */
-  public void enqueueForPullIfNecessary(String requestPath, PushPullData pushPullData, 
-      CoreContainer cores) throws IOException {
+  public void enqueueForPullIfNecessary(String requestPath, PushPullData pushPullData) {
     // TODO: do we need isBackgroundPullEnabled in addition to isBlobEnabled? If not we should remove this.
     // TODO: always pull for this hack - want to check if local core is up to date
     if (isBackgroundPullEnabled && pushPullData.getSharedStoreName() != null && shouldPullStale(requestPath)) {
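
The TODO on enqueueForPullIfNecessary above points at a stricter check before enqueueing: a leader whose cached core version still carries the soft guarantee of equality has nothing to pull, since indexing corrects any staleness on the leader anyway. A hypothetical sketch of that check, assuming a leader flag and the soft-guarantee flag are both available at enqueue time (the helper below does not exist in this diff):

class PullEnqueueSketch {
  // Returns true when a pull should still be enqueued for this request.
  static boolean shouldEnqueuePull(boolean isLeader, boolean softGuaranteeOfEquality) {
    // Followers pull on read traffic as before; a leader holding the soft guarantee
    // can skip the pull because the next indexing batch would correct a stale core.
    return !(isLeader && softGuaranteeOfEquality);
  }

  public static void main(String[] args) {
    System.out.println(shouldEnqueuePull(true, true));   // false: leader already in sync
    System.out.println(shouldEnqueuePull(false, true));  // true: follower still pulls
  }
}
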
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerFeeder.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerFeeder.java
index 4f5a223..5fdfa6b 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerFeeder.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerFeeder.java
@@ -17,9 +17,9 @@
 package org.apache.solr.store.blob.process;
 
 import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
 import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
 import org.apache.solr.store.blob.metadata.BlobCoreSyncer;
@@ -28,9 +28,6 @@ import org.apache.solr.store.blob.util.DeduplicatingList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 /**
  * A pull version of {@link CoreSyncFeeder} then will continually ({@link #feedTheMonsters()}) to load up a work queue (
  * {@link #pullTaskQueue}) with such tasks {@link CorePullTask} to keep the created threads busy :) The tasks will be
@@ -49,23 +46,11 @@ public class CorePullerFeeder extends CoreSyncFeeder {
 
   protected final DeduplicatingList<String, CorePullTask> pullTaskQueue;
 
-  /**
-   * Cores currently being pulled and timestamp of pull start (to identify stuck ones in logs)
-   *
-   * Note, it is the client's responsibility to synchronize accesses
-   */
-  private final HashMap<String, Long> pullsInFlight = Maps.newHashMap();
-
   /** Cores unknown locally that got created as part of the pull process but for which no data has been pulled yet
    * from Blob store. If we ignore this transitory state, these cores can be accessed locally and simply look empty.
    * We'd rather treat threads attempting to access such cores like threads attempting to access an unknown core and
    * do a pull (or more likely wait for an ongoing pull to finish).<p>
    *
-   * When this lock has to be taken as well as {@link #pullsInFlight}, then {@link #pullsInFlight} has to be taken first.
-   * Reading this set implies acquiring the monitor of the set (as if @GuardedBy("itself")), but writing to the set
-   * additionally implies holding the {@link #pullsInFlight}. This guarantees that while {@link #pullsInFlight}
-   * is held, no element in the set is changing.
-   *
    * Note, it is the client's responsibility to synchronize accesses
    */
   private final Set<String> coresCreatedNotPulledYet = Sets.newHashSet();
@@ -102,16 +87,17 @@ public class CorePullerFeeder extends CoreSyncFeeder {
     return callback;
   }
 
-  protected HashMap<String, Long> getPullsInFlight() {
-    return pullsInFlight;
-  }
-
   protected Set<String> getCoresCreatedNotPulledYet() {
     return coresCreatedNotPulledYet;
   }
 
   @Override
   void feedTheMonsters() throws InterruptedException {
+    while (cores.getSharedStoreManager() == null) {
+      // todo: Fix cyclic initialization sequence
+      // if this thread starts too early, the following line would throw an NPE (and kill the thread) because the
+      // initialization of sharedStoreManager, which triggered the creation of this thread, has not completed yet.
+    }
     CorePullTracker tracker = cores.getSharedStoreManager().getCorePullTracker();
     final long minMsBetweenLogs = 15000;
     long lastLoggedTimestamp = 0L;
@@ -121,7 +107,7 @@ public class CorePullerFeeder extends CoreSyncFeeder {
       PullCoreInfo pci = tracker.getCoreToPull();
 
       // Add the core to the list consumed by the thread doing the actual work
-      CorePullTask pt = new CorePullTask(cores, pci, getCorePullTaskCallback(), pullsInFlight, coresCreatedNotPulledYet);
+      CorePullTask pt = new CorePullTask(cores, pci, getCorePullTaskCallback(), coresCreatedNotPulledYet);
       pullTaskQueue.addDeduplicated(pt, /* isReenqueue */ false);
       syncsEnqueuedSinceLastLog++;
 
@@ -149,24 +135,21 @@ public class CorePullerFeeder extends CoreSyncFeeder {
     private final boolean createCoreIfAbsent;
 
     PullCoreInfo(PushPullData data, boolean createCoreIfAbsent, boolean waitForSearcher) {
-      super(data.getCollectionName(), data.getShardName(), data.getCoreName(), data.getSharedStoreName(), 
-          data.getLastReadMetadataSuffix(), data.getNewMetadataSuffix(), data.getZkVersion());
+      super(data.getCollectionName(), data.getShardName(), data.getCoreName(), data.getSharedStoreName());
       this.waitForSearcher = waitForSearcher;
       this.createCoreIfAbsent = createCoreIfAbsent;
     }
 
     PullCoreInfo(String collectionName, String shardName, String coreName, String sharedStoreName,
-        String lastReadMetadataSuffix, String newMetadataSuffix, int zkVersion,
-        boolean createCoreIfAbsent, boolean waitForSearcher) {
-      super(collectionName, shardName, coreName, sharedStoreName, lastReadMetadataSuffix,
-          newMetadataSuffix, zkVersion);
+                 boolean createCoreIfAbsent, boolean waitForSearcher) {
+      super(collectionName, shardName, coreName, sharedStoreName);
       this.waitForSearcher = waitForSearcher;
       this.createCoreIfAbsent = createCoreIfAbsent;
     }
 
     @Override
     public String getDedupeKey() {
-      return sharedStoreName;
+      return coreName;
     }
 
     public boolean shouldWaitForSearcher() {
@@ -192,7 +175,7 @@ public class CorePullerFeeder extends CoreSyncFeeder {
       assert v1.getSharedStoreName().equals(v2.getSharedStoreName());
       assert v1.getDedupeKey().equals(v2.getDedupeKey());
       assert v1.getCoreName().equals(v2.getCoreName());
-      
+
       // Merging the version number here implies an ordering on the pull operation
       // enqueued as we want higher version-ed operations to be what the pulling
       // mechanisms pull off of due to presence of metadataSuffix information
@@ -201,46 +184,13 @@ public class CorePullerFeeder extends CoreSyncFeeder {
       boolean waitForSearcher = false;
       boolean createCoreIfAbsent = false;
 
-      // TODO newMetadataSuffix isn't used in the pull pipeline but the argument is nevertheless
-      // required when enqueuing a new pull and propagated downward. We should refactor 
-      // PullCoreInfo and PushPullData to make these concerns only relevant where they are needed
-      String newMetadataSuffix = null;
-      String lastReadMetadataSuffix = null;
-      int version = -1;
-      
-      // if the versions are the same then the last read metadata suffix should be the same
-      if (v1.getZkVersion() == v2.getZkVersion()) {
-        assert v1.getLastReadMetadataSuffix().equals(v2.getLastReadMetadataSuffix());
-        lastReadMetadataSuffix = v1.getLastReadMetadataSuffix();
-        // this doesn't matter which structure it comes from
-        newMetadataSuffix = v1.getNewMetadataSuffix();
-        version = v1.getZkVersion();
-        
-        // if one needs to wait then merged will have to wait as well
-        waitForSearcher = v1.waitForSearcher || v2.waitForSearcher;
-        // if one wants to create core if absent then merged will have to create as well
-        createCoreIfAbsent = v1.createCoreIfAbsent || v2.createCoreIfAbsent;
-      } else if (v1.getZkVersion() > v2.getZkVersion()) {
-        // version number increments on updates so the higher version will result in a pull
-        // from the most up-to-date state on blob
-        lastReadMetadataSuffix = v1.getLastReadMetadataSuffix();
-        newMetadataSuffix = v1.getNewMetadataSuffix();
-        version = v1.getZkVersion();
-        
-        waitForSearcher = v1.waitForSearcher;
-        createCoreIfAbsent = v1.createCoreIfAbsent;
-      } else {
-        lastReadMetadataSuffix = v2.getLastReadMetadataSuffix();
-        newMetadataSuffix = v2.getNewMetadataSuffix();
-        version = v2.getZkVersion();
-        
-        waitForSearcher = v2.waitForSearcher;
-        createCoreIfAbsent = v2.createCoreIfAbsent;
-      }
-      
-      return new PullCoreInfo(v1.getCollectionName(), v1.getShardName(), v1.getCoreName(), 
-          v1.getSharedStoreName(), lastReadMetadataSuffix, newMetadataSuffix, version, 
-          createCoreIfAbsent, waitForSearcher); 
+      // if one needs to wait then merged will have to wait as well
+      waitForSearcher = v1.waitForSearcher || v2.waitForSearcher;
+      // if one wants to create core if absent then merged will have to create as well
+      createCoreIfAbsent = v1.createCoreIfAbsent || v2.createCoreIfAbsent;
+
+      return new PullCoreInfo(v1.getCollectionName(), v1.getShardName(), v1.getCoreName(),
+          v1.getSharedStoreName(), createCoreIfAbsent, waitForSearcher);
     }
   }
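
With the metadata suffix and ZooKeeper version gone from PullCoreInfo, deduplicating two queued pulls for the same core reduces to ORing the two behaviour flags, as the simplified merger above shows. A self-contained sketch of that merge, using stand-in types rather than the actual Solr classes:

class PullMergeSketch {
  static class PullInfo {
    final String coreName;
    final boolean waitForSearcher;
    final boolean createCoreIfAbsent;

    PullInfo(String coreName, boolean waitForSearcher, boolean createCoreIfAbsent) {
      this.coreName = coreName;
      this.waitForSearcher = waitForSearcher;
      this.createCoreIfAbsent = createCoreIfAbsent;
    }
  }

  // The dedup key is now the core name, so both inputs refer to the same local core.
  static PullInfo merge(PullInfo a, PullInfo b) {
    assert a.coreName.equals(b.coreName);
    return new PullInfo(a.coreName,
        a.waitForSearcher || b.waitForSearcher,        // wait if either pull asked to wait
        a.createCoreIfAbsent || b.createCoreIfAbsent); // create if either pull asked to create
  }
}
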
 
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerThread.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerThread.java
index ce4003e..d1ad641 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerThread.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerThread.java
@@ -16,11 +16,21 @@
  */
 package org.apache.solr.store.blob.process;
 
-import org.apache.solr.store.blob.util.DeduplicatingList;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.base.Throwables;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.store.blob.util.DeduplicatingList;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreVersionMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
 
 /**
  * A thread (there are a few of these created in {@link CorePullerFeeder#run}) that dequeues {@link CorePullTask} from a
@@ -47,9 +57,47 @@ public class CorePullerThread implements Runnable {
       try {
         // This call blocks if work queue is empty
         task = workQueue.removeFirst();
-        // TODO: we should timebox this request in case we are stuck for long time
-        task.pullCoreFromBlob();
 
+        CorePullerFeeder.PullCoreInfo info = task.getPullCoreInfo();
+        String collectionName = info.getCollectionName();
+        String shardName = info.getShardName();
+        String coreName = info.getCoreName();
+        SharedCoreConcurrencyController concurrencyController = task.getCoreContainer().getSharedStoreManager().getSharedCoreConcurrencyController();
+
+        SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName);
+        // TODO: On leaders we can rely on the soft guarantee of equality since indexing can correct them if they are wrong.
+        //       There is a work item to make use of that knowledge and not enqueue pulls on leaders when the soft guarantee of equality
+        //       is available. Until that work item is done, this is a stop-gap measure to dismiss those unnecessary pulls.
+        //       The reason it is important to add this:
+        //       - it prevents those unnecessary pulls from contending on the pull write lock and causing trouble for indexing, which needs the read lock.
+        //       The reasons it is considered a stop-gap measure:
+        //       - we should not enqueue such pull requests to begin with
+        //       - this isLeader computation is not complete; it does not handle the case where the core is absent
+        //       - this isLeader computation might not be performant or efficient; it is unclear whether caching is involved here.
+        boolean isLeaderPulling = isLeader(task.getCoreContainer(), coreName);
+        if (coreVersionMetadata.isSoftGuaranteeOfEquality() && isLeaderPulling) {
+          // already in sync
+          task.finishedPull(coreVersionMetadata.getBlobCoreMetadata(), CoreSyncStatus.SUCCESS_EQUIVALENT, null);
+          // move to next task
+          continue;
+        }
+
+        ReentrantReadWriteLock corePullLock = concurrencyController.getCorePullLock(info.getCollectionName(), info.getShardName(), info.getCoreName());
+        // Try to acquire the write lock if possible; otherwise we don't want to hold up this background thread
+        // because it can be used for other cores in the meantime.
+        // We avoid the barging overload #tryLock() to let the default fairness scheme play out, although the way
+        // we are using this lock, the barging overload would not disrupt things that much.
+        if (corePullLock.writeLock().tryLock(0, TimeUnit.MILLISECONDS)) {
+          try {
+            // TODO: we should timebox this request in case we are stuck for long time
+            task.pullCoreFromBlob(isLeaderPulling);
+          } finally {
+            corePullLock.writeLock().unlock();
+          }
+        } else {
+          log.info(String.format("Could not acquire pull write lock, going back to task queue, pullCoreInfo=%s", task.getPullCoreInfo().toString()));
+          workQueue.addDeduplicated(task, true);
+        }
       } catch (InterruptedException ie) {
         log.info("Puller thread " + Thread.currentThread().getName()
             + " got interrupted. Shutting down Blob CorePullerFeeder if not already.");
@@ -60,9 +108,42 @@ public class CorePullerThread implements Runnable {
         break;
       } catch (Exception e) {
         // Exceptions other than InterruptedException should not stop the business
-        String taskInfo = task == null ? "" : String.format("Attempt=%s to pull core %s ", task.getAttempts(), task.getPullCoreInfo().getSharedStoreName()) ;
+        String taskInfo = "";
+        if (task != null) {
+          try {
+            taskInfo = String.format("Attempt=%s to pull core %s ", task.getAttempts(), task.getPullCoreInfo().getCoreName());
+            task.finishedPull(null, CoreSyncStatus.FAILURE, Throwables.getStackTraceAsString(e));
+          } catch (Exception fpe) {
+            log.warn("Cleaning up of pull task encountered a failure.", fpe);
+          }
+        }
         log.warn("CorePullerThread encountered a failure. " + taskInfo, e);
       }
     }
   }
+
+  // TODO: This is temporary, see detailed note where it is consumed above.
+  private boolean isLeader(CoreContainer coreContainer, String coreName) throws InterruptedException {
+    if (!coreContainer.isZooKeeperAware()) {
+      // not solr cloud
+      return false;
+    }
+    CoreDescriptor coreDescriptor = coreContainer.getCoreDescriptor(coreName);
+    if (coreDescriptor == null) {
+      // core does not exist
+      return false;
+    }
+    CloudDescriptor cd = coreDescriptor.getCloudDescriptor();
+    if (cd == null || cd.getReplicaType() != Replica.Type.SHARED) {
+      // not a shared replica
+      return false;
+    }
+    ZkController zkController = coreContainer.getZkController();
+    Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(cd.getCollectionName(), cd.getShardId());
+    if (leaderReplica == null || !cd.getCoreNodeName().equals(leaderReplica.getName())) {
+      // not a leader replica
+      return false;
+    }
+    return true;
+  }
 }
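
The puller loop above follows a simple non-blocking pattern: try the per-core pull write lock with a zero timeout and, if indexing holds the read lock or another puller holds the write lock, put the task back on the queue instead of parking the worker thread. A minimal sketch of the pattern, with one lock and a plain queue standing in for the per-core locks and the DeduplicatingList used in the diff:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class PullerLoopSketch {
  private final ReentrantReadWriteLock corePullLock = new ReentrantReadWriteLock();
  private final Queue<Runnable> workQueue = new ArrayDeque<>();

  void processOne(Runnable pullTask) throws InterruptedException {
    // The timed tryLock(0, unit) overload honours the lock's fairness policy, unlike
    // the bare tryLock() barging overload, and returns immediately when the lock is busy.
    if (corePullLock.writeLock().tryLock(0, TimeUnit.MILLISECONDS)) {
      try {
        pullTask.run();            // pull the core from the shared store
      } finally {
        corePullLock.writeLock().unlock();
      }
    } else {
      workQueue.add(pullTask);     // lock is busy: requeue and move on to another core
    }
  }
}
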
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePusher.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePusher.java
index 1b6717d..97f51aa 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CorePusher.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePusher.java
@@ -1,13 +1,19 @@
 package org.apache.solr.store.blob.process;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexNotFoundException;
+import org.apache.lucene.store.Directory;
 import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.DirectoryFactory;
+import org.apache.solr.core.SolrCore;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
-import org.apache.solr.store.blob.client.BlobCoreMetadataBuilder;
 import org.apache.solr.store.blob.client.CoreStorageClient;
 import org.apache.solr.store.blob.metadata.CorePushPull;
 import org.apache.solr.store.blob.metadata.PushPullData;
@@ -16,7 +22,11 @@ import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil;
 import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil.SharedMetadataResolutionResult;
 import org.apache.solr.store.blob.provider.BlobStorageProvider;
 import org.apache.solr.store.blob.util.BlobStoreUtils;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreStage;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreVersionMetadata;
 import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,77 +38,162 @@ import org.slf4j.LoggerFactory;
  * changed locally and needs to be persisted to a shared store (blob store). 
  */
 public class CorePusher {
-  
+
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  
+
   private CoreContainer coreContainer;
-  
+
   public CorePusher(CoreContainer coreContainer) {
     this.coreContainer = coreContainer;
   }
-    
+
   /**
    * Pushes the local core updates to the Blob store and logs whether the push succeeded or failed.
    */
   public void pushCoreToBlob(PushPullData pushPullData) throws Exception {
-    BlobStorageProvider blobProvider = coreContainer.getSharedStoreManager().getBlobStorageProvider(); 
+    BlobStorageProvider blobProvider = coreContainer.getSharedStoreManager().getBlobStorageProvider();
     CoreStorageClient blobClient = blobProvider.getClient();
-    BlobDeleteManager deleteManager = coreContainer.getSharedStoreManager().getBlobDeleteManager(); // TODO, use a real client
-        
-    BlobCoreMetadata blobCoreMetadata = null;
-    log.info("Push to shared store initiating with PushPullData= " + pushPullData.toString());
-    
-    // Read the metadata file from shared store if this isn't the first push of this index shard
+    BlobDeleteManager deleteManager = coreContainer.getSharedStoreManager().getBlobDeleteManager();
     try {
-      if (!pushPullData.getLastReadMetadataSuffix().equals(
-          SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE)) {
-  
-        String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(pushPullData.getLastReadMetadataSuffix());
-        blobCoreMetadata = blobClient.pullCoreMetadata(pushPullData.getSharedStoreName(), blobCoreMetadataName);
-  
-        if (blobCoreMetadata == null) {
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "The shard index " + pushPullData.getSharedStoreName() + 
-              " is missing for shard " + pushPullData.getShardName() + " for collection " + pushPullData.getCollectionName() + 
-              " using metadataSuffix " + pushPullData.getLastReadMetadataSuffix());
+      String shardName = pushPullData.getShardName();
+      String collectionName = pushPullData.getCollectionName();
+      String coreName = pushPullData.getCoreName();
+      SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController();
+      ReentrantLock corePushLock = concurrencyController.getCorePushLock(collectionName, shardName, coreName);
+      String snapshotDirPath = null;
+      // TODO: Timebox the following push logic in case we are stuck for a long time. We would also need to respect any
+      //       time constraints that come with the indexing request since we will be doing wasteful work if the client has already 
+      //       bailed on us. This might be better done as part of a bigger work item where we spike out how to allocate/configure
+      //       time quotas in cooperation with the client.
+      // Acquire the push lock to serialize blob updates so that concurrent updates can't race with each other 
+      // and cause push failures because one of them was able to advance zk to a newer version.
+      //
+      // Only read this if we ever establish starvation on the push lock and want to do something about it:
+      // The first thing we do after acquiring the lock is to check whether the latest commit's generation is equal to what we pushed last.
+      // If equal then we have nothing to push; it's an optimization that saves us from creating a somewhat expensive ServerSideMetadata.
+      // That equality check could also be duplicated here, before acquiring the push lock, but that would just be a simple optimization.
+      // It will not save us from starvation, because that condition being "true" also means that there is no active pusher,
+      // so there is no question of starvation.
+      // One option could be to snapshot the queue of pusher threads we are working on behalf of
+      // before we capture the commit point to push. Once finished pushing, we can dismiss all those threads together.
+      long startTimeMs = System.currentTimeMillis();
+      corePushLock.lock();
+      try {
+        long lockAcquisitionTime = System.currentTimeMillis() - startTimeMs;
+        SolrCore core = coreContainer.getCore(coreName);
+        if (core == null) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Can't find core " + coreName);
+        }
+        try {
+          concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.BlobPushStarted);
+
+          IndexCommit latestCommit = core.getDeletionPolicy().getLatestCommit();
+          if (latestCommit == null) {
+            throw new SolrException(ErrorCode.SERVER_ERROR, "Core " + coreName + " has no available commit point");
+          }
+
+          SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName);
+          if (latestCommit.getGeneration() == coreVersionMetadata.getBlobCoreMetadata().getGeneration()) {
+            // Everything up to latest commit point has already been pushed 
+            // This can happen if another indexing batch comes in and acquires the push lock first and ends up pushing segments 
+            // produced by this indexing batch.
+            // This optimization saves us from creating somewhat expensive ServerSideMetadata.
+            log.info(String.format("Nothing to push, pushLockTime=%s pushPullData=%s", lockAcquisitionTime, pushPullData.toString()));
+            return;
+          }
+
+          log.info("Push to shared store initiating with PushPullData= " + pushPullData.toString());
+          // Resolve the differences between the local shard index data and shard index data on shared store
+          // if there is any
+          ServerSideMetadata localCoreMetadata = new ServerSideMetadata(coreName, coreContainer, /* takeSnapshot */true);
+          snapshotDirPath = localCoreMetadata.getSnapshotDirPath();
+          SharedMetadataResolutionResult resolutionResult = SharedStoreResolutionUtil.resolveMetadata(
+              localCoreMetadata, coreVersionMetadata.getBlobCoreMetadata());
+
+          if (resolutionResult.getFilesToPush().isEmpty()) {
+            log.warn(String.format("Why there is nothing to push even when there is a newer commit point since last push," +
+                " pushLockTime=%s pushPullData=%s", lockAcquisitionTime, pushPullData.toString()));
+            return;
+          }
+
+          String newMetadataSuffix = BlobStoreUtils.generateMetadataSuffix();
+          // begin the push process 
+          CorePushPull pushPull = new CorePushPull(blobClient, deleteManager, pushPullData, resolutionResult, localCoreMetadata, coreVersionMetadata.getBlobCoreMetadata());
+          BlobCoreMetadata blobCoreMetadata = pushPull.pushToBlobStore(coreVersionMetadata.getMetadataSuffix(), newMetadataSuffix);
+          concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.BlobPushed);
+          // at this point we've pushed the new metadata file with the newMetadataSuffix and now need to write to zookeeper
+          SharedShardMetadataController shardSharedMetadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController();
+          SharedShardVersionMetadata newShardVersionMetadata = null;
+          try {
+            newShardVersionMetadata = shardSharedMetadataController.updateMetadataValueWithVersion(pushPullData.getCollectionName(), pushPullData.getShardName(),
+                newMetadataSuffix, coreVersionMetadata.getVersion());
+          } catch (Exception ex) {
+            boolean isVersionMismatch = false;
+            Throwable cause = ex;
+            while (cause != null) {
+              if (cause instanceof BadVersionException) {
+                isVersionMismatch = true;
+                break;
+              }
+              cause = cause.getCause();
+            }
+            if (isVersionMismatch) {
+              // conditional update of zookeeper failed, take away the soft guarantee of equality.
+              // That makes sure that before processing the next indexing batch we sync with zookeeper and pull from the shared store.
+              concurrencyController.updateCoreVersionMetadata(collectionName, shardName, coreName, false);
+            }
+            throw ex;
+          }
+          concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.ZkUpdateFinished);
+
+          assert newMetadataSuffix.equals(newShardVersionMetadata.getMetadataSuffix());
+          // after successful update to zookeeper, update core version metadata with new version info
+          // and we can also give a soft guarantee that the core is up to date w.r.t. the shared store, unless failures happen or leadership changes 
+          concurrencyController.updateCoreVersionMetadata(collectionName, shardName, coreName, newShardVersionMetadata, blobCoreMetadata, /* softGuaranteeOfEquality */ true);
+          concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.LocalCacheUpdateFinished);
+          log.info(String.format("Successfully pushed to shared store, pushLockTime=%s pushPullData=%s", lockAcquisitionTime, pushPullData.toString()));
+        } finally {
+          try {
+            concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.BlobPushFinished);
+            if (snapshotDirPath != null) {
+              // we are done with push we can now remove the snapshot directory
+              removeSnapshotDirectory(core, snapshotDirPath);
+            }
+          } finally {
+            core.close();
+          }
         }
-      } else {
-        blobCoreMetadata = BlobCoreMetadataBuilder.buildEmptyCoreMetadata(pushPullData.getSharedStoreName());
-        log.info("This is the first time that shard " + pushPullData.getShardName() + " for collection " + 
-            pushPullData.getCollectionName() + " is getting pushed to blob store.");
+      } finally {
+        corePushLock.unlock();
       }
-      
-      // Resolve the differences between the local shard index data and shard index data on shared store
-      // if there is any
-      ServerSideMetadata localShardMetadata = new ServerSideMetadata(pushPullData.getCoreName(), coreContainer);
-      SharedMetadataResolutionResult resolutionResult = SharedStoreResolutionUtil.resolveMetadata(
-          localShardMetadata, blobCoreMetadata);
-      
-      // begin the push process 
-      CorePushPull pushPull = new CorePushPull(blobClient, deleteManager, pushPullData, resolutionResult, localShardMetadata, blobCoreMetadata);
-      pushPull.pushToBlobStore();
-      
-      // at this point we've pushed the new metadata file with the newMetadataSuffix and now need to write to zookeeper
-      SharedShardMetadataController shardSharedMetadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController(); 
-      shardSharedMetadataController.updateMetadataValueWithVersion(pushPullData.getCollectionName(), pushPullData.getShardName(),
-          pushPullData.getNewMetadataSuffix(), pushPullData.getZkVersion());
-      log.info("Successfully pushed to shared store");
-      
       // TODO - make error handling a little nicer?
     } catch (InterruptedException e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "CorePusher was interrupted while pushing to blob store", e);
     } catch (IndexNotFoundException infe) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "CorePusher failed because the core " + pushPullData.getCoreName() +
-        " for the shard " + pushPullData.getShardName() + " was not found", infe);
+          " for the shard " + pushPullData.getShardName() + " was not found", infe);
     } catch (SolrException e) {
       Throwable t = e.getCause();
       if (t instanceof BadVersionException) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "CorePusher failed to push because the node "
-            + "version doesn't match. This shard is no longer the leader.", t); 
+            + "version doesn't match.", t);
       }
       throw e;
     } catch (Exception e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "CorePusher failed to push shard index for " 
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "CorePusher failed to push shard index for "
           + pushPullData.getShardName() + " due to unexpected exception", e);
     }
   }
+
+  private void removeSnapshotDirectory(SolrCore core, String snapshotDirPath) throws IOException {
+    Directory snapshotDir = core.getDirectoryFactory().get(snapshotDirPath, DirectoryFactory.DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
+    try {
+      core.getDirectoryFactory().doneWithDirectory(snapshotDir);
+      core.getDirectoryFactory().remove(snapshotDir);
+    } catch (Exception e) {
+      log.warn("Cannot remove snapshot directory " + snapshotDirPath, e);
+    } finally {
+      core.getDirectoryFactory().release(snapshotDir);
+    }
+  }
 }
\ No newline at end of file
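
The central concurrency change on the push side is the conditional (versioned) ZooKeeper update: if the update fails because the expected version is stale, the pusher withdraws the cached soft guarantee of equality so the next indexing batch syncs with ZooKeeper and the shared store before proceeding. A minimal, self-contained sketch of that conflict handling follows; the small interfaces and the exception type are illustrative stand-ins, not the actual Solr classes.

class PushConflictSketch {
  static class BadVersionException extends Exception {}

  interface VersionStore {
    // throws an exception whose cause chain contains BadVersionException when expectedVersion is stale
    void updateWithVersion(String newSuffix, int expectedVersion) throws Exception;
  }

  interface VersionCache {
    void setSoftGuaranteeOfEquality(boolean value);
  }

  static void pushMetadata(VersionStore zk, VersionCache cache,
                           String newSuffix, int expectedVersion) throws Exception {
    try {
      zk.updateWithVersion(newSuffix, expectedVersion);
    } catch (Exception ex) {
      // walk the cause chain: the version conflict may be wrapped in other exceptions
      for (Throwable cause = ex; cause != null; cause = cause.getCause()) {
        if (cause instanceof BadVersionException) {
          // our cached view of ZooKeeper can no longer be trusted
          cache.setSoftGuaranteeOfEquality(false);
          break;
        }
      }
      throw ex;
    }
    // on success the caller records the new version locally and re-grants the soft guarantee
  }
}
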
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncStatus.java b/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncStatus.java
index 960c726..4405e3a 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncStatus.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncStatus.java
@@ -50,8 +50,6 @@ public enum CoreSyncStatus {
   BLOB_DELETED_FOR_PULL(false, false),
   /** Core was not pushed because Blob version more up to date */
   BLOB_FRESHER(true, false),
-  /** No attempt to push/pull the core was made because another task was working on it */
-  CONCURRENT_SYNC(false, true),
   /** No attempt to push/pull the core was made as system was shutting down */
   SHUTTING_DOWN(false, false),
   /** The task was merged with another duplicate task in the queue (deduplicated) */
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CoreUpdateTracker.java b/solr/core/src/java/org/apache/solr/store/blob/process/CoreUpdateTracker.java
index 8b8105b..8ef502e 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CoreUpdateTracker.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CoreUpdateTracker.java
@@ -1,18 +1,14 @@
 package org.apache.solr.store.blob.process;
 
 import java.lang.invoke.MethodHandles;
-import java.util.Map;
 
-import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.store.blob.metadata.PushPullData;
-import org.apache.solr.store.blob.util.BlobStoreUtils;
 import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,19 +58,6 @@ public class CoreUpdateTracker {
           log.warn("Performing a push for shard " + shardName + " that is inactive!");
         }
         log.info("Initiating push for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
-        // creates the metadata node if it doesn't exist
-        shardSharedMetadataController.ensureMetadataNodeExists(collectionName, shardName);
-
-        /*
-         * Get the metadataSuffix value from ZooKeeper or from a cache if an entry exists for the 
-         * given collection and shardName. If the leader has already changed, the conditional update
-         * later will fail and invalidate the cache entry if it exists. 
-         */
-        VersionedData data = shardSharedMetadataController.readMetadataValue(collectionName, shardName, 
-            /* readFromCache */ true);
-
-        Map<String, String> nodeUserData = (Map<String, String>) Utils.fromJSON(data.getData());
-        String metadataSuffix = nodeUserData.get(SharedShardMetadataController.SUFFIX_NODE_NAME);
 
         String sharedShardName = (String) shard.get(ZkStateReader.SHARED_SHARD_NAME);
 
@@ -83,9 +66,6 @@ public class CoreUpdateTracker {
             .setShardName(shardName)
             .setCoreName(coreName)
             .setSharedStoreName(sharedShardName)
-            .setLastReadMetadataSuffix(metadataSuffix)
-            .setNewMetadataSuffix(BlobStoreUtils.generateMetadataSuffix())
-            .setZkVersion(data.getVersion())
             .build();
         CorePusher pusher = new CorePusher(coreContainer);
         pusher.pushCoreToBlob(pushPullData);
diff --git a/solr/core/src/java/org/apache/solr/store/blob/util/BlobStoreUtils.java b/solr/core/src/java/org/apache/solr/store/blob/util/BlobStoreUtils.java
index 4a428c2..86407c8 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/util/BlobStoreUtils.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/util/BlobStoreUtils.java
@@ -1,17 +1,15 @@
 package org.apache.solr.store.blob.util;
 import java.lang.invoke.MethodHandles;
-import java.util.Map;
 import java.util.UUID;
 
-import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
+import org.apache.solr.store.blob.client.BlobCoreMetadataBuilder;
 import org.apache.solr.store.blob.client.CoreStorageClient;
 import org.apache.solr.store.blob.metadata.CorePushPull;
 import org.apache.solr.store.blob.metadata.PushPullData;
@@ -19,7 +17,10 @@ import org.apache.solr.store.blob.metadata.ServerSideMetadata;
 import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil;
 import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil.SharedMetadataResolutionResult;
 import org.apache.solr.store.blob.process.BlobDeleteManager;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreStage;
 import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,76 +55,77 @@ public class BlobStoreUtils {
    * local content.
    * @throws SolrException if the local core was not successfully sync'd.
    */
-  public static void syncLocalCoreWithSharedStore(String collectionName, String coreName, String shardName, CoreContainer coreContainer) throws SolrException {
+  public static void syncLocalCoreWithSharedStore(String collectionName, String coreName, String shardName, CoreContainer coreContainer,
+                                                  SharedShardVersionMetadata shardVersionMetadata, boolean isLeaderPulling) throws SolrException {
     assert coreContainer.isZooKeeperAware();
 
     ZkController zkController = coreContainer.getZkController();
-    SharedShardMetadataController sharedMetadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController();
     DocCollection collection = zkController.getClusterState().getCollection(collectionName);
     CoreStorageClient blobClient = coreContainer.getSharedStoreManager().getBlobStorageProvider().getClient();
-    log.info("sync intialized for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
-    
+    log.info("sync initialized for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
+
     Slice shard = collection.getSlicesMap().get(shardName);
     if (shard != null) {
       try {
-        sharedMetadataController.ensureMetadataNodeExists(collectionName, shardName);
-        String sharedStoreName = (String)shard.get(ZkStateReader.SHARED_SHARD_NAME);
-        // Fetch the latest metadata from ZK.
-        // TODO: this can be optimized, depends on correct handling of leadership change.
-        VersionedData data = sharedMetadataController.readMetadataValue(collectionName, shardName, false);
-
-        Map<String, String> nodeUserData = (Map<String, String>) Utils.fromJSON(data.getData());
-        String metadataSuffix = nodeUserData.get(SharedShardMetadataController.SUFFIX_NODE_NAME);
-        if (SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE.equals(metadataSuffix)) {
+        String sharedStoreName = (String) shard.get(ZkStateReader.SHARED_SHARD_NAME);
+        SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController();
+        if (SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE.equals(shardVersionMetadata.getMetadataSuffix())) {
+          //no-op pull
+          BlobCoreMetadata emptyBlobCoreMetadata = BlobCoreMetadataBuilder.buildEmptyCoreMetadata(sharedStoreName);
+          concurrencyController.updateCoreVersionMetadata(collectionName, shardName, coreName, shardVersionMetadata, emptyBlobCoreMetadata, isLeaderPulling);
           log.info("sync successful, nothing to pull, collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
-          return ;
+          return;
         }
-        // Get blob metadata
-        String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(metadataSuffix);
-        BlobCoreMetadata blobstoreMetadata = blobClient.pullCoreMetadata(sharedStoreName, blobCoreMetadataName);
-        if (null == blobstoreMetadata) {
-          // Zookepeer and blob are out of sync, could be due to eventual consistency model in blob or something else went wrong.
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-              "cannot get core.metadata file from shared store, blobCoreMetadataName=" + blobCoreMetadataName +
-              " shard=" + shardName +
-              " collectionName=" + collectionName +
-              " sharedStoreName=" + sharedStoreName );
-        } else if (blobstoreMetadata.getIsDeleted()) {
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-              "core.metadata file is marked deleted in shared store, blobCoreMetadataName=" + blobCoreMetadataName +
-              " shard=" + shardName +
-              " collectionName=" + collectionName +
-              " sharedStoreName=" + sharedStoreName );
-        } else if (blobstoreMetadata.getIsCorrupt()) {
-          log.warn("core.Metadata file is marked corrpt, skipping sync, collection=" + collectionName + 
-              " shard=" + shardName + " coreName=" + coreName + " sharedStoreName=" + sharedStoreName );
-          return ;
-        }
-
-        // Get local metadata + resolve with blob metadata
-        ServerSideMetadata serverMetadata = new ServerSideMetadata(coreName, coreContainer);
-        SharedMetadataResolutionResult resolutionResult = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobstoreMetadata);
-        PushPullData pushPullData = new PushPullData.Builder()
-            .setCollectionName(collectionName)
-            .setShardName(shardName)
-            .setCoreName(coreName)
-            .setSharedStoreName(sharedStoreName)
-            .setLastReadMetadataSuffix(metadataSuffix)
-            .setNewMetadataSuffix(BlobStoreUtils.generateMetadataSuffix())
-            .setZkVersion(data.getVersion())
-            .build();
+        concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.BlobPullStarted);
+        try {
+          // Get blob metadata
+          String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(shardVersionMetadata.getMetadataSuffix());
+          BlobCoreMetadata blobstoreMetadata = blobClient.pullCoreMetadata(sharedStoreName, blobCoreMetadataName);
+          if (null == blobstoreMetadata) {
+            // ZooKeeper and blob are out of sync, could be due to the eventual consistency model in blob or something else went wrong.
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                "cannot get core.metadata file from shared store, blobCoreMetadataName=" + blobCoreMetadataName +
+                    " shard=" + shardName +
+                    " collectionName=" + collectionName +
+                    " sharedStoreName=" + sharedStoreName);
+          } else if (blobstoreMetadata.getIsDeleted()) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                "core.metadata file is marked deleted in shared store, blobCoreMetadataName=" + blobCoreMetadataName +
+                    " shard=" + shardName +
+                    " collectionName=" + collectionName +
+                    " sharedStoreName=" + sharedStoreName);
+          } else if (blobstoreMetadata.getIsCorrupt()) {
+            log.warn("core.Metadata file is marked corrpt, skipping sync, collection=" + collectionName +
+                " shard=" + shardName + " coreName=" + coreName + " sharedStoreName=" + sharedStoreName);
+            return;
+          }
 
-        if (resolutionResult.getFilesToPull().size() > 0) {
+          // Get local metadata + resolve with blob metadata
+          ServerSideMetadata serverMetadata = new ServerSideMetadata(coreName, coreContainer);
+          SharedMetadataResolutionResult resolutionResult = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobstoreMetadata);
+          PushPullData pushPullData = new PushPullData.Builder()
+              .setCollectionName(collectionName)
+              .setShardName(shardName)
+              .setCoreName(coreName)
+              .setSharedStoreName(sharedStoreName)
+              .build();
 
-          BlobDeleteManager deleteManager = coreContainer.getSharedStoreManager().getBlobDeleteManager();
-          CorePushPull cp = new CorePushPull(blobClient, deleteManager, pushPullData, resolutionResult, serverMetadata, blobstoreMetadata);
-          cp.pullUpdateFromBlob(/* waitForSearcher */ true);
-        } else {
-          log.info("sync successful, nothing to pull for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
+          if (resolutionResult.getFilesToPull().size() > 0) {
+            BlobDeleteManager deleteManager = coreContainer.getSharedStoreManager().getBlobDeleteManager();
+            CorePushPull cp = new CorePushPull(blobClient, deleteManager, pushPullData, resolutionResult, serverMetadata, blobstoreMetadata);
+            cp.pullUpdateFromBlob(/* waitForSearcher */ true);
+            concurrencyController.updateCoreVersionMetadata(pushPullData.getCollectionName(), pushPullData.getShardName(), pushPullData.getCoreName(),
+                shardVersionMetadata, blobstoreMetadata, isLeaderPulling);
+          } else {
+            log.warn(String.format("Why there are no files to pull even when we do not match with the version in zk? collection=%s shard=%s core=%s",
+                collectionName, shardName, coreName));
+          }
+        } finally {
+          concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.BlobPullFinished);
         }
       } catch (Exception ex) {
         // wrap every thrown exception in a solr exception
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error occured pulling shard=" + shardName + " collection=" + collectionName + " from shared store "+ ex);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error occured pulling shard=" + shardName + " collection=" + collectionName + " from shared store " + ex);
       }
     } else {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Sync requested for unknown shard=" + shardName + " in collection=" + collectionName);
diff --git a/solr/core/src/java/org/apache/solr/store/shared/SharedCoreConcurrencyController.java b/solr/core/src/java/org/apache/solr/store/shared/SharedCoreConcurrencyController.java
new file mode 100644
index 0000000..9a9d52e
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/store/shared/SharedCoreConcurrencyController.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.store.shared;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.StringUtils;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.store.blob.client.BlobCoreMetadata;
+import org.apache.solr.store.blob.metadata.PushPullData;
+import org.apache.solr.store.blob.process.CorePullTask;
+import org.apache.solr.store.blob.process.CorePullerThread;
+import org.apache.solr.store.blob.process.CorePusher;
+import org.apache.solr.store.blob.util.BlobStoreUtils;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class helps coordinate synchronization of concurrent indexing, pushes and pulls
+ * happening on a core of a shared collection {@link DocCollection#getSharedIndex()}
+ */
+public class SharedCoreConcurrencyController {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  /**
+   * Time in seconds an indexing thread waits when trying to acquire the pull write lock before checking whether someone else has already done the pull.
+   */
+  public static int SECONDS_TO_WAIT_INDEXING_PULL_WRITE_LOCK = 5;
+  /**
+   * Max attempts by an indexing thread to acquire the pull write lock before bailing out. Ideally the bail-out scenario should never happen;
+   * if it does, either we are too slow in pulling (and can tune this value) or something else is wrong.
+   */
+  public static int MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK = 10;
+
+  private final CoreContainer cores;
+  /**
+   * This cache maintains the shared store version that each core is at or ahead of (a core sometimes has to be ahead of
+   * the shared store since indexing first happens locally before being propagated to the shared store).
+   * todo: need to add eviction strategy.
+   */
+  private final ConcurrentHashMap<String, SharedCoreVersionMetadata> coresVersionMetadata;
+
+  public SharedCoreConcurrencyController(CoreContainer cores) {
+    this.cores = cores;
+    coresVersionMetadata = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns true if {@link SharedCoreVersionMetadata} and {@link SharedShardVersionMetadata} represent the same version; otherwise false
+   */
+  public boolean areVersionsEqual(SharedCoreVersionMetadata coreVersionMetadata, SharedShardVersionMetadata shardVersionMetadata) {
+    boolean isVersionNumberSame = coreVersionMetadata.getVersion() == shardVersionMetadata.getVersion();
+    boolean isMetadataSuffixSame = StringUtils.equals(coreVersionMetadata.getMetadataSuffix(), shardVersionMetadata.getMetadataSuffix());
+
+    if (isVersionNumberSame && isMetadataSuffixSame) {
+      return true;
+    }
+    if (isVersionNumberSame || isMetadataSuffixSame) {
+      log.warn(String.format("Why only one of version number and metadata suffix matches?" +
+              " coreVersionNumber=%s shardVersionNumber=%s" +
+              " coreMetadataSuffix=%s shardMetadataSuffix=%s",
+          coreVersionMetadata.getVersion(), shardVersionMetadata.getVersion(),
+          coreVersionMetadata.getMetadataSuffix(), shardVersionMetadata.getMetadataSuffix()));
+    }
+    return false;
+  }
+
+  /**
+   * Logs the current {@link SharedCoreStage} a core is at.
+   */
+  public void recordState(String collectionName, String shardName, String coreName, SharedCoreStage stage) {
+    log.info(String.format("RecordSharedCoreStage: collection=%s shard=%s core=%s stage=%s", collectionName, shardName, coreName, stage));
+  }
+
+  /**
+   * Returns a {@link ReentrantReadWriteLock} corresponding to the core. It protects pulls from each other and indexing from pulls.
+   * A write lock is required whenever pulling contents into a core from shared store.
+   * A read lock is required for the whole duration of indexing on the core (including the push to the shared store {@link CorePusher#pushCoreToBlob(PushPullData)}).
+   */
+  public ReentrantReadWriteLock getCorePullLock(String collectionName, String shardName, String coreName) {
+    SharedCoreVersionMetadata coreVersionMetadata = getOrCreateCoreVersionMetadata(collectionName, shardName, coreName);
+    return coreVersionMetadata.getCorePullLock();
+  }
+
+  /**
+   * Returns a {@link ReentrantLock} corresponding to the core. It protects shared store pushes from each other.
+   * This lock is required for pushing the core to shared store {@link CorePusher#pushCoreToBlob(PushPullData)}.
+   */
+  public ReentrantLock getCorePushLock(String collectionName, String shardName, String coreName) {
+    SharedCoreVersionMetadata coreVersionMetadata = getOrCreateCoreVersionMetadata(collectionName, shardName, coreName);
+    return coreVersionMetadata.getCorePushLock();
+  }
+
+  /**
+   * Returns {@link SharedCoreVersionMetadata} representing the shared store version the core is at or ahead of (a core sometimes
+   * has to be ahead of the shared store since indexing first happens locally before being propagated to the shared store).
+   */
+  public SharedCoreVersionMetadata getCoreVersionMetadata(String collectionName, String shardName, String coreName) {
+    return getOrCreateCoreVersionMetadata(collectionName, shardName, coreName);
+  }
+
+  /**
+   * Updates {@link SharedCoreVersionMetadata} for the core with passed in
+   * {@link SharedShardVersionMetadata} and {@link BlobCoreMetadata}
+   */
+  public void updateCoreVersionMetadata(String collectionName, String shardName, String coreName,
+                                        SharedShardVersionMetadata shardVersionMetadata, BlobCoreMetadata blobCoreMetadata) {
+    updateCoreVersionMetadata(collectionName, shardName, coreName, shardVersionMetadata, blobCoreMetadata, false);
+  }
+
+  /**
+   * Updates {@link SharedCoreVersionMetadata} for the core with passed in
+   * {@link SharedShardVersionMetadata}, {@link BlobCoreMetadata} and {@code softGuaranteeOfEquality}
+   */
+  public void updateCoreVersionMetadata(String collectionName, String shardName, String coreName,
+                                        SharedShardVersionMetadata shardVersionMetadata, BlobCoreMetadata blobCoreMetadata,
+                                        boolean softGuaranteeOfEquality) {
+    SharedCoreVersionMetadata currentMetadata = getOrCreateCoreVersionMetadata(collectionName, shardName, coreName);
+    SharedCoreVersionMetadata updatedMetadata = currentMetadata.updatedOf(shardVersionMetadata.getVersion(), shardVersionMetadata.getMetadataSuffix(),
+        blobCoreMetadata, softGuaranteeOfEquality);
+    updateCoreVersionMetadata(collectionName, shardName, coreName, currentMetadata, updatedMetadata);
+  }
+
+  /**
+   * Updates {@link SharedCoreVersionMetadata} for the core with passed in
+   * {@code softGuaranteeOfEquality}
+   */
+  public void updateCoreVersionMetadata(String collectionName, String shardName, String coreName, boolean softGuaranteeOfEquality) {
+    SharedCoreVersionMetadata currentMetadata = getOrCreateCoreVersionMetadata(collectionName, shardName, coreName);
+    SharedCoreVersionMetadata updatedMetadata = currentMetadata.updatedOf(softGuaranteeOfEquality);
+    updateCoreVersionMetadata(collectionName, shardName, coreName, currentMetadata, updatedMetadata);
+  }
+
+  private void updateCoreVersionMetadata(String collectionName, String shardName, String coreName, SharedCoreVersionMetadata currentMetadata, SharedCoreVersionMetadata updatedMetadata) {
+    log.info(String.format("updateCoreVersionMetadata: collection=%s shard=%s core=%s  current={%s} updated={%s}",
+        collectionName, shardName, coreName, currentMetadata.toString(), updatedMetadata.toString()));
+    coresVersionMetadata.put(coreName, updatedMetadata);
+  }
+
+  private SharedCoreVersionMetadata getOrCreateCoreVersionMetadata(String collectionName, String shardName, String coreName) {
+    SharedCoreVersionMetadata coreVersionMetadata = coresVersionMetadata.get(coreName);
+    if (coreVersionMetadata != null) {
+      // already present
+      return coreVersionMetadata;
+    }
+    return initializeCoreVersionMetadata(collectionName, shardName, coreName);
+  }
+
+  private SharedCoreVersionMetadata initializeCoreVersionMetadata(String collectionName, String shardName, String coreName) {
+    // computeIfAbsent to ensure we only do single initialization
+    return coresVersionMetadata.computeIfAbsent(coreName, k -> {
+      ensureShardVersionMetadataNodeExists(collectionName, shardName);
+      // a value that will never match a real zk node version
+      int version = -1;
+      String metadataSuffix = null;
+      BlobCoreMetadata blobCoreMetadata = null;
+      boolean softGuaranteeOfEquality = false;
+      /** Should only be created once at initialization time; subsequent updates should reuse the same lock instance,
+       *  see {@link SharedCoreVersionMetadata#updatedOf(int, String, BlobCoreMetadata, boolean)} and the other overload.
+       *
+       *  We don't need a fair ordering policy for this lock; fair locks normally have lower throughput.
+       *  At query time we rely on softGuaranteeOfEquality and isLeader so that queries do not contend
+       *  on this lock and steady state indexing can do its job without contention. See {@link CorePullerThread#run()} for details.
+       *
+       *  On the indexing side we rely on the read lock and can have multiple readers just fine. The write lock is only needed
+       *  in a failover scenario (leader changed) where we need to pull from the shared store, and only one thread needs to do that.
+       *  Therefore we acquire the write lock with a timeout and re-check the condition after the timeout, so there is no
+       *  concern of starvation there either.
+       *  */
+      ReentrantReadWriteLock corePullLock = new ReentrantReadWriteLock();
+      /** Should only be created once at initialization time; subsequent updates should reuse the same lock instance,
+       *  see {@link SharedCoreVersionMetadata#updatedOf(int, String, BlobCoreMetadata, boolean)} and the other overload.
+       *
+       *  We don't need a fair ordering policy for this lock; fair locks normally have lower throughput.
+       *  See {@link CorePusher#pushCoreToBlob(PushPullData)} for details */
+      ReentrantLock corePushLock = new ReentrantLock();
+      return new SharedCoreVersionMetadata(version, metadataSuffix, blobCoreMetadata, softGuaranteeOfEquality, corePullLock, corePushLock);
+    });
+  }
+
+  @VisibleForTesting
+  protected void ensureShardVersionMetadataNodeExists(String collectionName, String shardName) {
+    SharedShardMetadataController metadataController = cores.getSharedStoreManager().getSharedShardMetadataController();
+    try {
+      // creates the metadata node if it doesn't exist
+      metadataController.ensureMetadataNodeExists(collectionName, shardName);
+    } catch (IOException ioe) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          String.format("Unable to ensure metadata for collection=%s shard=%s", collectionName, shardName), ioe);
+    }
+  }
+
+  /**
+   * This represents metadata that needs to be cached for a core of a shared collection {@link DocCollection#getSharedIndex()}
+   * so that it can be properly synchronized for concurrent indexing, pushes and pulls.
+   * <p>
+   * The locks only need to be initialized once and could potentially be kept in a separate structure from the version information,
+   * but that would cost another map in {@link SharedCoreConcurrencyController}. To avoid another map we keep the locks here
+   * and make sure they are not re-initialized.
+   */
+  public static class SharedCoreVersionMetadata {
+    /**
+     * Value originating from a ZooKeeper node, used to conditionally and safely update the
+     * core.metadata file written to the shared store.
+     */
+    private final int version;
+    /**
+     * Unique value representing the state of the shared store with which the core is at least in sync.
+     */
+    private final String metadataSuffix;
+    /**
+     * {@link BlobCoreMetadata} representing the state corresponding to {@link #metadataSuffix}
+     */
+    private final BlobCoreMetadata blobCoreMetadata;
+    /**
+     * Whether there is a soft guarantee of being in sync with {@link SharedShardVersionMetadata} of the shard.
+     * In steady state this guarantee is provided for leader cores when they push
+     * {@link CorePusher#pushCoreToBlob(PushPullData)}
+     * and pull
+     * {@link BlobStoreUtils#syncLocalCoreWithSharedStore(String, String, String, CoreContainer, SharedShardVersionMetadata, boolean)}
+     * {@link CorePullTask#pullCoreFromBlob()}
+     * since followers cannot index. In the presence of this guarantee we can skip consulting zookeeper before processing an indexing batch.
+     */
+    private final boolean softGuaranteeOfEquality;
+    /**
+     * See comments on {@link #getCorePullLock(String, String, String)}
+     */
+    private final ReentrantReadWriteLock corePullLock;
+    /**
+     * See comments on {@link #getCorePushLock(String, String, String)}
+     */
+    private final ReentrantLock corePushLock;
+
+    private SharedCoreVersionMetadata(int version, String metadataSuffix, BlobCoreMetadata blobCoreMetadata,
+                                      boolean softGuaranteeOfEquality, ReentrantReadWriteLock corePullLock, ReentrantLock corePushLock) {
+      this.version = version;
+      this.metadataSuffix = metadataSuffix;
+      this.blobCoreMetadata = blobCoreMetadata;
+      this.softGuaranteeOfEquality = softGuaranteeOfEquality;
+      this.corePullLock = corePullLock;
+      this.corePushLock = corePushLock;
+    }
+
+    public int getVersion() {
+      return version;
+    }
+
+    public BlobCoreMetadata getBlobCoreMetadata() {
+      return blobCoreMetadata;
+    }
+
+    public String getMetadataSuffix() {
+      return metadataSuffix;
+    }
+
+    public boolean isSoftGuaranteeOfEquality() {
+      return softGuaranteeOfEquality;
+    }
+
+    private ReentrantReadWriteLock getCorePullLock() {
+      return corePullLock;
+    }
+
+    private ReentrantLock getCorePushLock() {
+      return corePushLock;
+    }
+
+    private SharedCoreVersionMetadata updatedOf(int version, String metadataSuffix, BlobCoreMetadata blobCoreMetadata, boolean softGuaranteeOfEquality) {
+      return new SharedCoreVersionMetadata(version, metadataSuffix, blobCoreMetadata, softGuaranteeOfEquality, corePullLock, corePushLock);
+    }
+
+    private SharedCoreVersionMetadata updatedOf(boolean softGuaranteeOfEquality) {
+      return new SharedCoreVersionMetadata(version, metadataSuffix, blobCoreMetadata, softGuaranteeOfEquality, corePullLock, corePushLock);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("version=%s  metadataSuffix=%s softGuaranteeOfEquality=%s", version, metadataSuffix, softGuaranteeOfEquality);
+    }
+  }
+
+  /**
+   * Various stages a core of a shared collection {@link DocCollection#getSharedIndex()} might go through during indexing and querying.
+   */
+  public enum SharedCoreStage {
+    /**
+     * Necessary locks have been acquired and we have started to pull from the shared store.
+     */
+    BlobPullStarted,
+    /**
+     * Pull(either successful or failed) has ended and we are about to release the necessary locks.
+     */
+    BlobPullFinished,
+    /**
+     * We have received an indexing batch but necessary locks have not been acquired yet.
+     */
+    IndexingBatchReceived,
+    /**
+     * We are past the shared store pull stage (if applicable) and are in sync with the shared store.
+     * Now we will proceed with local indexing.
+     */
+    LocalIndexingStarted,
+    /**
+     * Local indexing has finished but has not yet been pushed to the shared store.
+     */
+    LocalIndexingFinished,
+    /**
+     * Necessary locks have been acquired and push to shared store has started.
+     */
+    BlobPushStarted,
+    /**
+     * Files have been pushed to blob.
+     */
+    BlobPushed,
+    /**
+     * Zookeeper has been successfully updated with new metadata.
+     */
+    ZkUpdateFinished,
+    /**
+     * Local cache {@link #coresVersionMetadata} has been successfully updated with new metadata.
+     */
+    LocalCacheUpdateFinished,
+    /**
+     * Push(either successful or failed) has ended and we are about to release the necessary locks.
+     */
+    BlobPushFinished
+  }
+}
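
As a rough illustration of the locking discipline documented above (the pull read lock is held for the whole indexing-plus-push duration, while the push lock serializes pushes to the shared store), the following hedged sketch shows how a caller would typically drive the two locks and the stage recording. SharedCoreLockUsageSketch and indexAndPush are made-up names; the indexing and push bodies are elided placeholders.

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    import org.apache.solr.core.CoreContainer;
    import org.apache.solr.store.shared.SharedCoreConcurrencyController;
    import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreStage;

    public class SharedCoreLockUsageSketch {
      // hypothetical caller, for illustration only
      void indexAndPush(CoreContainer cores, String collection, String shard, String core) {
        SharedCoreConcurrencyController concurrency =
            cores.getSharedStoreManager().getSharedCoreConcurrencyController();

        // indexing holds the pull READ lock so a concurrent pull (which takes the write lock)
        // cannot swap index files underneath it
        ReentrantReadWriteLock pullLock = concurrency.getCorePullLock(collection, shard, core);
        pullLock.readLock().lock();
        try {
          concurrency.recordState(collection, shard, core, SharedCoreStage.LocalIndexingStarted);
          // ... local indexing happens here ...
          concurrency.recordState(collection, shard, core, SharedCoreStage.LocalIndexingFinished);

          // pushes to the shared store are serialized among themselves with the separate push lock
          ReentrantLock pushLock = concurrency.getCorePushLock(collection, shard, core);
          pushLock.lock();
          try {
            concurrency.recordState(collection, shard, core, SharedCoreStage.BlobPushStarted);
            // ... push files and conditionally update ZooKeeper here ...
          } finally {
            concurrency.recordState(collection, shard, core, SharedCoreStage.BlobPushFinished);
            pushLock.unlock();
          }
        } finally {
          pullLock.readLock().unlock();
        }
      }
    }
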
diff --git a/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java b/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
index 8a17f2b..16a66b7 100644
--- a/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
+++ b/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.store.shared;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.store.blob.metadata.BlobCoreSyncer;
 import org.apache.solr.store.blob.process.BlobDeleteManager;
@@ -24,8 +25,6 @@ import org.apache.solr.store.blob.process.CorePullTracker;
 import org.apache.solr.store.blob.provider.BlobStorageProvider;
 import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Provides access to Shared Store processes. Note that this class is meant to be 
  * more generic in the future and provide a cleaner API but for now we'll expose
@@ -40,12 +39,14 @@ public class SharedStoreManager {
   private BlobProcessUtil blobProcessUtil;
   private CorePullTracker corePullTracker;
   private BlobCoreSyncer blobCoreSyncer;
-  
+  private SharedCoreConcurrencyController sharedCoreConcurrencyController;
+
   public SharedStoreManager(ZkController controller) {
     zkController = controller;
     // initialize BlobProcessUtil with the SharedStoreManager for background processes to be ready
     blobProcessUtil = new BlobProcessUtil(zkController.getCoreContainer());
     blobCoreSyncer = new BlobCoreSyncer();
+    sharedCoreConcurrencyController = new SharedCoreConcurrencyController(zkController.getCoreContainer());
   }
   
   @VisibleForTesting
@@ -113,4 +114,14 @@ public class SharedStoreManager {
   public BlobCoreSyncer getBlobCoreSyncer() {
     return blobCoreSyncer;
   }
+
+  public SharedCoreConcurrencyController getSharedCoreConcurrencyController() {
+    return sharedCoreConcurrencyController;
+  }
+
+  @VisibleForTesting
+  public void initConcurrencyController(SharedCoreConcurrencyController concurrencyController) {
+    this.sharedCoreConcurrencyController = concurrencyController;
+  }
+
 }
\ No newline at end of file
diff --git a/solr/core/src/java/org/apache/solr/store/shared/metadata/SharedShardMetadataController.java b/solr/core/src/java/org/apache/solr/store/shared/metadata/SharedShardMetadataController.java
index 9629e54..0b48a52 100644
--- a/solr/core/src/java/org/apache/solr/store/shared/metadata/SharedShardMetadataController.java
+++ b/solr/core/src/java/org/apache/solr/store/shared/metadata/SharedShardMetadataController.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
@@ -35,8 +34,6 @@ import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Class that manages metadata for shared index-based collections in Solr Cloud and 
  * ZooKeeper.
@@ -48,23 +45,10 @@ public class SharedShardMetadataController {
   
   private SolrCloudManager cloudManager;
   private DistribStateManager stateManager;
-  /* 
-   * Naive in-memory cache without any cache eviction logic used to cache zk version values.
-   * TODO - convert to a more memory efficient and intelligent caching strategy. 
-   */ 
-  private ConcurrentHashMap<String, VersionedData> cache;
-  
+
   public SharedShardMetadataController(SolrCloudManager cloudManager) {
     this.cloudManager = cloudManager;
     this.stateManager = cloudManager.getDistribStateManager();
-    cache = new ConcurrentHashMap<>();
-  }
-  
-  @VisibleForTesting
-  public SharedShardMetadataController(SolrCloudManager cloudManager, ConcurrentHashMap<String, VersionedData> cache) {
-    this.cloudManager = cloudManager;
-    this.stateManager = cloudManager.getDistribStateManager();
-    this.cache = cache;
   }
   
   /**
@@ -90,10 +74,8 @@ public class SharedShardMetadataController {
   }
 
   /**
-   * If the update is successful, the VersionedData will contain the new version as well as the 
-   * value of the data just written. Successful updates will cache the new VersionedData while 
-   * unsuccesful ones will invalidate any existing entries for the corresponding collectionName,
-   * shardName combination.
+   * If the update is successful, the returned {@link SharedShardVersionMetadata} will contain the new version as well as the 
+   * value of the data just written. 
    * 
    * Specify version to be -1 to skip the node version check before update. 
    * 
@@ -102,25 +84,23 @@ public class SharedShardMetadataController {
    * @param value the value to be written to ZooKeeper
    * @param version the ZooKeeper node version to conditionally update on
    */
-  public VersionedData updateMetadataValueWithVersion(String collectionName, String shardName, String value, int version) {
+  public SharedShardVersionMetadata updateMetadataValueWithVersion(String collectionName, String shardName, String value, int version) {
     String metadataPath = getMetadataBasePath(collectionName, shardName) + "/" + SUFFIX_NODE_NAME;
     try {
       Map<String, Object> nodeProps = new HashMap<>();
       nodeProps.put(SUFFIX_NODE_NAME, value);
-      
+
       VersionedData data = stateManager.setAndGetResult(metadataPath, Utils.toJSON(nodeProps), version);
-      cache.put(getCacheKey(collectionName, shardName), data);
-      return data;
+      Map<String, String> nodeUserData = (Map<String, String>) Utils.fromJSON(data.getData());
+      String metadataSuffix = nodeUserData.get(SharedShardMetadataController.SUFFIX_NODE_NAME);
+      return new SharedShardVersionMetadata(data.getVersion(), metadataSuffix);
     } catch (BadVersionException e) {
-      cache.remove(getCacheKey(collectionName, shardName));
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating path: " + metadataPath
           + " due to mismatching versions", e);
     } catch (IOException | NoSuchElementException | KeeperException e) {
-      cache.remove(getCacheKey(collectionName, shardName));
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating path: " + metadataPath
           + " in ZooKeeper", e);
     } catch (InterruptedException e) {
-      cache.remove(getCacheKey(collectionName, shardName));
       Thread.currentThread().interrupt();
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating path: " + metadataPath
           + " in ZooKeeper due to interruption", e);
@@ -128,37 +108,18 @@ public class SharedShardMetadataController {
   }
   
   /**
-   * The returned VersionedData will contain the version of the node as well as the contents of the node.
-   * The data content of VersionedData will be a byte array that needs to be converted into a {@link Map}.
-   * 
-   * If readFromCache is true, we'll attempt to read from an in-memory cache the VersionedData based on 
-   * the collectionName and shardName and return that if it exists. There is no gaurantee this cache
-   * entry is not stale.
-   * 
-   * @param collectionName name of the collection being updated
-   * @param shardName name of the shard that owns the metadataSuffix node
-   */
-  public VersionedData readMetadataValue(String collectionName, String shardName, boolean readfromCache) throws SolrException {
-    if (readfromCache) {
-      VersionedData cachedEntry = cache.get(getCacheKey(collectionName, shardName));
-      if (cachedEntry != null) {
-        return cachedEntry;
-      }
-    }
-    return readMetadataValue(collectionName, shardName);
-  }
-  
-  /**
-   * The returned VersionedData will contain the version of the node as well as the contents of the node.
-   * The data content of VersionedData will be a byte array that needs to be converted into a {@link Map}.
+   * Reads the {@link SharedShardVersionMetadata} for the shard from zookeeper. 
    * 
-   * @param collectionName name of the collection being updated
+   * @param collectionName name of the shared collection
    * @param shardName name of the shard that owns the metadataSuffix node
    */
-  public VersionedData readMetadataValue(String collectionName, String shardName) {
+  public SharedShardVersionMetadata readMetadataValue(String collectionName, String shardName) {
     String metadataPath = getMetadataBasePath(collectionName, shardName) + "/" + SUFFIX_NODE_NAME;
     try {
-      return stateManager.getData(metadataPath, null);      
+      VersionedData data = stateManager.getData(metadataPath, null);
+      Map<String, String> nodeUserData = (Map<String, String>) Utils.fromJSON(data.getData());
+      String metadataSuffix = nodeUserData.get(SharedShardMetadataController.SUFFIX_NODE_NAME);
+      return new SharedShardVersionMetadata(data.getVersion(), metadataSuffix);
     } catch (IOException | NoSuchElementException | KeeperException e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading data from path: " + metadataPath
           + " in ZooKeeper", e);
@@ -182,21 +143,9 @@ public class SharedShardMetadataController {
     } catch (Exception e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error deleting path " + 
           metadataPath + " in Zookeeper", e);
-    } finally {
-      cache.clear();
     }
   }
-  
-  /**
-   * Clears any cached version value if it exists for the corresponding collection and shard
-   * 
-   * @param collectionName name of the collection being updated
-   * @param shardName name of the shard that owns the metadataSuffix node
-   */
-  public void clearCachedVersion(String collectionName, String shardName) {
-    cache.remove(getCacheKey(collectionName, shardName));
-  }
-  
+
   private void createPersistentNodeIfNonExistent(String path, byte[] data) {
     try {
       if (!stateManager.hasData(path)) {
@@ -219,13 +168,31 @@ public class SharedShardMetadataController {
   protected String getMetadataBasePath(String collectionName, String shardName) {
     return ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName + "/" + ZkStateReader.SHARD_LEADERS_ZKNODE + "/" + shardName;
   }
-  
-  protected String getCacheKey(String collectionName, String shardName) {
-    return collectionName + "_" + shardName;
-  }
-  
-  @VisibleForTesting
-  protected ConcurrentHashMap<String, VersionedData> getVersionedDataCache() {
-    return cache;
+
+  /**
+   * This represents correctness metadata for a shard of a shared collection {@link DocCollection#getSharedIndex()}
+   */
+  public static class SharedShardVersionMetadata {
+    /**
+     * version of zookeeper node maintaining the metadata
+     */
+    private final int version;
+    /**
+     * Unique value of the metadataSuffix for the last persisted shard index in the shared store.
+     */
+    private final String metadataSuffix;
+
+    public SharedShardVersionMetadata(int version, String metadataSuffix) {
+      this.version = version;
+      this.metadataSuffix = metadataSuffix;
+    }
+
+    public int getVersion() {
+      return version;
+    }
+
+    public String getMetadataSuffix() {
+      return metadataSuffix;
+    }
   }
 }
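
A small sketch of how the conditional update above is meant to be used for optimistic concurrency: read the current node version, then write the new metadataSuffix against that version so that a competing writer makes the update fail (via the wrapped BadVersionException) instead of silently overwriting. ConditionalMetadataUpdateSketch and bumpSuffix are illustrative names only.

    import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
    import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;

    public class ConditionalMetadataUpdateSketch {
      // hypothetical helper, for illustration only
      void bumpSuffix(SharedShardMetadataController metadataController,
                      String collection, String shard, String newSuffix) {
        // read the current node version and suffix
        SharedShardVersionMetadata current = metadataController.readMetadataValue(collection, shard);

        // conditionally write the new suffix; if the node version changed in between
        // (i.e. another node pushed first) this throws and the stale push is rejected
        SharedShardVersionMetadata updated =
            metadataController.updateMetadataValueWithVersion(collection, shard, newSuffix, current.getVersion());

        // on success the node version has moved forward along with the suffix
        assert updated.getVersion() > current.getVersion();
      }
    }
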
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 9b44ef4..7a47936 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -17,8 +17,6 @@
 
 package org.apache.solr.update.processor;
 
-import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
-
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -32,7 +30,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.Overseer;
@@ -60,8 +60,15 @@ import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.store.blob.metadata.PushPullData;
+import org.apache.solr.store.blob.process.CorePusher;
 import org.apache.solr.store.blob.process.CoreUpdateTracker;
 import org.apache.solr.store.blob.util.BlobStoreUtils;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreStage;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreVersionMetadata;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
@@ -75,7 +82,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 
 public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
@@ -87,6 +94,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   private final String collection;
   private boolean readOnlyCollection = false;
   private CoreUpdateTracker sharedCoreTracker;
+  private ReentrantReadWriteLock corePullLock;
 
   // The cached immutable clusterState for the update... usually refreshed for each individual update.
   // Different parts of this class used to request current clusterState views, which lead to subtle bugs and race conditions
@@ -209,9 +217,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     } else {
       // zk
       ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
-      if (replicaType.equals(Replica.Type.SHARED)) {
-        readFromSharedStoreIfNecessary();
-      }
 
       List<SolrCmdDistributor.Node> useNodes = null;
       if (req.getParams().get(COMMIT_END_POINT) == null) {
@@ -1055,6 +1060,10 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
   @Override
   protected void doClose() {
+    if (corePullLock != null) {
+      // release read lock
+      corePullLock.readLock().unlock();
+    }
     if (cmdDistrib != null) {
       cmdDistrib.close();
     }
@@ -1081,30 +1090,131 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     super.processRollback(cmd);
   }
 
-  private void writeToShareStore() throws SolrException {
-    log.info("Attempting to initiate index update write to shared store for collection=" + cloudDesc.getCollectionName() +
-        " and shard=" + cloudDesc.getShardId() + " using core=" + req.getCore().getName());
+
+  private void writeToSharedStore() {
+    String collectionName = cloudDesc.getCollectionName();
+    String shardName = cloudDesc.getShardId();
+    String coreName = req.getCore().getName();
+    SharedCoreConcurrencyController concurrencyController = req.getCore().getCoreContainer().getSharedStoreManager().getSharedCoreConcurrencyController();
+    concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.LocalIndexingFinished);
+
+    log.info("Attempting to initiate index update write to shared store for collection=" + collectionName +
+        " and shard=" + shardName + " using core=" + coreName);
 
     sharedCoreTracker.persistShardIndexToSharedStore(zkController.zkStateReader.getClusterState(),
-        cloudDesc.getCollectionName(),
-        cloudDesc.getShardId(),
-        req.getCore().getName());
+        collectionName,
+        shardName,
+        coreName);
   }
 
-  private void readFromSharedStoreIfNecessary() throws SolrException {
-    String coreName = req.getCore().getName();
-    String shardName = cloudDesc.getShardId();
+  private void readFromSharedStoreIfNecessary() {
     String collectionName = cloudDesc.getCollectionName();
+    String shardName = cloudDesc.getShardId();
+    String coreName = req.getCore().getName();
     assert Replica.Type.SHARED.equals(replicaType);
     // Peers and subShardLeaders should only forward the update request to leader replica,
     // hence not need to sync with the blob store at this point.
     if (!isLeader || isSubShardLeader) {
       return;
     }
-    BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName,coreName,shardName,req.getCore().getCoreContainer());
-  }
 
+    // this lock acquire/release logic is built on the assumption that one particular instance of this processor
+    // will only be consumed by a single thread.
+    // The following pull logic should only run once, before the first document of an indexing batch (add/delete) is processed by this processor
+    if (corePullLock != null) {
+      // we already have a lock i.e. we have already read from the shared store (if needed)
+      return;
+    }
 
+    CoreContainer coreContainer = req.getCore().getCoreContainer();
+    SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController();
+    concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.IndexingBatchReceived);
+    corePullLock = concurrencyController.getCorePullLock(collectionName, shardName, coreName);
+    // acquire the lock for the whole duration of the update;
+    // it will be released in the close method
+    corePullLock.readLock().lock();
+    // from this point onwards we should always exit this method holding the read lock (whether we fail or succeed)
+
+    SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName);
+    /**
+     * We only need to sync if there is no soft guarantee of being in sync.
+     * If there is one we will rely on it, and if it turns out to be wrong, indexing will fail at push time
+     * and the guarantee will be removed in {@link CorePusher#pushCoreToBlob(PushPullData)}
+     */
+    if (!coreVersionMetadata.isSoftGuaranteeOfEquality()) {
+      SharedShardMetadataController metadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController();
+      SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
+      if (!concurrencyController.areVersionsEqual(coreVersionMetadata, shardVersionMetadata)) {
+        // we need to pull before indexing therefore we need to upgrade to write lock
+        // we have to release read lock before we can acquire write lock
+        corePullLock.readLock().unlock();
+        boolean reacquireReadLock = true;
+        try {
+          // It is likely that many indexing requests arrived at once and realized we are out of sync.
+          // They would all try to acquire the write lock; one of them makes progress and pulls from the shared store.
+          // After that, regular indexing sees the soft guarantee of equality and moves straight to indexing
+          // under the read lock. It is possible that new indexing keeps coming in and the read lock is never free.
+          // In that case the threads that came in earlier and wanted to pull would be starving on the
+          // write lock. Since we know the write lock is only needed by one thread to do the work, we
+          // try a time-boxed acquisition and, in case of failed acquisition, check whether someone else has already completed the pull.
+          // We make a few attempts before bailing out. Ideally the bail-out scenario should never happen;
+          // if it does, either we are too slow in pulling (and can tune the following parameters) or something else is wrong.
+          int attempt = 1;
+          while (true) {
+            try {
+              // try acquiring write lock
+              if (corePullLock.writeLock().tryLock(SharedCoreConcurrencyController.SECONDS_TO_WAIT_INDEXING_PULL_WRITE_LOCK, TimeUnit.SECONDS)) {
+                try {
+                  // things might have changed while we were upgrading locks; re-check whether a pull is still needed
+                  coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName);
+                  if (!coreVersionMetadata.isSoftGuaranteeOfEquality()) {
+                    shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
+                    if (!concurrencyController.areVersionsEqual(coreVersionMetadata, shardVersionMetadata)) {
+                      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, coreName, shardName, coreContainer, shardVersionMetadata, /* isLeaderSyncing */true);
+                    }
+                  }
+                  // reacquire the read lock for the remainder of indexing before releasing the write lock that was acquired for the pull
+                  corePullLock.readLock().lock();
+                  reacquireReadLock = false;
+                } finally {
+                  corePullLock.writeLock().unlock();
+                }
+                // write lock acquisition was successful and we are in sync with shared store
+                break;
+              } else {
+                // we could not acquire the write lock; see if some other thread has already done the pull
+                coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName);
+                if (coreVersionMetadata.isSoftGuaranteeOfEquality()) {
+                  log.info(String.format("Indexing thread waited to acquire to write lock and could not. " +
+                          "But someone else has done the pulling so we are good. attempt=%s collection=%s shard=%s core=%s",
+                      attempt, collectionName, shardName, coreName));
+                  break;
+                }
+                // no one else has pulled yet either, let's make another attempt ourselves
+                attempt++;
+                if (attempt > SharedCoreConcurrencyController.MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK) {
+                  throw new SolrException(ErrorCode.SERVER_ERROR, String.format("Indexing thread failed to acquire write lock for pull in %s seconds. " +
+                          "And no one else either has done the pull during that time. collection=%s shard=%s core=%s",
+                      Integer.toString(SharedCoreConcurrencyController.SECONDS_TO_WAIT_INDEXING_PULL_WRITE_LOCK * SharedCoreConcurrencyController.MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK),
+                      collectionName, shardName, coreName));
+                }
+              }
+            } catch (InterruptedException ie) {
+              Thread.currentThread().interrupt();
+              throw new SolrException(ErrorCode.SERVER_ERROR, String.format("Indexing thread interrupted while trying to acquire pull write lock." +
+                  " collection=%s shard=%s core=%s", collectionName, shardName, coreName), ie);
+            }
+          }
+        } finally {
+          // we should always leave with the read lock acquired (failure or success), since it is the job of the close method to release it
+          if (reacquireReadLock) {
+            corePullLock.readLock().lock();
+          }
+        }
+      }
+    }
+    concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.LocalIndexingStarted);
+  }
 
   // TODO: optionally fail if n replicas are not reached...
   protected void doDistribFinish() {
@@ -1138,7 +1248,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
        * what the DUP is concerned about so we may want to consider moving this code somewhere more appropriate
        * in the future (deeper in the stack)
        */
-      writeToShareStore();
+      writeToSharedStore();
     }
 
     // TODO: if not a forward and replication req is not specified, we could
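
The read-to-write lock upgrade in readFromSharedStoreIfNecessary is the subtle part of this change, so here is the same pattern distilled into a standalone sketch. The pull and the up-to-date check are abstracted behind hypothetical interfaces (Check, Pull), and the timeout and attempt count are parameters rather than the constants used above; the method assumes it is entered with the read lock held and always returns (or throws) with the read lock held, mirroring the contract in the processor.

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockUpgradeSketch {
      interface Check { boolean upToDate(); }            // hypothetical: is a pull still needed?
      interface Pull { void pull() throws Exception; }   // hypothetical: the actual pull from the shared store

      void pullIfNeeded(ReentrantReadWriteLock lock, Check check, Pull pull,
                        int secondsPerAttempt, int maxAttempts) throws Exception {
        // caller holds the read lock; release it so the write lock can be taken
        lock.readLock().unlock();
        boolean reacquireReadLock = true;
        try {
          for (int attempt = 1; ; attempt++) {
            if (lock.writeLock().tryLock(secondsPerAttempt, TimeUnit.SECONDS)) {
              try {
                // things may have changed while waiting; pull only if still needed
                if (!check.upToDate()) {
                  pull.pull();
                }
                lock.readLock().lock();       // downgrade: take the read lock before releasing the write lock
                reacquireReadLock = false;
              } finally {
                lock.writeLock().unlock();
              }
              return;
            }
            if (check.upToDate()) {
              return;                         // someone else completed the pull while we were waiting
            }
            if (attempt >= maxAttempts) {
              throw new IllegalStateException("could not acquire the pull write lock and no one else pulled");
            }
          }
        } finally {
          if (reacquireReadLock) {
            lock.readLock().lock();           // always leave with the read lock held
          }
        }
      }
    }
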
diff --git a/solr/core/src/test/org/apache/solr/store/blob/SharedStorageSplitTest.java b/solr/core/src/test/org/apache/solr/store/blob/SharedStorageSplitTest.java
index e29bad9..9a58ebd 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/SharedStorageSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/SharedStorageSplitTest.java
@@ -147,7 +147,7 @@ public class SharedStorageSplitTest extends SolrCloudSharedStoreTestCase  {
           // ensure we count down for all replicas per slice.
           String sharedShardName = (String) slice.getProperties().get(ZkStateReader.SHARED_SHARD_NAME);
           solrProcessesTaskTracker.get(replica.getNodeName())
-            .put(sharedShardName, cdl);
+            .put(replica.getCoreName(), cdl);
           SolrClient replicaClient = getHttpSolrClient(replica.getBaseUrl() + "/" + replica.getCoreName());
           try {
             replicaClient.query(params("q", "*:* priming pull", "distrib", "false"));
diff --git a/solr/core/src/test/org/apache/solr/store/blob/metadata/CorePushPullTest.java b/solr/core/src/test/org/apache/solr/store/blob/metadata/CorePushPullTest.java
index 9ccbf39..de31b62 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/metadata/CorePushPullTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/metadata/CorePushPullTest.java
@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cloud.api.collections.Assign;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
@@ -33,6 +34,10 @@ import org.apache.solr.store.blob.client.LocalStorageClient;
 import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil.SharedMetadataResolutionResult;
 import org.apache.solr.store.blob.process.BlobDeleteManager;
 import org.apache.solr.store.blob.util.BlobStoreUtils;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreVersionMetadata;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -95,7 +100,7 @@ public class CorePushPullTest extends SolrTestCaseJ4 {
     
     // verify an exception is thrown
     try {
-      pushPull.pushToBlobStore();
+      pushPull.pushToBlobStore(null, null);
       fail("pushToBlobStore should have thrown an exception");
     } catch (Exception ex) {
       // core missing from core container should throw exception
@@ -262,8 +267,20 @@ public class CorePushPullTest extends SolrTestCaseJ4 {
   }
 
   private BlobCoreMetadata doPush(SolrCore core) throws Exception {
+    String sharedBlobName = Assign.buildSharedShardName(collectionName, shardName);
+    // initialize metadata info to match initial zk version
+    SharedCoreConcurrencyController concurrencyController =  new SharedCoreConcurrencyController(null){
+      @Override
+      protected void ensureShardVersionMetadataNodeExists(String collectionName, String shardName) {
+        
+      }
+    };
+    concurrencyController.updateCoreVersionMetadata(collectionName, shardName, core.getName(), 
+        new SharedShardVersionMetadata(0, SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE),
+        BlobCoreMetadataBuilder.buildEmptyCoreMetadata(sharedBlobName));
+
     // build the require metadata
-    ServerSideMetadata solrServerMetadata = new ServerSideMetadata(core.getName(), h.getCoreContainer());
+    ServerSideMetadata solrServerMetadata = new ServerSideMetadata(core.getName(), h.getCoreContainer(), /* takeSnapshot */ true);
     
     // empty bcm means we should push everything we have locally 
     BlobCoreMetadata bcm = BlobCoreMetadataBuilder.buildEmptyCoreMetadata(sharedBlobName);
@@ -274,9 +291,6 @@ public class CorePushPullTest extends SolrTestCaseJ4 {
         .setShardName(shardName)
         .setCoreName(core.getName())
         .setSharedStoreName(sharedBlobName)
-        .setLastReadMetadataSuffix(metadataSuffix)
-        .setNewMetadataSuffix(randomSuffix)
-        .setZkVersion(1)
         .build();
     SharedMetadataResolutionResult resResult = SharedStoreResolutionUtil.resolveMetadata(solrServerMetadata, bcm);
     
@@ -288,22 +302,19 @@ public class CorePushPullTest extends SolrTestCaseJ4 {
         return;
       }
     };
-    return pushPull.pushToBlobStore();
+    SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, core.getName());
+    return pushPull.pushToBlobStore(coreVersionMetadata.getMetadataSuffix(), randomSuffix);
   }
   
   private SharedMetadataResolutionResult doPull(SolrCore core, BlobCoreMetadata bcm) throws Exception {
     // build the require metadata
     ServerSideMetadata solrServerMetadata = new ServerSideMetadata(core.getName(), h.getCoreContainer());
     
-    String randomSuffix = BlobStoreUtils.generateMetadataSuffix();
     PushPullData ppd = new PushPullData.Builder()
         .setCollectionName(collectionName)
         .setShardName(shardName)
         .setCoreName(core.getName())
         .setSharedStoreName(sharedBlobName)
-        .setLastReadMetadataSuffix(metadataSuffix)
-        .setNewMetadataSuffix(randomSuffix)
-        .setZkVersion(1)
         .build();
     SharedMetadataResolutionResult resResult = SharedStoreResolutionUtil.resolveMetadata(solrServerMetadata, bcm);
     
diff --git a/solr/core/src/test/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtilTest.java b/solr/core/src/test/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtilTest.java
index d145d7c..c4b69d9 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtilTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtilTest.java
@@ -16,15 +16,13 @@
  */
 package org.apache.solr.store.blob.metadata;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
@@ -35,7 +33,8 @@ import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil.SharedMetad
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableSet;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Unit tests for {@link SharedStoreResolutionUtil}.
@@ -71,7 +70,7 @@ public class SharedStoreResolutionUtilTest extends SolrTestCaseJ4 {
       
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new 
-        SharedMetadataResolutionResult(Collections.emptySet(), Collections.emptySet(), false);
+        SharedMetadataResolutionResult(Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), false);
     
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(null, blobMetadata);
@@ -94,7 +93,7 @@ public class SharedStoreResolutionUtilTest extends SolrTestCaseJ4 {
       
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new 
-        SharedMetadataResolutionResult(serverMetadata.getLatestCommitFiles(), Collections.emptySet(), false);
+        SharedMetadataResolutionResult(serverMetadata.getLatestCommitFiles(), Collections.emptySet(), Collections.emptySet(), false);
     
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, null);
@@ -117,7 +116,7 @@ public class SharedStoreResolutionUtilTest extends SolrTestCaseJ4 {
 
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new 
-        SharedMetadataResolutionResult(Collections.emptySet(), Arrays.asList(blobMetadata.getBlobFiles()), false);
+        SharedMetadataResolutionResult(Collections.emptySet(), Arrays.asList(blobMetadata.getBlobFiles()), Collections.emptySet(), false);
     
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(null, blobMetadata);
@@ -182,7 +181,7 @@ public class SharedStoreResolutionUtilTest extends SolrTestCaseJ4 {
     
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new 
-        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Collections.emptySet(), false);
+        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Collections.emptySet(), Collections.emptySet(), false);
     
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
@@ -212,7 +211,7 @@ public class SharedStoreResolutionUtilTest extends SolrTestCaseJ4 {
     
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new 
-        SharedMetadataResolutionResult(Collections.emptySet(), Arrays.asList(expectedFileToPull), false);
+        SharedMetadataResolutionResult(Collections.emptySet(), Arrays.asList(expectedFileToPull), Collections.emptySet(), false);
     
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
@@ -245,7 +244,7 @@ public class SharedStoreResolutionUtilTest extends SolrTestCaseJ4 {
     
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new 
-        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Arrays.asList(expectedFileToPull), false);
+        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Arrays.asList(expectedFileToPull), Collections.emptySet(), false);
     
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
@@ -276,7 +275,7 @@ public class SharedStoreResolutionUtilTest extends SolrTestCaseJ4 {
 
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new
-        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Arrays.asList(blobMetadata.getBlobFiles()), true);
+        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Arrays.asList(blobMetadata.getBlobFiles()), Collections.emptySet(), true);
 
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
@@ -303,7 +302,7 @@ public class SharedStoreResolutionUtilTest extends SolrTestCaseJ4 {
 
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new
-        SharedMetadataResolutionResult(Collections.emptySet(), Arrays.asList(blobMetadata.getBlobFiles()), true);
+        SharedMetadataResolutionResult(Collections.emptySet(), Arrays.asList(blobMetadata.getBlobFiles()), Collections.emptySet(), true);
 
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
@@ -335,7 +334,7 @@ public class SharedStoreResolutionUtilTest extends SolrTestCaseJ4 {
 
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new
-        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Arrays.asList(blobMetadata.getBlobFiles()), true);
+        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Arrays.asList(blobMetadata.getBlobFiles()), Collections.emptySet(), true);
 
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
diff --git a/solr/core/src/test/org/apache/solr/store/blob/process/PullMergeDeduplicationTest.java b/solr/core/src/test/org/apache/solr/store/blob/process/PullMergeDeduplicationTest.java
index 9f6d129..1bfcb55 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/process/PullMergeDeduplicationTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/process/PullMergeDeduplicationTest.java
@@ -16,10 +16,7 @@
  */
 package org.apache.solr.store.blob.process;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
+import com.google.common.collect.Sets;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
 import org.apache.solr.store.blob.process.CorePullTask.PullCoreCallback;
 import org.apache.solr.store.blob.process.CorePullTask.PullTaskMerger;
@@ -28,8 +25,8 @@ import org.apache.solr.store.blob.process.CorePullerFeeder.PullCoreInfoMerger;
 import org.apache.solr.store.blob.util.DeduplicatingList;
 import org.junit.Test;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 /**
  * Tests for {@link DeduplicatingList} using {@link PullTaskMerger} and
@@ -37,9 +34,6 @@ import com.google.common.collect.Sets;
  */
 public class PullMergeDeduplicationTest {
   
-  // not relevant in pulls but value required
-  private static final String NEW_METADATA_SUFFIX = "undefined";
-  
   private final String COLLECTION_NAME_1 = "collection1";
   private final String COLLECTION_NAME_2 = "collection2";
   private final String SHARD_NAME = "_shard_";
@@ -55,69 +49,28 @@ public class PullMergeDeduplicationTest {
     PullCoreInfoMerger merger = new CorePullerFeeder.PullCoreInfoMerger();
     
     // merging two exactly the same PCI should return exactly the same
-    String lastReadMetadataSuffix = "randomString";
-    PullCoreInfo pullCoreInfo1 = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, true, true);
-    PullCoreInfo pullCoreInfo2 = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, true, true);
+    PullCoreInfo pullCoreInfo1 = getTestPullCoreInfo(COLLECTION_NAME_1, true, true);
+    PullCoreInfo pullCoreInfo2 = getTestPullCoreInfo(COLLECTION_NAME_1, true, true);
     
     PullCoreInfo mergedPullCoreInfo = merger.merge(pullCoreInfo1, pullCoreInfo2);
     assertPullCoreInfo(mergedPullCoreInfo, pullCoreInfo2); 
     
     // boolean flags on merge flip
-    pullCoreInfo1 = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, false, true);
-    pullCoreInfo2 = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, true, false);
+    pullCoreInfo1 = getTestPullCoreInfo(COLLECTION_NAME_1, false, true);
+    pullCoreInfo2 = getTestPullCoreInfo(COLLECTION_NAME_1, true, false);
     
     mergedPullCoreInfo = merger.merge(pullCoreInfo1, pullCoreInfo2);
     assertPullCoreInfo(
-        getTestPullCoreInfo(COLLECTION_NAME_1, lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, true, true), 
+        getTestPullCoreInfo(COLLECTION_NAME_1, true, true), 
         mergedPullCoreInfo);
     
     // boolean flags on merge flip
-    pullCoreInfo1 = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, true, false);
-    pullCoreInfo2 = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, false, true);
-    
-    mergedPullCoreInfo = merger.merge(pullCoreInfo1, pullCoreInfo2);
-    assertPullCoreInfo(
-        getTestPullCoreInfo(COLLECTION_NAME_1, lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, true, true), 
-        mergedPullCoreInfo);
-    
-    // ensure the booleans are merged correctly when versions are equivalent 
-    pullCoreInfo1 = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, true, false);
-    pullCoreInfo2 = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, true, false);
-    mergedPullCoreInfo = merger.merge(pullCoreInfo1, pullCoreInfo2);
-    assertPullCoreInfo(
-        getTestPullCoreInfo(COLLECTION_NAME_1, lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, true, false), 
-        mergedPullCoreInfo);
-    
-    // somehow if the versions are the same but lastReadMetadataSuffix are not, we fail 
-    // due to programming error
-    pullCoreInfo1 = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        "notRandomString", NEW_METADATA_SUFFIX, 5, false, false);
-    pullCoreInfo2 = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, false, false );
-    try {
-      merger.merge(pullCoreInfo1, pullCoreInfo2);
-      fail();
-    } catch (Throwable e) {
-      // success
-    }
-    
-    // higher version wins 
-    pullCoreInfo1 = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        "differentSuffix", NEW_METADATA_SUFFIX, 6, true, true);
-    pullCoreInfo2 = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, false, false);
+    pullCoreInfo1 = getTestPullCoreInfo(COLLECTION_NAME_1, true, false);
+    pullCoreInfo2 = getTestPullCoreInfo(COLLECTION_NAME_1, false, true);
     
     mergedPullCoreInfo = merger.merge(pullCoreInfo1, pullCoreInfo2);
     assertPullCoreInfo(
-        getTestPullCoreInfo(COLLECTION_NAME_1, "differentSuffix", NEW_METADATA_SUFFIX, 6, true, true), 
+        getTestPullCoreInfo(COLLECTION_NAME_1, true, true), 
         mergedPullCoreInfo);
   }
   
@@ -130,9 +83,7 @@ public class PullMergeDeduplicationTest {
     DeduplicatingList<String, PullCoreInfo> dedupList = 
         new DeduplicatingList<>(10, new CorePullerFeeder.PullCoreInfoMerger());
     
-    String lastReadMetadataSuffix = "randomString";
-    PullCoreInfo pullCoreInfo = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, true, true);
+    PullCoreInfo pullCoreInfo = getTestPullCoreInfo(COLLECTION_NAME_1, true, true);
     dedupList.addDeduplicated(pullCoreInfo, false);
     assertEquals(1, dedupList.size());
     
@@ -141,31 +92,29 @@ public class PullMergeDeduplicationTest {
     assertEquals(1, dedupList.size());
     
     assertPullCoreInfo(
-        getTestPullCoreInfo(COLLECTION_NAME_1, lastReadMetadataSuffix, NEW_METADATA_SUFFIX, 5, true, true), 
+        getTestPullCoreInfo(COLLECTION_NAME_1, true, true), 
         dedupList.removeFirst());
     
     // add a different PCI that should still be merged
     dedupList.addDeduplicated(pullCoreInfo, false);
     
-    pullCoreInfo = getTestPullCoreInfo(COLLECTION_NAME_1, 
-        "differentString", NEW_METADATA_SUFFIX, 6, false, false);
+    pullCoreInfo = getTestPullCoreInfo(COLLECTION_NAME_1, false, false);
     dedupList.addDeduplicated(pullCoreInfo, false);
     
     assertEquals(1, dedupList.size());
     assertPullCoreInfo(
-        getTestPullCoreInfo(COLLECTION_NAME_1, "differentString", NEW_METADATA_SUFFIX, 6, false, false), 
+        getTestPullCoreInfo(COLLECTION_NAME_1, true, true), 
         dedupList.removeFirst());
     
     dedupList.addDeduplicated(pullCoreInfo, false);
-    pullCoreInfo = getTestPullCoreInfo(COLLECTION_NAME_2, 
-        "differentString", NEW_METADATA_SUFFIX, 6, false, false);
+    pullCoreInfo = getTestPullCoreInfo(COLLECTION_NAME_2, false, false);
     // add a different PCI that should not be merged
     dedupList.addDeduplicated(pullCoreInfo, false);
     assertEquals(2, dedupList.size());
     
     // check the integrity of the first 
     assertPullCoreInfo(
-        getTestPullCoreInfo(COLLECTION_NAME_1, "differentString", NEW_METADATA_SUFFIX, 6, false, false), 
+        getTestPullCoreInfo(COLLECTION_NAME_1, false, false), 
         dedupList.removeFirst());
   }
   
@@ -240,21 +189,18 @@ public class PullMergeDeduplicationTest {
   private CorePullTask getTestCorePullTask(String dedupKey, long queuedTimeMs, int attempts, long lastAttemptTimestamp, PullCoreCallback callback) {
     // assume PullCoreInfo merging works correctly, just create the same static one per CorePullTask
     // with the given dedup key
-    PullCoreInfo pullCoreInfo = getTestPullCoreInfo(dedupKey, 
-        "randomString", NEW_METADATA_SUFFIX, 5, true, true);
+    PullCoreInfo pullCoreInfo = getTestPullCoreInfo(dedupKey, true, true);
     
     CorePullTask pullTask = new CorePullTask(/* coreContainer */ null, pullCoreInfo, queuedTimeMs,
-        attempts, lastAttemptTimestamp, /* callback */ callback, Maps.newHashMap(), Sets.newHashSet());
+        attempts, lastAttemptTimestamp, /* callback */ callback, Sets.newHashSet());
     return pullTask;
   }
   
-  private PullCoreInfo getTestPullCoreInfo(String collectionName, String lastReadMetadataSuffix,
-      String newMetadataSuffix, int version, boolean createCoreIfAbsent, boolean waitForSearcher) {
+  private PullCoreInfo getTestPullCoreInfo(String collectionName, boolean createCoreIfAbsent, boolean waitForSearcher) {
     return new PullCoreInfo(collectionName, 
         collectionName + SHARD_NAME,
         collectionName + CORE_NAME,
         collectionName + SHARED_NAME,
-        lastReadMetadataSuffix, newMetadataSuffix, version, 
         createCoreIfAbsent, waitForSearcher);
   }
   
@@ -271,8 +217,6 @@ public class PullMergeDeduplicationTest {
     assertEquals("PullCoreInfos core names do not match", expected.getCoreName(), actual.getCoreName());
     assertEquals("PullCoreInfos shard names do not match", expected.getShardName(), actual.getShardName());
     assertEquals("PullCoreInfos sharedStore names do not match", expected.getSharedStoreName(), actual.getSharedStoreName());
-    assertEquals("PullCoreInfos lastReadMetadataSuffix do not match", expected.getLastReadMetadataSuffix(), actual.getLastReadMetadataSuffix());
-    assertEquals("PullCoreInfos zkVersions do not match", expected.getZkVersion(), actual.getZkVersion());
     assertEquals("PullCoreInfos shouldCreateCoreifAbsent flag do not match", expected.shouldCreateCoreIfAbsent(), actual.shouldCreateCoreIfAbsent());
     assertEquals("PullCoreInfos shouldWaitForSearcher flag do not match", expected.shouldWaitForSearcher(), actual.shouldWaitForSearcher());
   }
diff --git a/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java b/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java
index 3cb3c86..33fa342 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java
@@ -17,10 +17,6 @@
 
 package org.apache.solr.store.blob.util;
 
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
 import java.nio.file.Path;
 import java.util.UUID;
 
@@ -38,11 +34,16 @@ import org.apache.solr.core.SolrCore;
 import org.apache.solr.store.blob.client.CoreStorageClient;
 import org.apache.solr.store.shared.SolrCloudSharedStoreTestCase;
 import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
 /**
  * Unit tests for {@link BlobStoreUtils}
  */
@@ -91,7 +92,8 @@ public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
     
     CoreStorageClient blobClientSpy = Mockito.spy(storageClient);    
     try {
-      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, newReplica.getCoreName(), shardName, cc);
+      SharedShardVersionMetadata shardVersionMetadata = new SharedShardVersionMetadata(0, SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE);
+      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, newReplica.getCoreName(), shardName, cc, shardVersionMetadata, true);
       verify(blobClientSpy, never()).pullCoreMetadata(anyString(), anyString());
     } catch (Exception ex){
       fail("syncLocalCoreWithSharedStore failed with exception: " + ex.getMessage());
@@ -116,11 +118,9 @@ public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
     newReplica = collection.getReplicas().get(0);
     cc = getCoreContainer(newReplica.getNodeName());
     
-    SharedShardMetadataController sharedMetadataController = cc.getSharedStoreManager().getSharedShardMetadataController();
-    sharedMetadataController.ensureMetadataNodeExists(collectionName, shardName);
-    sharedMetadataController.updateMetadataValueWithVersion(collectionName, shardName, UUID.randomUUID().toString(), -1);
     try {
-      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, newReplica.getCoreName(), shardName, cc);
+      SharedShardVersionMetadata shardVersionMetadata = new SharedShardVersionMetadata(0, UUID.randomUUID().toString());
+      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, newReplica.getCoreName(), shardName, cc, shardVersionMetadata, true);
       fail("syncLocalCoreWithSharedStore should throw exception if shared store doesn't have the core.metadata file.");
     } catch (Exception ex){
       String expectedException = "cannot get core.metadata file from shared store";
@@ -156,8 +156,10 @@ public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
     req.add(doc);
     req.commit(cloudClient, collectionName);
     try {
+      SharedShardMetadataController metadataController = cc.getSharedStoreManager().getSharedShardMetadataController();
+      SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
       // we push and already have the latest updates so we should not pull here
-      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, newReplica.getCoreName(), shardName, cc);
+      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, newReplica.getCoreName(), shardName, cc, shardVersionMetadata, true);
       verify(blobClientSpy, never()).pullCoreMetadata(anyString(), anyString());
     } catch (Exception ex) { 
       fail("syncLocalCoreWithSharedStore failed with exception: " + ex.getMessage());
@@ -209,8 +211,10 @@ public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
     assertEquals(1, core.getDeletionPolicy().getLatestCommit().getFileNames().size());
     
     try {
+      SharedShardMetadataController metadataController = cc.getSharedStoreManager().getSharedShardMetadataController();
+      SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
       // we pushed on the leader, try sync on the follower
-      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, follower.getCoreName(), shardName, cc);
+      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, follower.getCoreName(), shardName, cc, shardVersionMetadata, true);
       
       // did we pull?
       assertTrue(core.getDeletionPolicy().getLatestCommit().getFileNames().size() > 1);
diff --git a/solr/core/src/test/org/apache/solr/store/shared/SharedCoreConcurrencyTest.java b/solr/core/src/test/org/apache/solr/store/shared/SharedCoreConcurrencyTest.java
new file mode 100644
index 0000000..3bce31e
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/store/shared/SharedCoreConcurrencyTest.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.store.shared;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.store.blob.client.BlobCoreMetadata;
+import org.apache.solr.store.blob.client.CoreStorageClient;
+import org.apache.solr.store.blob.process.BlobProcessUtil;
+import org.apache.solr.store.blob.process.CorePullTask;
+import org.apache.solr.store.blob.process.CorePullerFeeder;
+import org.apache.solr.store.blob.process.CoreSyncStatus;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreStage;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests around synchronization of concurrent indexing, pushes and pulls
+ * happening on a core of a shared collection {@link DocCollection#getSharedIndex()}.
+ * todo: add tests for failover scenarios and tests involving query pulls
+ */
+public class SharedCoreConcurrencyTest extends SolrCloudSharedStoreTestCase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static Path sharedStoreRootPath;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    sharedStoreRootPath = createTempDir("tempDir");
+  }
+
+  /**
+   * Tests issuing a random number of concurrent indexing requests (within a given range) against a shared core and verifying that they all succeed.
+   */
+  @Test
+  public void testHighConcurrentIndexing() throws Exception {
+    int maxIndexingThreads = 100;
+    int maxDocsPerThread = 100;
+    testConcurrentIndexing(maxIndexingThreads, maxDocsPerThread);
+  }
+
+  /**
+   * Test ensuring two indexing requests can interleave in the desired ways and that both succeed.
+   */
+  //  @Test todo: leaking threads, test-only issue
+  public void TODOtestPossibleInterleaving() throws Exception {
+
+    // completely serialized
+    testConcurrentIndexing(
+        new IndexingThreadInterleaver(
+            SharedCoreStage.BlobPushFinished,
+            SharedCoreStage.LocalIndexingStarted,
+            true));
+
+    // second pushes for first one too
+    testConcurrentIndexing(
+        new IndexingThreadInterleaver(
+            SharedCoreStage.LocalIndexingFinished, 
+            SharedCoreStage.BlobPushFinished,
+            true));
+  }
+
+  /**
+   * Test ensuring two indexing requests do not interleave in impossible ways even when forced to, and that both still succeed.
+   */
+  //  @Test todo: leaking threads, test-only issue
+  public void TODOtestImpossibleInterleaving() throws Exception {
+    // push critical section
+    testConcurrentIndexing(
+        new IndexingThreadInterleaver(
+            SharedCoreStage.ZkUpdateFinished,
+            SharedCoreStage.ZkUpdateFinished,
+            false));
+
+    // another push critical section
+    testConcurrentIndexing(
+        new IndexingThreadInterleaver(
+            SharedCoreStage.ZkUpdateFinished,
+            SharedCoreStage.LocalCacheUpdateFinished,
+            false));
+  }
+
+  private void testConcurrentIndexing(int maxIndexingThreads, int maxDocsPerThread) throws Exception {
+    int numIndexingThreads = new Random().nextInt(maxIndexingThreads) + 1;
+    testConcurrentIndexing(numIndexingThreads, maxDocsPerThread, null);
+  }
+
+  private void testConcurrentIndexing(IndexingThreadInterleaver interleaver) throws Exception {
+    testConcurrentIndexing(2, 10, interleaver);
+  }
+
+  /**
+   * Starts the desired number of concurrent indexing threads, each indexing a random number of docs between 1 and maxDocsPerThread.
+   *
+   * At the end it verifies that everything got indexed successfully on the leader and that no critical section was breached.
+   * It also verifies the integrity of the shared store contents by pulling them on a follower replica.
+   */
+  private void testConcurrentIndexing(int numIndexingThreads, int maxDocsPerThread, IndexingThreadInterleaver interleaver) throws Exception {
+    setupCluster(2);
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+
+    // this map tracks the async pull queues per solr process
+    Map<String, Map<String, CountDownLatch>> solrProcessesTaskTracker = new HashMap<>();
+
+    JettySolrRunner solrProcess1 = cluster.getJettySolrRunner(0);
+    CoreStorageClient storageClient1 = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
+    setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient1), solrProcess1);
+    Map<String, CountDownLatch> asyncPullLatches1 = configureTestBlobProcessForNode(solrProcess1);
+
+    JettySolrRunner solrProcess2 = cluster.getJettySolrRunner(1);
+    CoreStorageClient storageClient2 = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
+    setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient2), solrProcess2);
+    Map<String, CountDownLatch> asyncPullLatches2 = configureTestBlobProcessForNode(solrProcess2);
+
+    solrProcessesTaskTracker.put(solrProcess1.getNodeName(), asyncPullLatches1);
+    solrProcessesTaskTracker.put(solrProcess2.getNodeName(), asyncPullLatches2);
+
+    String collectionName = "sharedCollection";
+    int maxShardsPerNode = 1;
+    int numReplicas = 2;
+    // specify a comma-delimited string of shard names for multiple shards when using
+    // an implicit router
+    String shardNames = "shard1";
+    setupSharedCollectionWithShardNames(collectionName, maxShardsPerNode, numReplicas, shardNames);
+
+    // get the leader replica and follower replicas
+    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
+    Replica shardLeaderReplica = collection.getLeader("shard1");
+    Replica followerReplica = null;
+    for (Replica repl : collection.getSlice("shard1").getReplicas()) {
+      if (!repl.getName().equals(shardLeaderReplica.getName())) {
+        followerReplica = repl;
+        break;
+      }
+    }
+
+
+    JettySolrRunner shardLeaderSolrRunner = null;
+    for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
+      if (solrRunner.getNodeName().equals(shardLeaderReplica.getNodeName())) {
+        shardLeaderSolrRunner = solrRunner;
+        break;
+      }
+    }
+    List<String> progress = new ArrayList<>();
+
+    if (interleaver != null) {
+      configureTestSharedConcurrencyControllerForNode(shardLeaderSolrRunner, progress, interleaver);
+    } else {
+      configureTestSharedConcurrencyControllerForNode(shardLeaderSolrRunner, progress);
+    }
+
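+    // shared counter so every document indexed across the indexing threads gets a unique id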
+    AtomicInteger totalDocs = new AtomicInteger(0);
+    log.info("numIndexingThreads=" + numIndexingThreads);
+    Thread[] indexingThreads = new Thread[numIndexingThreads];
+    ConcurrentLinkedQueue<String> indexingErrors = new ConcurrentLinkedQueue<>();
+    for (int i = 0; i < numIndexingThreads; i++) {
+      indexingThreads[i] = new Thread(() -> {
+        try {
+          // index between 1 and maxDocsPerThread docs
+          int numDocs = new Random().nextInt(maxDocsPerThread) + 1;
+          log.info("numDocs=" + numDocs);
+          UpdateRequest updateReq = new UpdateRequest();
+          for (int j = 0; j < numDocs; j++) {
+            int docId = totalDocs.incrementAndGet();
+            updateReq.add("id", Integer.toString(docId));
+          }
+          updateReq.commit(cloudClient, collectionName);
+        } catch (Exception ex) {
+          indexingErrors.add(ex.getMessage());
+        }
+      });
+    }
+
+    for (int i = 0; i < numIndexingThreads; i++) {
+      indexingThreads[i].start();
+    }
+
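+    // wait for all indexing threads to finish before verifying the results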
+    for (int i = 0; i < numIndexingThreads; i++) {
+      indexingThreads[i].join();
+    }
+
+    log.info("totalDocs=" + totalDocs.intValue());
+
+    assertTrue(indexingErrors.toString(), indexingErrors.isEmpty());
+
+    if (interleaver != null) {
+      assertNull(interleaver.error, interleaver.error);
+    }
+
+    assertFalse("no progress recorded", progress.isEmpty());
+
+    log.info(progress.toString());
+
+    assertCriticalSections(progress);
+
+    // verify the update wasn't forwarded to the follower and it didn't commit by checking the core
+    // this gives us confidence that the subsequent query we do triggers the pull
+    CoreContainer replicaCC = getCoreContainer(followerReplica.getNodeName());
+    SolrCore core = null;
+    SolrClient followerDirectClient = null;
+    SolrClient leaderDirectClient = null;
+    try {
+      core = replicaCC.getCore(followerReplica.getCoreName());
+      // the follower should only have the default segments file
+      assertEquals(1, core.getDeletionPolicy().getLatestCommit().getFileNames().size());
+
+      // query the leader directly to verify it should have the document
+      leaderDirectClient = getHttpSolrClient(shardLeaderReplica.getBaseUrl() + "/" + shardLeaderReplica.getCoreName());
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params
+          .set("q", "*:*")
+          .set("distrib", "false");
+      QueryResponse resp = leaderDirectClient.query(params);
+      assertEquals(totalDocs.intValue(), resp.getResults().getNumFound());
+
+      // we want to wait until the pull completes, so set up a countdown latch for the follower's
+      // core that we can wait on until the pull finishes
+      CountDownLatch latch = new CountDownLatch(1);
+      Map<String, CountDownLatch> asyncPullTasks = solrProcessesTaskTracker.get(followerReplica.getNodeName());
+      asyncPullTasks.put(followerReplica.getCoreName(), latch);
+
+      // query the follower directly to trigger the pull; this query should yield no results
+      // since it returns immediately
+      followerDirectClient = getHttpSolrClient(followerReplica.getBaseUrl() + "/" + followerReplica.getCoreName());
+      resp = followerDirectClient.query(params);
+      assertEquals(0, resp.getResults().getNumFound());
+
+      // wait until pull is finished
+      assertTrue(latch.await(120, TimeUnit.SECONDS));
+
+      // do another query to verify we've pulled everything
+      resp = followerDirectClient.query(params);
+
+      // verify we pulled
+      assertTrue(core.getDeletionPolicy().getLatestCommit().getFileNames().size() > 1);
+
+      // verify the document is present
+      assertEquals(totalDocs.intValue(), resp.getResults().getNumFound());
+    } finally {
+      if (leaderDirectClient != null) {
+        leaderDirectClient.close();
+      }
+      if (followerDirectClient != null) {
+        followerDirectClient.close();
+      }
+      if (core != null) {
+        core.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      // clean up the shared store. The temp dir should clean up itself after the test class finishes
+      FileUtils.cleanDirectory(sharedStoreRootPath.toFile());
+    }
+  }
+
+  private void assertCriticalSections(List<String> progress) {
+    String currentThreadId = null;
+    SharedCoreStage currentStage = null;
+    String prevThreadId = null;
+    SharedCoreStage prevStage = null;
+    int activeIndexers = 0; // number of threads that have started indexing and not finished 
+    int activeBlobPushers = 0; // number of threads that are actively pushing at any given time
+    for (String p : progress) {
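+      // each entry is recorded by the test's SharedCoreConcurrencyController override as "<threadId>.<stageName>"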
+      String[] parts = p.split("\\.");
+      currentThreadId = parts[0];
+      currentStage = SharedCoreStage.valueOf(parts[1]);
+      if (currentStage == SharedCoreStage.LocalIndexingStarted) {
+        activeIndexers++;
+      } else if (currentStage == SharedCoreStage.BlobPushStarted) {
+        activeBlobPushers++;
+      } else if (currentStage == SharedCoreStage.BlobPushFinished) {
+        // both blob pushing and indexing finish at this stage 
+        activeBlobPushers--;
+        activeIndexers--;
+      }
+
+      // making sure no other activity takes place during pull
+      if (prevStage == SharedCoreStage.BlobPullStarted) {
+        assertTrue("Pull critical section breached, currentStage=" + p, prevThreadId.equals(currentThreadId) && currentStage == SharedCoreStage.BlobPullFinished);
+      }
+
+      // making sure indexing is not disrupted by a pull from blob
+      assertFalse("Indexing breached by a pull, currentStage=" + p,
+          activeIndexers > 0 &&
+              (currentStage == SharedCoreStage.BlobPullStarted || currentStage == SharedCoreStage.BlobPullFinished));
+
+      // making sure pushes to blob are not disrupted by another push to blob
+      assertFalse("Blob push breached by another blob push, currentStage=" + p, activeBlobPushers > 1);
+
+      prevThreadId = currentThreadId;
+      prevStage = currentStage;
+    }
+  }
+
+  private Map<String, CountDownLatch> configureTestBlobProcessForNode(JettySolrRunner runner) {
+    Map<String, CountDownLatch> asyncPullTracker = new HashMap<>();
+
+    CorePullerFeeder cpf = new CorePullerFeeder(runner.getCoreContainer()) {
+      @Override
+      protected CorePullTask.PullCoreCallback getCorePullTaskCallback() {
+        return new CorePullTask.PullCoreCallback() {
+          @Override
+          public void finishedPull(CorePullTask pullTask, BlobCoreMetadata blobMetadata, CoreSyncStatus status,
+                                   String message) throws InterruptedException {
+            CountDownLatch latch = asyncPullTracker.get(pullTask.getPullCoreInfo().getCoreName());
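+            // notify the test (if it registered a latch for this core) that the async pull has finished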
+            if (latch != null) {
+              latch.countDown();
+            }
+          }
+        };
+      }
+    };
+
+    BlobProcessUtil testUtil = new BlobProcessUtil(runner.getCoreContainer(), cpf);
+    setupTestBlobProcessUtilForNode(testUtil, runner);
+    return asyncPullTracker;
+  }
+
+  private void configureTestSharedConcurrencyControllerForNode(JettySolrRunner runner, List<String> progress) {
+    SharedCoreConcurrencyController concurrencyController = new SharedCoreConcurrencyController(runner.getCoreContainer()) {
+      Object recorderLock = new Object();
+      @Override
+      public void recordState(String collectionName, String shardName, String coreName, SharedCoreStage stage) {
+        super.recordState(collectionName, shardName, coreName, stage);
+
+        synchronized (recorderLock) {
+          progress.add(Thread.currentThread().getId() + "." + stage.name());
+        }
+
+      }
+    };
+    setupTestSharedConcurrencyControllerForNode(concurrencyController, runner);
+  }
+
+  private void configureTestSharedConcurrencyControllerForNode(JettySolrRunner runner, List<String> progress, IndexingThreadInterleaver interleaver) {
+    SharedCoreConcurrencyController concurrencyController = new SharedCoreConcurrencyController(runner.getCoreContainer()) {
+      Object recorderLock = new Object();
+      long firstThreadId = -1;
+      long secondThreadId = -1;
+      boolean firstAchieved = false;
+      boolean secondAchieved = false;
+      boolean secondAlreadyWaitingToStart = false;
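+      // second thread blocks on this latch until first reaches interleaver.firstStage;
+      // first then blocks on the next latch until second reaches interleaver.secondStage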
+      CountDownLatch secondWaitingToStart = new CountDownLatch(1);
+      CountDownLatch firstWaitingForSecondToCompleteItsDesiredState = new CountDownLatch(1);
+      int timeoutSeconds = 10;
+      @Override
+      public void recordState(String collectionName, String shardName, String coreName, SharedCoreStage stage) {
+        super.recordState(collectionName, shardName, coreName, stage);
+
+        CountDownLatch latchToWait = null;
+        boolean latchShouldSucceed = true;
+        String succeedError = null;
+        String failureError = null;
+        String interruptionError = null;
+        // compute stalling decision under synchronized block and then stall(if needed) outside of synchronized block
+        synchronized (recorderLock) {
+          progress.add(Thread.currentThread().getId() + "." + stage.name());
+
+          if (interleaver.error != null) {
+            // already errored out
+            return;
+          }
+
+          long currentThreadId = Thread.currentThread().getId();
+          if (stage == SharedCoreStage.IndexingBatchReceived) {
+            // identify first and second thread and initialize their thread ids
+            // first is whichever comes first
+            if (firstThreadId == -1) {
+              firstThreadId = currentThreadId;
+            } else {
+              if (secondThreadId != -1) {
+                interleaver.error = "unexpected third indexing thread";
+                return;
+              }
+              secondThreadId = currentThreadId;
+            }
+          }
+
+          if (firstAchieved && secondAchieved) {
+            // nothing left
+            return;
+          } else if (!secondAlreadyWaitingToStart && currentThreadId == secondThreadId) {
+            secondAlreadyWaitingToStart = true;
+            latchToWait = secondWaitingToStart;
+            latchShouldSucceed = true;
+            failureError = "second timed out waiting for first to reach its desired stage";
+            interruptionError = "second was interrupted while waiting for first to reach its desired stage";
+          } else if (currentThreadId == firstThreadId && stage == interleaver.firstStage) {
+            // first reached the desired stage, now release the second thread
+            secondWaitingToStart.countDown();
+            latchToWait = firstWaitingForSecondToCompleteItsDesiredState;
+            latchShouldSucceed = interleaver.isPossible;
+            succeedError = "first's wait for second to reach its desired stage succeeded, even though that interleaving should not be possible";
+            failureError = "first timed out waiting for second to reach its desired stage";
+            interruptionError = "first was interrupted while waiting for second to reach its desired stage";
+            firstAchieved = true;
+          } else if (currentThreadId == secondThreadId && stage == interleaver.secondStage) {
+            // second also reached desired state, we are done
+            if (!firstAchieved) {
+              interleaver.error = "second reached its desired stage before first reached its own";
+            }
+            firstWaitingForSecondToCompleteItsDesiredState.countDown();
+            secondAchieved = true;
+          }
+        }
+
+        if (latchToWait != null) {
+          // need to stall this stage on the provided latch
+          try {
+            boolean successfulWait = latchToWait.await(timeoutSeconds, TimeUnit.SECONDS);
+            if (successfulWait && !latchShouldSucceed) {
+              interleaver.error = succeedError;
+            } else if (!successfulWait && latchShouldSucceed) {
+              interleaver.error = failureError;
+            }
+          } catch (InterruptedException iex) {
+            interleaver.error = interruptionError;
+          }
+        }
+      }
+    };
+    setupTestSharedConcurrencyControllerForNode(concurrencyController, runner);
+  }
+
+  /**
+   * 1. Second thread is stalled after {@link SharedCoreStage#IndexingBatchReceived} 
+   * 2. First thread is run up to firstStage then stalled
+   * 3. Second thread is resumed and run up to secondStage
+   * 4. First thread is resumed
+   */
+  private static class IndexingThreadInterleaver {
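+    /**
+     * Let first indexing thread run up to this stage before it is stalled
+     */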
+    private final SharedCoreStage firstStage;
+    /**
+     * Let second indexing thread run up to this stage before resuming stalled first thread
+     */
+    private final SharedCoreStage secondStage;
+    private final boolean isPossible;
+    private String error = null;
+
+    private IndexingThreadInterleaver(SharedCoreStage firstStage,
+                                      SharedCoreStage secondStage,
+                                      boolean isPossible) {
+      this.firstStage = firstStage;
+      this.secondStage = secondStage;
+      this.isPossible = isPossible;
+    }
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPullTest.java b/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPullTest.java
index fcd8459..e735315 100644
--- a/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPullTest.java
+++ b/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPullTest.java
@@ -169,21 +169,23 @@ public class SimpleSharedStoreEndToEndPullTest extends SolrCloudSharedStoreTestC
   
   private Map<String, CountDownLatch> configureTestBlobProcessForNode(JettySolrRunner runner) {
     Map<String, CountDownLatch> asyncPullTracker = new HashMap<>();
-    
-    CorePullerFeeder cpf = new CorePullerFeeder(runner.getCoreContainer()) {  
+
+    CorePullerFeeder cpf = new CorePullerFeeder(runner.getCoreContainer()) {
       @Override
       protected CorePullTask.PullCoreCallback getCorePullTaskCallback() {
         return new PullCoreCallback() {
           @Override
           public void finishedPull(CorePullTask pullTask, BlobCoreMetadata blobMetadata, CoreSyncStatus status,
-              String message) throws InterruptedException {
+                                   String message) throws InterruptedException {
             CountDownLatch latch = asyncPullTracker.get(pullTask.getPullCoreInfo().getCoreName());
-            latch.countDown();
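+            // a latch may not be registered for every core; only count down when the test is waiting on this pull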
+            if (latch != null) {
+              latch.countDown();
+            }
           }
         };
       }
     };
-    
+
     BlobProcessUtil testUtil = new BlobProcessUtil(runner.getCoreContainer(), cpf);
     setupTestBlobProcessUtilForNode(testUtil, runner);
     return asyncPullTracker;
diff --git a/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPushTest.java b/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPushTest.java
index eb04529..8e44125 100644
--- a/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPushTest.java
+++ b/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPushTest.java
@@ -21,7 +21,6 @@ import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -29,13 +28,13 @@ import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
 import org.apache.solr.store.blob.client.CoreStorageClient;
 import org.apache.solr.store.blob.util.BlobStoreUtils;
 import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -100,13 +99,11 @@ public class SimpleSharedStoreEndToEndPushTest extends SolrCloudSharedStoreTestC
     CoreContainer leaderCC = getCoreContainer(shardLeaderReplica.getNodeName());
     SharedShardMetadataController metadataController = leaderCC.getSharedStoreManager().getSharedShardMetadataController();
     try (SolrCore leaderCore = leaderCC.getCore(shardLeaderReplica.getCoreName())) {
-      VersionedData data = metadataController.readMetadataValue(collectionName, "shard1", false);
-      Map<String, Object> readData = (Map<String, Object>) Utils.fromJSON(data.getData());
-      String metadataSuffix = (String) readData.get(SharedShardMetadataController.SUFFIX_NODE_NAME);
+      SharedShardVersionMetadata shardMetadata = metadataController.readMetadataValue(collectionName, "shard1");
       Map<String, Object> props = collection.getSlice("shard1").getProperties();
       
       String sharedShardName = (String) props.get(ZkStateReader.SHARED_SHARD_NAME);
-      String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(metadataSuffix);
+      String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(shardMetadata.getMetadataSuffix());
       
       // verify that we pushed the core to blob
       assertTrue(storageClient.coreMetadataExists(sharedShardName, blobCoreMetadataName));
diff --git a/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java b/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java
index 738ff3f..5a40270 100644
--- a/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java
+++ b/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java
@@ -84,7 +84,15 @@ public class SolrCloudSharedStoreTestCase extends SolrCloudTestCase {
     SharedStoreManager manager = solrRunner.getCoreContainer().getSharedStoreManager();
     manager.initBlobProcessUtil(testBlobProcessUtil);
   }
-  
+
+  /**
+   * Configures the Solr process with the given {@link SharedCoreConcurrencyController}
+   */
+  protected static void setupTestSharedConcurrencyControllerForNode(SharedCoreConcurrencyController concurrencyController, JettySolrRunner solrRunner) {
+    SharedStoreManager manager = solrRunner.getCoreContainer().getSharedStoreManager();
+    manager.initConcurrencyController(concurrencyController);
+  }
+
   /**
    * Return a new CoreStorageClient that writes to the specified sharedStoreRootPath and blobDirectoryName
    * The sharedStoreRootPath should already exist when passed to this method
diff --git a/solr/core/src/test/org/apache/solr/store/shared/metadata/SharedShardMetadataControllerTest.java b/solr/core/src/test/org/apache/solr/store/shared/metadata/SharedShardMetadataControllerTest.java
index 845e780..6e67b5c 100644
--- a/solr/core/src/test/org/apache/solr/store/shared/metadata/SharedShardMetadataControllerTest.java
+++ b/solr/core/src/test/org/apache/solr/store/shared/metadata/SharedShardMetadataControllerTest.java
@@ -16,25 +16,20 @@
  */
 package org.apache.solr.store.shared.metadata;
 
-import java.util.Arrays;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
-import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.util.Utils;
-import org.apache.zookeeper.CreateMode;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.apache.zookeeper.data.Stat;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 /**
  * Tests for {@link SharedShardMetadataController}
@@ -152,18 +147,17 @@ public class SharedShardMetadataControllerTest extends SolrCloudTestCase {
     // try a conditional update that should pass and return a VersionedData instance with
     // the right written value and incremented version
     testMetadataValue = "testValue2";
-    VersionedData versionedData = shardMetadataController.updateMetadataValueWithVersion(TEST_COLLECTION_NAME, TEST_SHARD_NAME, 
+    SharedShardVersionMetadata shardMetadata = shardMetadataController.updateMetadataValueWithVersion(TEST_COLLECTION_NAME, TEST_SHARD_NAME, 
         testMetadataValue, version);
     
     
     // the version monotonically increases, increments on updates. We should expect only one update
-    readData = (Map<String, Object>) Utils.fromJSON(versionedData.getData());
-    assertEquals(testMetadataValue, readData.get(SharedShardMetadataController.SUFFIX_NODE_NAME));
-    assertEquals(version + 1, versionedData.getVersion());
+    assertEquals(testMetadataValue, shardMetadata.getMetadataSuffix());
+    assertEquals(version + 1, shardMetadata.getVersion());
     
     // try a conditional update that fails with the wrong version number
     try {
-      versionedData = shardMetadataController.updateMetadataValueWithVersion(TEST_COLLECTION_NAME, TEST_SHARD_NAME, 
+      shardMetadataController.updateMetadataValueWithVersion(TEST_COLLECTION_NAME, TEST_SHARD_NAME,
           testMetadataValue, 100);
       fail();
     } catch (SolrException ex) {
@@ -187,10 +181,9 @@ public class SharedShardMetadataControllerTest extends SolrCloudTestCase {
     shardMetadataController.updateMetadataValueWithVersion(TEST_COLLECTION_NAME, TEST_SHARD_NAME, 
         testMetadataValue, -1);
     
-    VersionedData versionedData = shardMetadataController.readMetadataValue(TEST_COLLECTION_NAME, TEST_SHARD_NAME);
+    SharedShardVersionMetadata shardMetadata = shardMetadataController.readMetadataValue(TEST_COLLECTION_NAME, TEST_SHARD_NAME);
     
-    Map<String, Object> readData = (Map<String, Object>) Utils.fromJSON(versionedData.getData());
-    assertEquals(testMetadataValue, readData.get(SharedShardMetadataController.SUFFIX_NODE_NAME));
+    assertEquals(testMetadataValue, shardMetadata.getMetadataSuffix());
   }
   
   /**
@@ -218,142 +211,4 @@ public class SharedShardMetadataControllerTest extends SolrCloudTestCase {
       assertTrue(t instanceof NoSuchElementException);
     }
   }
-  
-  /**
-   * Test that successful conditional updates caches in VersionedData in memory
-   */
-  @Test
-  public void testSuccessfulUpdateCaches() throws Exception {
-    // reset cache 
-    shardMetadataController.getVersionedDataCache().clear();
-    
-    shardMetadataController.ensureMetadataNodeExists(TEST_COLLECTION_NAME, TEST_SHARD_NAME);
-    assertTrue(cluster.getZkClient().exists(metadataNodePath, false));
-    
-    String testMetadataValue = "testValue1";
-    
-    // setup with an initial value by writing
-    VersionedData newData = shardMetadataController.updateMetadataValueWithVersion(TEST_COLLECTION_NAME, TEST_SHARD_NAME, 
-        testMetadataValue, -1);
-    String cacheKey = shardMetadataController.getCacheKey(TEST_COLLECTION_NAME, TEST_SHARD_NAME);
-    VersionedData cachedData = shardMetadataController.getVersionedDataCache().get(cacheKey);
-    // verify the VersionedData returned is the same as the one cached
-    assertNotNull(cachedData);
-    assertEquals(newData.getVersion(), cachedData.getVersion());
-    assertTrue(Arrays.equals(newData.getData(), cachedData.getData()));
-  }
-  
-  /**
-   * Test that if an entry a VersionedData exists for a given collection and shard, and a conditional
-   * update fails, we remove the existing cached entry
-   */
-  @Test
-  public void testUnsuccessfulUpdateRemovesCacheEntry() throws Exception {
-    // reset cache 
-    shardMetadataController.getVersionedDataCache().clear();
-    
-    shardMetadataController.ensureMetadataNodeExists(TEST_COLLECTION_NAME, TEST_SHARD_NAME);
-    assertTrue(cluster.getZkClient().exists(metadataNodePath, false));
-    
-    String testMetadataValue = "testValue1";
-    
-    // setup with an initial value by writing
-    shardMetadataController.updateMetadataValueWithVersion(TEST_COLLECTION_NAME, TEST_SHARD_NAME, 
-        testMetadataValue, -1);
-    String cacheKey = shardMetadataController.getCacheKey(TEST_COLLECTION_NAME, TEST_SHARD_NAME);
-    // verify our entry is cached now
-    VersionedData cachedData = shardMetadataController.getVersionedDataCache().get(cacheKey);
-    assertNotNull(cachedData);
-    
-    // write a value that should fail with a bad version value
-    try {
-      shardMetadataController.updateMetadataValueWithVersion(TEST_COLLECTION_NAME, TEST_SHARD_NAME, 
-          testMetadataValue, 100);
-      fail("Updating metadata value with the wrong version should have failed");
-    } catch (Exception ex) {
-      
-    }
-    
-    // verify the cached value was removed
-    cachedData = shardMetadataController.getVersionedDataCache().get(cacheKey);
-    assertNull(cachedData);
-  }
-  
-  /**
-   * Test that if an entry a VersionedData exists for a given collection and shard, it is returned
-   * on read if specified so.
-   */
-  @Test
-  public void testReadReturnsCachedEntry() throws Exception {
-    // set up mocks to verify we read from the cache
-    ConcurrentHashMap<String, VersionedData> cacheSpy = Mockito.spy(new ConcurrentHashMap<String, VersionedData>());
-    SolrCloudManager cloudManagerMock = Mockito.mock(SolrCloudManager.class);
-    DistribStateManager distribStateManagerMock = Mockito.mock(DistribStateManager.class);
-    Mockito.when(cloudManagerMock.getDistribStateManager()).thenReturn(distribStateManagerMock);
-    
-    SharedShardMetadataController shardMetadataControllerWithMock = 
-        new SharedShardMetadataController(cloudManagerMock, cacheSpy); 
-    
-    // set up some fake data
-    String cacheKey = shardMetadataControllerWithMock.getCacheKey(TEST_COLLECTION_NAME, TEST_SHARD_NAME);
-    VersionedData mockData = new VersionedData(1, new byte[] {}, CreateMode.EPHEMERAL, "test");
-    shardMetadataControllerWithMock.getVersionedDataCache().put(cacheKey, mockData);
-    
-    // read and verify we pull from the cache if the value exists
-    shardMetadataControllerWithMock.readMetadataValue(TEST_COLLECTION_NAME, TEST_SHARD_NAME, true);
-    
-    // verify we pull from cache and not zookeeper
-    Mockito.verify(cacheSpy).get(shardMetadataControllerWithMock.getCacheKey(TEST_COLLECTION_NAME, TEST_SHARD_NAME));
-    Mockito.verifyZeroInteractions(distribStateManagerMock);
-  }
-  
-  /**
-   * Test that if readFromCache is specified when reading the metadata node and an entry in the
-   * cache for the given collection and shard doesn't exist, we read directly from zookeeper
-   */
-  @Test
-  public void testReadRetrievesFromZooKeeper() throws Exception {
-    ConcurrentHashMap<String, VersionedData> cacheSpy = Mockito.spy(new ConcurrentHashMap<String, VersionedData>());
-    SolrCloudManager cloudManagerMock = Mockito.mock(SolrCloudManager.class);
-    DistribStateManager distribStateManagerMock = Mockito.mock(DistribStateManager.class);
-    
-    Mockito.when(cloudManagerMock.getDistribStateManager()).thenReturn(distribStateManagerMock);
-    Mockito.when(distribStateManagerMock.getData(Mockito.any(), Mockito.any())).thenReturn(null);
-    
-    SharedShardMetadataController shardMetadataControllerWithMock = 
-        new SharedShardMetadataController(cloudManagerMock, cacheSpy); 
-    
-    // read and verify we pull from the cache if the value exists
-    shardMetadataControllerWithMock.readMetadataValue(TEST_COLLECTION_NAME, TEST_SHARD_NAME, true);
-    
-    // verify we're reading from zookeeper and not cache (even though there's no real data there)
-    Mockito.verify(distribStateManagerMock, Mockito.times(1)).getData(Mockito.any(), Mockito.any());
-  }
-  
-  /**
-   * Verify we can clear the cached version for some collection and shard
-   */
-  public void testClearCachedVersion() throws Exception {
-    // set up mocks to verify we read from the cache
-    ConcurrentHashMap<String, VersionedData> cacheSpy = Mockito.spy(new ConcurrentHashMap<String, VersionedData>());
-    SolrCloudManager cloudManagerMock = Mockito.mock(SolrCloudManager.class);
-    DistribStateManager distribStateManagerMock = Mockito.mock(DistribStateManager.class);
-    
-    Mockito.when(cloudManagerMock.getDistribStateManager()).thenReturn(distribStateManagerMock);
-    Mockito.when(distribStateManagerMock.getData(Mockito.any(), Mockito.any())).thenReturn(null);
-    
-    SharedShardMetadataController shardMetadataControllerWithMock = 
-        new SharedShardMetadataController(cloudManagerMock, cacheSpy);
-    
-    // set up some fake data
-    String cacheKey = shardMetadataControllerWithMock.getCacheKey(TEST_COLLECTION_NAME, TEST_SHARD_NAME);
-    VersionedData mockData = new VersionedData(1, new byte[] {}, CreateMode.EPHEMERAL, "test");
-    shardMetadataControllerWithMock.getVersionedDataCache().put(cacheKey, mockData);
-    
-    assertEquals(1, shardMetadataControllerWithMock.getVersionedDataCache().size());
-    // try to clean and verify 
-    shardMetadataControllerWithMock.clearCachedVersion(TEST_COLLECTION_NAME, TEST_SHARD_NAME);
-    Mockito.verify(cacheSpy).remove(Mockito.any());
-    assertEquals(0, shardMetadataControllerWithMock.getVersionedDataCache().size());
-  }
 }
\ No newline at end of file