You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ds...@apache.org on 2020/05/18 19:19:10 UTC

[lucene-solr] branch jira/SOLR-13101 updated: SOLR-13101: SHARED replica's distributed indexing (#1430)

This is an automated email from the ASF dual-hosted git repository.

dsmiley pushed a commit to branch jira/SOLR-13101
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/SOLR-13101 by this push:
     new ad3f0a3  SOLR-13101: SHARED replica's distributed indexing (#1430)
ad3f0a3 is described below

commit ad3f0a30181aa4d7ba099ee67fd45fa98cf9c2c0
Author: Bilal Waheed <23...@users.noreply.github.com>
AuthorDate: Mon May 18 12:19:01 2020 -0700

    SOLR-13101: SHARED replica's distributed indexing (#1430)
    
    * SOLR-13101: SHARED replica's distributed indexing
    The basic purpose of this change was to refactor most of the SHARED replica logic out of DistributedZkUpdateProcessor. Along with that refactoring I came across a couple of functional issues in the logic, which have been fixed too.
    
    Functional fixes:
    -If a replica loses its leadership in the middle of an indexing batch, it should still push its changes to the shared store.
    -SHARED replica does not need to process soft commits and does not need to broadcast hard commits to all the shards of a collection.
    -Previously we would pull from the shared store even when the doc being added/deleted is not meant for the current core/shard.
    -Previously DistributedZkUpdateProcessor#processDelete was running its pull logic even before the request had been set up (DistributedZkUpdateProcessor#setupRequest).
    -DistributedZkUpdateProcessorTest has been deleted in favor of the new SharedStoreDistributedIndexingTest and SharedCoreIndexingBatchProcessorTest.
    
    Refactoring:
    -Most of DistributedZkUpdateProcessor's SHARED replica logic is moved into a new SharedCoreIndexingBatchProcessor. The purpose of this class is to pull from the shared store at the start of an indexing batch (if the core is stale) and push to the shared store at the end of a successfully committed indexing batch.
    -CoreUpdateTracker has been deleted and its only method, persistShardIndexToSharedStore, has been renamed to pushCoreToSharedStore and moved to CorePusher.
    -BlobStoreUtils#syncLocalCoreWithSharedStore is renamed to pullCoreFromSharedStore and moved into a new CorePuller class, and its tests are moved from BlobStoreUtilsTest to CorePullerTest.
    -I renamed the phrase "blob store" to "shared store" in some places in the changed classes, but this was not meant to be an exhaustive rename.
    
    * -Throw error for pull request for an unknown shard.
    -Log warning when indexing a non-active shard.
    
    * Address CR feedback.
---
 .../solr/handler/admin/RequestApplyUpdatesOp.java  |  19 +-
 .../solr/store/blob/process/CorePullTask.java      |   1 +
 .../solr/store/blob/process/CorePullTracker.java   |  53 ++--
 .../apache/solr/store/blob/process/CorePuller.java | 130 +++++++++
 .../apache/solr/store/blob/process/CorePusher.java |  92 +++---
 .../solr/store/blob/process/CoreSyncFeeder.java    |   5 +-
 .../solr/store/blob/process/CoreUpdateTracker.java |  98 -------
 .../solr/store/blob/util/BlobStoreUtils.java       | 111 +------
 .../shared/SharedCoreConcurrencyController.java    |  25 +-
 .../shared/SharedCoreIndexingBatchProcessor.java   | 305 ++++++++++++++++++++
 .../processor/DistributedZkUpdateProcessor.java    | 318 +++++++--------------
 .../shared-distrib-indexing/conf/schema.xml        |  29 ++
 .../shared-distrib-indexing/conf/solrconfig.xml    |  57 ++++
 .../CorePullerTest.java}                           | 173 +++++------
 .../solr/store/blob/util/BlobStoreUtilsTest.java   | 211 +-------------
 .../store/shared/SharedCoreConcurrencyTest.java    |   2 +-
 .../SharedCoreIndexingBatchProcessorTest.java      | 278 ++++++++++++++++++
 .../shared/SharedStoreDistributedIndexingTest.java | 289 +++++++++++++++++++
 .../store/shared/SolrCloudSharedStoreTestCase.java |   6 +-
 .../DistributedZkUpdateProcessorTest.java          | 287 -------------------
 20 files changed, 1370 insertions(+), 1119 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RequestApplyUpdatesOp.java b/solr/core/src/java/org/apache/solr/handler/admin/RequestApplyUpdatesOp.java
index 0477e4a..5c3c61c 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/RequestApplyUpdatesOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/RequestApplyUpdatesOp.java
@@ -22,15 +22,17 @@ import java.util.Locale;
 import java.util.concurrent.Future;
 
 import org.apache.solr.cloud.CloudDescriptor;
-import org.apache.solr.cloud.api.collections.Assign;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.store.blob.client.BlobCoreMetadataBuilder;
-import org.apache.solr.store.blob.process.CoreUpdateTracker;
+import org.apache.solr.store.blob.process.CorePusher;
 import org.apache.solr.store.shared.SharedCoreConcurrencyController;
 import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
 import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
@@ -88,7 +90,6 @@ class RequestApplyUpdatesOp implements CoreAdminHandler.CoreAdminOp {
     CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
     if (cloudDesc.getReplicaType().equals(Replica.Type.SHARED)) {
       CoreContainer cc = core.getCoreContainer();
-      CoreUpdateTracker sharedCoreTracker = new CoreUpdateTracker(cc);
 
       String collectionName = cloudDesc.getCollectionName();
       String shardName = cloudDesc.getShardId();
@@ -113,15 +114,13 @@ class RequestApplyUpdatesOp implements CoreAdminHandler.CoreAdminOp {
       // sync local cache with zk's default information i.e. equivalent of no-op pull
       // this syncing is necessary for the zk conditional update to succeed at the end of core push
       SharedCoreConcurrencyController concurrencyController = cc.getSharedStoreManager().getSharedCoreConcurrencyController();
-      String sharedBlobName = Assign.buildSharedShardName(collectionName, shardName);
+      ClusterState clusterState = core.getCoreContainer().getZkController().getClusterState();
+      DocCollection collection = clusterState.getCollection(collectionName);
+      String sharedShardName = (String) collection.getSlicesMap().get(shardName).get(ZkStateReader.SHARED_SHARD_NAME);
       concurrencyController.updateCoreVersionMetadata(collectionName, shardName, coreName,
-          shardVersionMetadata, BlobCoreMetadataBuilder.buildEmptyCoreMetadata(sharedBlobName));
+          shardVersionMetadata, BlobCoreMetadataBuilder.buildEmptyCoreMetadata(sharedShardName));
 
-      sharedCoreTracker.persistShardIndexToSharedStore(
-          cc.getZkController().zkStateReader.getClusterState(),
-          collectionName,
-          shardName,
-          coreName);
+      new CorePusher().pushCoreToSharedStore(core, sharedShardName);
     }
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
index a5aeed5..581f0d2 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
@@ -215,6 +215,7 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
       }
     }
 
+    // TODO: Consider using CorePuller here. It will require some refactoring in both the places.
     SharedCoreConcurrencyController concurrencyController = storeManager.getSharedCoreConcurrencyController();
     CoreSyncStatus syncStatus = CoreSyncStatus.FAILURE;
     // Auxiliary information related to pull outcome. It can be metadata resolver message which can be null or exception detail in case of failure 
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTracker.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTracker.java
index f038cfa..644a85a 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTracker.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTracker.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Tracks cores that are being queried and if necessary enqueues them for pull from blob store
+ * Tracks cores that are being queried and if necessary enqueues them for pull from the shared store
  */
 public class CorePullTracker {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -78,29 +78,34 @@ public class CorePullTracker {
     String shardName = core.getCoreDescriptor().getCloudDescriptor().getShardId();
     DocCollection collection = cores.getZkController().getClusterState().getCollection(collectionName);
     Slice shard = collection.getSlicesMap().get(shardName);
-    if (shard != null) {
-      try {
-        if (!collection.getActiveSlices().contains(shard)) {
-          // unclear if there are side effects but logging for now
-          log.warn("Enqueueing a pull for shard " + shardName + " that is inactive!");
-        }
-        log.info("Enqueue a pull for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
-
-        String sharedShardName = (String) shard.get(ZkStateReader.SHARED_SHARD_NAME);
-
-        PushPullData pushPullData = new PushPullData.Builder()
-            .setCollectionName(collectionName)
-            .setShardName(shardName)
-            .setCoreName(coreName)
-            .setSharedStoreName(sharedShardName)
-            .build();
-
-        enqueueForPullIfNecessary(requestPath, pushPullData);
-
-      } catch (Exception ex) {
-        // wrap every thrown exception in a solr exception
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error trying to push to blob store", ex);
+    if (shard == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Pull request received for an unknown shard," +
+          " collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
+    }
+
+    try {
+      if (!Slice.State.ACTIVE.equals(shard.getState())) {
+        // unclear what this means, but logging a warning for now
+        log.info("Enqueuing a pull for a non-active shard, collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
+      } else {
+        log.info("Enqueuing a pull for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
       }
+
+      String sharedShardName = (String) shard.get(ZkStateReader.SHARED_SHARD_NAME);
+
+      PushPullData pushPullData = new PushPullData.Builder()
+          .setCollectionName(collectionName)
+          .setShardName(shardName)
+          .setCoreName(coreName)
+          .setSharedStoreName(sharedShardName)
+          .build();
+
+      enqueueForPullIfNecessary(requestPath, pushPullData);
+
+    } catch (Exception ex) {
+      // wrap every thrown exception in a solr exception
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error enqueueing a pull from the shared store," +
+          " collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName, ex);
     }
   }
 
@@ -120,7 +125,7 @@ public class CorePullTracker {
   /**
    * Enqueues a core for pull
    * 
-   * @param pushPullData pull request data required to interact with blob store
+   * @param pushPullData pull request data required to interact with the shared store
    * @param createCoreIfAbsent whether to create core before pulling if absent
    * @param waitForSearcher whether to wait for newly pulled contents be reflected through searcher 
    */
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePuller.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePuller.java
new file mode 100644
index 0000000..51c8c9a
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePuller.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.store.blob.process;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.store.blob.client.BlobCoreMetadata;
+import org.apache.solr.store.blob.client.BlobCoreMetadataBuilder;
+import org.apache.solr.store.blob.client.CoreStorageClient;
+import org.apache.solr.store.blob.metadata.CorePushPull;
+import org.apache.solr.store.blob.metadata.PushPullData;
+import org.apache.solr.store.blob.metadata.ServerSideMetadata;
+import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil;
+import org.apache.solr.store.blob.util.BlobStoreUtils;
+import org.apache.solr.store.shared.SharedCoreConcurrencyController;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class executes synchronous pulls of cores from the shared store.
+ */
+public class CorePuller {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Brings a local core up to date with the shard's index in the shared store.
+   * 
+   * @param core core to be pulled
+   * @param sharedShardName identifier for the shard index data located on a shared store
+   * @param shardVersionMetadata metadata pointing to the version of shard's index in the shared store to be pulled
+   * @param isLeaderInitiated whether pull is requested by a leader replica or not
+   */
+  public void pullCoreFromSharedStore(SolrCore core, String sharedShardName,
+                                      SharedShardMetadataController.SharedShardVersionMetadata shardVersionMetadata,
+                                      boolean isLeaderInitiated) {
+    CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor();
+    String collectionName = cloudDescriptor.getCollectionName();
+    String shardName = cloudDescriptor.getShardId();
+    String coreName = core.getName();
+    try {
+      log.info("Initiating pull for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
+      CoreContainer coreContainer = core.getCoreContainer();
+      SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController();
+      if (SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE.equals(shardVersionMetadata.getMetadataSuffix())) {
+        //no-op pull
+        BlobCoreMetadata emptyBlobCoreMetadata = BlobCoreMetadataBuilder.buildEmptyCoreMetadata(sharedShardName);
+        concurrencyController.updateCoreVersionMetadata(collectionName, shardName, coreName, shardVersionMetadata, emptyBlobCoreMetadata, isLeaderInitiated);
+        log.info("Pull successful, nothing to pull, collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
+        return;
+      }
+      concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreConcurrencyController.SharedCoreStage.BLOB_PULL_STARTED);
+      try {
+        // Get blob metadata
+        String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(shardVersionMetadata.getMetadataSuffix());
+        CoreStorageClient blobClient = coreContainer.getSharedStoreManager().getBlobStorageProvider().getClient();
+        BlobCoreMetadata blobCoreMetadata = blobClient.pullCoreMetadata(sharedShardName, blobCoreMetadataName);
+        if (null == blobCoreMetadata) {
+          // Zookepeer and blob are out of sync, could be due to eventual consistency model in blob or something else went wrong.
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+              "core.metadata file needed for pull is missing from shared store, blobCoreMetadataName=" + blobCoreMetadataName +
+                  " shard=" + shardName +
+                  " collectionName=" + collectionName +
+                  " sharedShardName=" + sharedShardName);
+        } else if (blobCoreMetadata.getIsDeleted()) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+              "core.metadata file needed for pull is marked deleted in shared store, blobCoreMetadataName=" + blobCoreMetadataName +
+                  " shard=" + shardName +
+                  " collectionName=" + collectionName +
+                  " sharedShardName=" + sharedShardName);
+        } else if (blobCoreMetadata.getIsCorrupt()) {
+          log.warn("core.metadata file needed for pull is marked corrupt, skipping sync, collection=" + collectionName +
+              " shard=" + shardName + " coreName=" + coreName + " sharedShardName=" + sharedShardName);
+          return;
+        }
+
+        // Get local metadata + resolve with blob metadata. Given we're doing a pull, don't need to reserve commit point
+        // We do need to compute a directory hash to verify after pulling or before switching index dirs that no local 
+        // changes occurred concurrently
+        ServerSideMetadata serverMetadata = new ServerSideMetadata(coreName, coreContainer,
+            /* reserveCommit */ false, /* captureDirHash */ true);
+        SharedStoreResolutionUtil.SharedMetadataResolutionResult resolutionResult = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobCoreMetadata);
+        PushPullData pushPullData = new PushPullData.Builder()
+            .setCollectionName(collectionName)
+            .setShardName(shardName)
+            .setCoreName(coreName)
+            .setSharedStoreName(sharedShardName)
+            .build();
+
+        if (resolutionResult.getFilesToPull().size() > 0) {
+          BlobDeleteManager deleteManager = coreContainer.getSharedStoreManager().getBlobDeleteManager();
+          CorePushPull cp = new CorePushPull(blobClient, deleteManager, pushPullData, resolutionResult, serverMetadata, blobCoreMetadata);
+          cp.pullUpdateFromBlob(/* waitForSearcher */ true);
+          concurrencyController.updateCoreVersionMetadata(pushPullData.getCollectionName(), pushPullData.getShardName(), pushPullData.getCoreName(),
+              shardVersionMetadata, blobCoreMetadata, isLeaderInitiated);
+        } else {
+          log.warn("Why there are no files to pull even when we do not match with the version in zk? " +
+              "collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
+        }
+      } finally {
+        concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreConcurrencyController.SharedCoreStage.BLOB_PULL_FINISHED);
+      }
+    } catch (Exception ex) {
+      // wrap every thrown exception in a solr exception
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error trying to pull from the shared store," +
+          " collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName, ex);
+    }
+  }
+
+}
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePusher.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePusher.java
index 685fd78..769337f 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CorePusher.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePusher.java
@@ -18,12 +18,12 @@
 package org.apache.solr.store.blob.process;
 
 import java.lang.invoke.MethodHandles;
-import java.util.Locale;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexNotFoundException;
 import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
+import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.core.CoreContainer;
@@ -46,32 +46,49 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class executes synchronous pushes of core updates to blob store. See the implementation of asynchronous pulls
- * in {@link CorePullerFeeder}.
+ * This class executes synchronous pushes of core updates to the shared store.
  * 
- * Pushes will be triggered from {@link CoreUpdateTracker}, which Solr code notifies when a shard's index data has 
- * changed locally and needs to be persisted to a shared store (blob store). 
+ * Pushes will be triggered at the end of an indexing batch when a shard's index data has 
+ * changed locally and needs to be persisted to the shared store. 
  */
 public class CorePusher {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private CoreContainer coreContainer;
-
-  public CorePusher(CoreContainer coreContainer) {
-    this.coreContainer = coreContainer;
+  /**
+   * Pushes a core to the shared store.
+   * @param core core to be pushed
+   * @param sharedShardName identifier for the shard index data located on the shared store
+   */
+  public void pushCoreToSharedStore(SolrCore core, String sharedShardName) {
+    CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor();
+    String collectionName = cloudDescriptor.getCollectionName();
+    String shardName = cloudDescriptor.getShardId();
+    String coreName = core.getName();
+    try {
+      log.info("Initiating push for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
+      PushPullData pushPullData = new PushPullData.Builder()
+          .setCollectionName(collectionName)
+          .setShardName(shardName)
+          .setCoreName(coreName)
+          .setSharedStoreName(sharedShardName)
+          .build();
+      pushCoreToSharedStore(core, pushPullData);
+    } catch (Exception ex) {
+      // wrap every thrown exception in a solr exception
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error trying to push to the shared store," +
+          " collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName, ex);
+    }
   }
 
   /**
-   * Pushes the local core updates to the Blob store and logs whether the push succeeded or failed.
+   * Pushes the local core updates to the shared store and logs whether the push succeeded or failed.
    */
-  public void pushCoreToBlob(PushPullData pushPullData) throws Exception {
-    BlobStorageProvider blobProvider = coreContainer.getSharedStoreManager().getBlobStorageProvider();
-    CoreStorageClient blobClient = blobProvider.getClient();
-    BlobDeleteManager deleteManager = coreContainer.getSharedStoreManager().getBlobDeleteManager();
+  private void pushCoreToSharedStore(SolrCore core, PushPullData pushPullData) throws Exception {
     try {
-      String shardName = pushPullData.getShardName();
+      CoreContainer coreContainer = core.getCoreContainer();
       String collectionName = pushPullData.getCollectionName();
+      String shardName = pushPullData.getShardName();
       String coreName = pushPullData.getCoreName();
       SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController();
       ReentrantLock corePushLock = concurrencyController.getCorePushLock(collectionName, shardName, coreName);
@@ -94,10 +111,6 @@ public class CorePusher {
       corePushLock.lock();
       try {
         long lockAcquisitionTime = BlobStoreUtils.getCurrentTimeMs() - startTimeMs;
-        SolrCore core = coreContainer.getCore(coreName);
-        if (core == null) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Can't find core " + coreName);
-        }
         try {
           concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.BLOB_PUSH_STARTED);
 
@@ -112,7 +125,7 @@ public class CorePusher {
             // This can happen if another indexing batch comes in and acquires the push lock first and ends up pushing segments 
             // produced by this indexing batch.
             // This optimization saves us from creating somewhat expensive ServerSideMetadata.
-            // Note1! At this point we might not be the leader and shared store might have already received pushes from
+            // Note1! At this point we might not be the leader and the shared store might have already received pushes from
             //        new leader. It is still ok to declare success since our indexing batch was correctly pushed earlier
             //        by another thread before the new leader could have pushed its batches.
             // Note2! It is important to note that we are piggybacking on cached BlobCoreMetadata's generation number here.
@@ -121,15 +134,13 @@ public class CorePusher {
             //        successful pull. But that is not necessary since local core will be on blobCoreMetadata's generation
             //        number after pull and on push blobCoreMetadata get generation number from local core.
             //        The reason it is important to call it out here is that BlobCoreMetadata' generation also gets
-            //        persisted to shared store which is not the requirement for this optimization.
-            log.info(String.format(Locale.ROOT,
-                "Nothing to push, pushLockTime=%s pushPullData=%s", lockAcquisitionTime, pushPullData.toString()));
+            //        persisted to the shared store which is not the requirement for this optimization.
+            log.info("Nothing to push, pushLockTime=" + lockAcquisitionTime + " pushPullData=" + pushPullData.toString());
             return;
           }
 
-          log.info("Push to shared store initiating with PushPullData= " + pushPullData.toString());
-          // Resolve the differences (if any) between the local shard index data and shard index data on shared store
-          // Reserving the commit point so it can be saved while pushing files to Blob store.
+          // Resolve the differences (if any) between the local shard index data and shard index data on the shared store
+          // Reserving the commit point so it can be saved while pushing files to the shared store.
           // We don't need to compute a directory hash for the push scenario as we only need it to verify local 
           // index changes during pull
           ServerSideMetadata localCoreMetadata = new ServerSideMetadata(coreName, coreContainer, 
@@ -138,12 +149,14 @@ public class CorePusher {
               localCoreMetadata, coreVersionMetadata.getBlobCoreMetadata());
 
           if (resolutionResult.getFilesToPush().isEmpty()) {
-            log.warn(String.format(Locale.ROOT,
-                "Why there is nothing to push even when there is a newer commit point since last push," +
-                " pushLockTime=%s pushPullData=%s", lockAcquisitionTime, pushPullData.toString()));
+            log.warn("Why there is nothing to push even when there is a newer commit point since last push," +
+                " pushLockTime=" + lockAcquisitionTime + " pushPullData=" + pushPullData.toString());
             return;
           }
 
+          BlobStorageProvider blobProvider = coreContainer.getSharedStoreManager().getBlobStorageProvider();
+          CoreStorageClient blobClient = blobProvider.getClient();
+          BlobDeleteManager deleteManager = coreContainer.getSharedStoreManager().getBlobDeleteManager();
           String newMetadataSuffix = BlobStoreUtils.generateMetadataSuffix();
           // begin the push process 
           CorePushPull pushPull = new CorePushPull(blobClient, deleteManager, pushPullData, resolutionResult, localCoreMetadata, coreVersionMetadata.getBlobCoreMetadata());
@@ -153,7 +166,7 @@ public class CorePusher {
           SharedShardMetadataController shardSharedMetadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController();
           SharedShardVersionMetadata newShardVersionMetadata = null;
           try {
-            newShardVersionMetadata = shardSharedMetadataController.updateMetadataValueWithVersion(pushPullData.getCollectionName(), pushPullData.getShardName(),
+            newShardVersionMetadata = shardSharedMetadataController.updateMetadataValueWithVersion(collectionName, shardName,
                 newMetadataSuffix, coreVersionMetadata.getVersion());
           } catch (Exception ex) {
             boolean isVersionMismatch = false;
@@ -167,7 +180,7 @@ public class CorePusher {
             }
             if (isVersionMismatch) {
               // conditional update of zookeeper failed, take away soft guarantee of equality.
-              // That will make sure before processing next indexing batch, we sync with zookeeper and pull from shared store.
+              // That will make sure before processing next indexing batch, we sync with zookeeper and pull from the shared store.
               concurrencyController.updateCoreVersionMetadata(collectionName, shardName, coreName, false);
             }
             throw ex;
@@ -176,34 +189,27 @@ public class CorePusher {
 
           assert newMetadataSuffix.equals(newShardVersionMetadata.getMetadataSuffix());
           // after successful update to zookeeper, update core version metadata with new version info
-          // and we can also give soft guarantee that core is up to date w.r.to shared store, until unless failures happen and leadership changes 
+          // and we can also give soft guarantee that core is up to date w.r.t. the shared store, until failures happen or leadership changes
           concurrencyController.updateCoreVersionMetadata(collectionName, shardName, coreName, newShardVersionMetadata, blobCoreMetadata, /* softGuaranteeOfEquality */ true);
           concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.LOCAL_CACHE_UPDATE_FINISHED);
-          log.info(String.format(Locale.ROOT,
-              "Successfully pushed to shared store, pushLockTime=%s pushPullData=%s", lockAcquisitionTime, pushPullData.toString()));
+          log.info("Successfully pushed to the shared store," +
+              " pushLockTime=" + lockAcquisitionTime + " pushPullData=" + pushPullData.toString());
         } finally {
             concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.BLOB_PUSH_FINISHED);
-            core.close();
         }
       } finally {
         corePushLock.unlock();
       }
       // TODO - make error handling a little nicer?
     } catch (InterruptedException e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "CorePusher was interrupted while pushing to blob store", e);
-    } catch (IndexNotFoundException infe) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "CorePusher failed because the core " + pushPullData.getCoreName() +
-          " for the shard " + pushPullData.getShardName() + " was not found", infe);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "CorePusher was interrupted while pushing to the shared store", e);
     } catch (SolrException e) {
       Throwable t = e.getCause();
       if (t instanceof BadVersionException) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "CorePusher failed to push because the node "
-            + "version doesn't match.", t);
+            + "version doesn't match, requestedVersion=" + ((BadVersionException) t).getRequested(), t);
       }
       throw e;
-    } catch (Exception e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "CorePusher failed to push shard index for "
-          + pushPullData.getShardName() + " due to unexpected exception", e);
     }
   }
 }
\ No newline at end of file
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java b/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java
index 985fb07..9b4c5fc 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java
@@ -23,7 +23,6 @@ import java.util.Locale;
 import java.util.Set;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
-//import com.force.commons.util.concurrent.NamedThreadFactory; difference?
 import org.apache.lucene.util.NamedThreadFactory;
 import org.apache.solr.store.shared.SharedStoreManager;
 import org.slf4j.Logger;
@@ -43,8 +42,8 @@ public abstract class CoreSyncFeeder implements Runnable, Closeable {
    * Maximum number of elements in the queue, NOT counting re-inserts after failures. Total queue size might therefore
    * exceed this value by the number of syncThreads which is around 5 or 10...
    * <p>
-   * Note that this queue sits behind other tracking queues (see {@link CoreUpdateTracker} and
-   * {@link CorePullTracker}). The other queue has to be large, this one does not.
+   * Note that this queue sits behind another tracking queue (see {@link CorePullTracker}).
+   * The other queue has to be large, this one does not.
    */
   protected static final int ALMOST_MAX_WORKER_QUEUE_SIZE = 2000;
 
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CoreUpdateTracker.java b/solr/core/src/java/org/apache/solr/store/blob/process/CoreUpdateTracker.java
deleted file mode 100644
index e611ca8..0000000
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CoreUpdateTracker.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.store.blob.process;
-
-import java.lang.invoke.MethodHandles;
-
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.store.blob.metadata.PushPullData;
-import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Class to track local core updates that need pushing to Blob Store.
- */
-public class CoreUpdateTracker {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private CoreContainer coreContainer;
-  private SharedShardMetadataController shardSharedMetadataController; 
-
-  public CoreUpdateTracker(CoreContainer coreContainer) {
-    this.coreContainer = coreContainer;
-    shardSharedMetadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController();
-  }
-
-  /**
-   * 
-   * Persist the shard index data to the underlying durable shared storage provider using the specified
-   * cluster snapshot.
-   * 
-   * The responsibility is to the caller using the shared store layer to propagate the desired cluster state
-   * from which we'll validate the update request
-   * 
-   * @param clusterState the state of the cluster
-   * @param collectionName name of the share-type collection that should persist its shard index data 
-   * to a shared storage provider
-   * @param shardName name of the shard that should be persisted
-   * @param coreName the name of the core from which the update request is being processed
-   */
-  public void persistShardIndexToSharedStore(ClusterState clusterState, String collectionName, String shardName, String coreName)
-      throws SolrException {
-    DocCollection collection = clusterState.getCollection(collectionName);
-    if (!collection.getSharedIndex()) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't push shard index data for a collection that"
-          + "is not of type shared. Collection=" + collectionName + " shard=" + shardName);
-    }
-
-    Slice shard = collection.getSlicesMap().get(shardName);
-    if (shard != null) {
-      try {
-        if (!collection.getActiveSlices().contains(shard)) {
-          // unclear if there are side effects but logging for now
-          log.warn("Performing a push for shard " + shardName + " that is inactive!");
-        }
-        log.info("Initiating push for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
-
-        String sharedShardName = (String) shard.get(ZkStateReader.SHARED_SHARD_NAME);
-
-        PushPullData pushPullData = new PushPullData.Builder()
-            .setCollectionName(collectionName)
-            .setShardName(shardName)
-            .setCoreName(coreName)
-            .setSharedStoreName(sharedShardName)
-            .build();
-        CorePusher pusher = new CorePusher(coreContainer);
-        pusher.pushCoreToBlob(pushPullData);
-      } catch (Exception ex) {
-        // wrap every thrown exception in a solr exception
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error trying to push to blob store", ex);
-      }
-    } else {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can't push shard index data with name " + shardName 
-          + " for collection " + collectionName + " because the shard does not exist in the cluster state");
-    }
-  }
-}
diff --git a/solr/core/src/java/org/apache/solr/store/blob/util/BlobStoreUtils.java b/solr/core/src/java/org/apache/solr/store/blob/util/BlobStoreUtils.java
index 74dc950..7404938 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/util/BlobStoreUtils.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/util/BlobStoreUtils.java
@@ -16,48 +16,26 @@
  */
 
 package org.apache.solr.store.blob.util;
-import java.lang.invoke.MethodHandles;
+
 import java.util.HashMap;
-import java.util.Locale;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.cloud.CloudDescriptor;
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionAdminParams;
-import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
-import org.apache.solr.store.blob.client.BlobCoreMetadataBuilder;
-import org.apache.solr.store.blob.client.CoreStorageClient;
-import org.apache.solr.store.blob.metadata.CorePushPull;
-import org.apache.solr.store.blob.metadata.PushPullData;
-import org.apache.solr.store.blob.metadata.ServerSideMetadata;
-import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil;
-import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil.SharedMetadataResolutionResult;
-import org.apache.solr.store.blob.process.BlobDeleteManager;
-import org.apache.solr.store.shared.SharedCoreConcurrencyController;
-import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreStage;
-import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
-import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Utility class for BlobStore components
  */
 public class BlobStoreUtils {
 
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  
   /** 
    * The only blob that has a constant name for a core is the metadata. Basically the Blob store's equivalent for a core
    * of the highest segments_N file for a Solr server. 
@@ -85,93 +63,6 @@ public class BlobStoreUtils {
     return TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
   }
 
-  /***
-   * syncLocalCoreWithSharedStore checks the local core has the latest copy stored in shared storage. Updates from the blob store always override the
-   * local content.
-   * @throws SolrException if the local core was not successfully sync'd.
-   */
-  public static void syncLocalCoreWithSharedStore(String collectionName, String coreName, String shardName, CoreContainer coreContainer,
-                                                  SharedShardVersionMetadata shardVersionMetadata, boolean isLeaderPulling) throws SolrException {
-    assert coreContainer.isZooKeeperAware();
-
-    ZkController zkController = coreContainer.getZkController();
-    DocCollection collection = zkController.getClusterState().getCollection(collectionName);
-    CoreStorageClient blobClient = coreContainer.getSharedStoreManager().getBlobStorageProvider().getClient();
-    log.info("sync initialized for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
-
-    Slice shard = collection.getSlicesMap().get(shardName);
-    if (shard != null) {
-      try {
-        String sharedStoreName = (String) shard.get(ZkStateReader.SHARED_SHARD_NAME);
-        SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController();
-        if (SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE.equals(shardVersionMetadata.getMetadataSuffix())) {
-          //no-op pull
-          BlobCoreMetadata emptyBlobCoreMetadata = BlobCoreMetadataBuilder.buildEmptyCoreMetadata(sharedStoreName);
-          concurrencyController.updateCoreVersionMetadata(collectionName, shardName, coreName, shardVersionMetadata, emptyBlobCoreMetadata, isLeaderPulling);
-          log.info("sync successful, nothing to pull, collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
-          return;
-        }
-        concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.BLOB_PULL_STARTED);
-        try {
-          // Get blob metadata
-          String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(shardVersionMetadata.getMetadataSuffix());
-          BlobCoreMetadata blobstoreMetadata = blobClient.pullCoreMetadata(sharedStoreName, blobCoreMetadataName);
-          if (null == blobstoreMetadata) {
-            // Zookepeer and blob are out of sync, could be due to eventual consistency model in blob or something else went wrong.
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                "cannot get core.metadata file from shared store, blobCoreMetadataName=" + blobCoreMetadataName +
-                    " shard=" + shardName +
-                    " collectionName=" + collectionName +
-                    " sharedStoreName=" + sharedStoreName);
-          } else if (blobstoreMetadata.getIsDeleted()) {
-            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                "core.metadata file is marked deleted in shared store, blobCoreMetadataName=" + blobCoreMetadataName +
-                    " shard=" + shardName +
-                    " collectionName=" + collectionName +
-                    " sharedStoreName=" + sharedStoreName);
-          } else if (blobstoreMetadata.getIsCorrupt()) {
-            log.warn("core.Metadata file is marked corrpt, skipping sync, collection=" + collectionName +
-                " shard=" + shardName + " coreName=" + coreName + " sharedStoreName=" + sharedStoreName);
-            return;
-          }
-
-          // Get local metadata + resolve with blob metadata. Given we're doing a pull, don't need to reserve commit point
-          // We do need to compute a directory hash to verify after pulling or before switching index dirs that no local 
-          // changes occurred concurrently
-          ServerSideMetadata serverMetadata = new ServerSideMetadata(coreName, coreContainer, 
-              /* reserveCommit */ false, /* captureDirHash */ true);
-          SharedMetadataResolutionResult resolutionResult = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobstoreMetadata);
-          PushPullData pushPullData = new PushPullData.Builder()
-              .setCollectionName(collectionName)
-              .setShardName(shardName)
-              .setCoreName(coreName)
-              .setSharedStoreName(sharedStoreName)
-              .build();
-
-          if (resolutionResult.getFilesToPull().size() > 0) {
-            BlobDeleteManager deleteManager = coreContainer.getSharedStoreManager().getBlobDeleteManager();
-            CorePushPull cp = new CorePushPull(blobClient, deleteManager, pushPullData, resolutionResult, serverMetadata, blobstoreMetadata);
-            cp.pullUpdateFromBlob(/* waitForSearcher */ true);
-            concurrencyController.updateCoreVersionMetadata(pushPullData.getCollectionName(), pushPullData.getShardName(), pushPullData.getCoreName(),
-                shardVersionMetadata, blobstoreMetadata, isLeaderPulling);
-          } else {
-            log.warn(String.format(Locale.ROOT,
-                "Why there are no files to pull even when we do not match with the version in zk? collection=%s shard=%s core=%s",
-                collectionName, shardName, coreName));
-          }
-        } finally {
-          concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.BLOB_PULL_FINISHED);
-        }
-      } catch (Exception ex) {
-        // wrap every thrown exception in a solr exception
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error occurred pulling shard=" + shardName + " collection=" + collectionName + " from shared store", ex);
-      }
-    } else {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Sync requested for unknown shard=" + shardName + " in collection=" + collectionName);
-    }
-
-  }
-
   /**
    * Returns the list of core properties that are needed to create a missing core corresponding
    * to provided {@code replica} of the {@code collection}. 
diff --git a/solr/core/src/java/org/apache/solr/store/shared/SharedCoreConcurrencyController.java b/solr/core/src/java/org/apache/solr/store/shared/SharedCoreConcurrencyController.java
index a5ab288..7c2b213 100644
--- a/solr/core/src/java/org/apache/solr/store/shared/SharedCoreConcurrencyController.java
+++ b/solr/core/src/java/org/apache/solr/store/shared/SharedCoreConcurrencyController.java
@@ -23,23 +23,21 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.StringUtils;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
 import org.apache.solr.servlet.HttpSolrCall;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
 import org.apache.solr.store.blob.metadata.PushPullData;
 import org.apache.solr.store.blob.process.CorePullTask;
+import org.apache.solr.store.blob.process.CorePuller;
 import org.apache.solr.store.blob.process.CorePusher;
-import org.apache.solr.store.blob.util.BlobStoreUtils;
-import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
 import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class helps coordinate synchronization of concurrent indexing, pushes and pulls
  * happening on a core of a shared collection {@link DocCollection#getSharedIndex()}
@@ -121,15 +119,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 public class SharedCoreConcurrencyController {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  /**
-   * Time indexing thread needs to wait to try acquiring pull write lock before checking if someone else has already done the pull.
-   */
-  public static int SECONDS_TO_WAIT_INDEXING_PULL_WRITE_LOCK = 5;
-  /**
-   * Max attempts by indexing thread to try acquiring pull write lock before bailing out. Ideally bail out scenario should never happen.
-   * If it does then either we are too slow in pulling and can tune this value or something else is wrong.
-   */
-  public static int MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK = 10;
 
   /**
    * This cache maintains the shared store version the each core is at or ahead of(core has to sometimes be ahead of
@@ -173,7 +162,7 @@ public class SharedCoreConcurrencyController {
   /**
    * Returns a {@link ReentrantReadWriteLock} corresponding to the core. It protects pulls from each other and indexing from pulls.
    * A write lock is required whenever pulling contents into a core from shared store.
-   * A read lock is required for the whole duration of indexing on the core(including the push to shared store {@link CorePusher#pushCoreToBlob(PushPullData)}.)
+   * A read lock is required for the whole duration of indexing on the core(including the push to shared store {@link CorePusher#pushCoreToSharedStore(SolrCore, PushPullData)}.)
    */
   public ReentrantReadWriteLock getCorePullLock(String collectionName, String shardName, String coreName) {
     SharedCoreVersionMetadata coreVersionMetadata = getOrCreateCoreVersionMetadata(collectionName, shardName, coreName);
@@ -182,7 +171,7 @@ public class SharedCoreConcurrencyController {
 
   /**
    * Returns a {@link ReentrantLock} corresponding to the core. It protects shared store pushes from each other.
-   * This lock is required for pushing the core to shared store {@link CorePusher#pushCoreToBlob(PushPullData)}.
+   * This lock is required for pushing the core to shared store {@link CorePusher#pushCoreToSharedStore(SolrCore, PushPullData)}.
    */
   public ReentrantLock getCorePushLock(String collectionName, String shardName, String coreName) {
     SharedCoreVersionMetadata coreVersionMetadata = getOrCreateCoreVersionMetadata(collectionName, shardName, coreName);
@@ -324,9 +313,9 @@ public class SharedCoreConcurrencyController {
     /**
      * Whether there is a soft guarantee of being in sync with {@link SharedShardVersionMetadata} of the shard.
      * In steady state this guarantee is provided for leader cores when they push
-     * {@link CorePusher#pushCoreToBlob(PushPullData)}
+     * {@link CorePusher#pushCoreToSharedStore(SolrCore, PushPullData)} 
      * and pull
-     * {@link BlobStoreUtils#syncLocalCoreWithSharedStore(String, String, String, CoreContainer, SharedShardVersionMetadata, boolean)}
+     * {@link CorePuller#pullCoreFromSharedStore(SolrCore, String, SharedShardVersionMetadata, boolean)} 
      * {@link CorePullTask#pullCoreFromBlob(boolean)} ()}
      * since followers cannot index. In presence of this guarantee we can skip consulting zookeeper before processing an indexing batch.
      */
diff --git a/solr/core/src/java/org/apache/solr/store/shared/SharedCoreIndexingBatchProcessor.java b/solr/core/src/java/org/apache/solr/store/shared/SharedCoreIndexingBatchProcessor.java
new file mode 100644
index 0000000..e3e87ac
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/store/shared/SharedCoreIndexingBatchProcessor.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.store.shared;
+
+import java.io.Closeable;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.store.blob.process.CorePuller;
+import org.apache.solr.store.blob.process.CorePusher;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible for bringing a stale SHARED core up to date by pulling from the shared store at the start
+ * of an indexing batch and pushing the updated core at the end of a successfully committed indexing batch. 
+ */
+public class SharedCoreIndexingBatchProcessor implements Closeable {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  /**
+   * Time indexing thread needs to wait to try acquiring pull write lock before checking if someone else has already done the pull.
+   */
+  public static int SECONDS_TO_WAIT_INDEXING_PULL_WRITE_LOCK = 5;
+  /**
+   * Max attempts by indexing thread to try acquiring pull write lock before bailing out. Ideally bail out scenario should never happen.
+   * If it does then either we are too slow in pulling and can tune this value or something else is wrong.
+   */
+  public static int MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK = 10;
+
+  private final SolrCore core;
+  private final String collectionName;
+  private final String shardName;
+  private final String sharedShardName;
+  private final CorePusher corePusher;
+  private final CorePuller corePuller;
+  private IndexingBatchState state;
+  private ReentrantReadWriteLock corePullLock;
+
+  public SharedCoreIndexingBatchProcessor(SolrCore core, ClusterState clusterState) {
+    this.core = core;
+    CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor();
+    collectionName = cloudDescriptor.getCollectionName();
+    shardName = cloudDescriptor.getShardId();
+
+    DocCollection collection = clusterState.getCollection(collectionName);
+    if (!collection.getSharedIndex()) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, collectionName + " is not a shared collection.");
+    }
+
+    Slice shard = collection.getSlicesMap().get(shardName);
+    if (shard == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Indexing batch received for an unknown shard," +
+          " collection=" + collectionName + " shard=" + shardName + " core=" + core.getName());
+    }
+
+    if (!Slice.State.ACTIVE.equals(shard.getState())) {
+      // This happens when we buffer updates for a sub shard.
+      // SHARED replica should eventually stop supporting buffered updates and then this should become a real exception
+      log.warn("Processing an indexing batch for a non-active shard," +
+          " collection=" + collectionName + " shard=" + shardName + " core=" + core.getName());
+    }
+
+    sharedShardName = (String) shard.get(ZkStateReader.SHARED_SHARD_NAME);
+    corePuller = new CorePuller();
+    corePusher = new CorePusher();
+    state = IndexingBatchState.NOT_STARTED;
+  }
+
+  /**
+   * Should be called whenever a document is about to be added/deleted from the SHARED core. If it is the first doc
+   * of the indexing batch, this method will mark the start of the batch and bring a stale SHARED core up to date by
+   * pulling from the shared store.
+   */
+  public void addOrDeleteGoingToBeIndexedLocally() {
+    // Following logic is built on the assumption that one particular instance of this processor
+    // will solely be consumed by a single thread. And all the documents of indexing batch will be processed by this one instance. 
+    String coreName = core.getName();
+    if (IndexingBatchState.NOT_STARTED.equals(state)) {
+      startIndexingBatch();
+    } else if (IndexingBatchState.STARTED.equals(state)) {
+      // do nothing, we only use this method to start an indexing batch once
+    } else if (IndexingBatchState.COMMITTED.equals(state)) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Why are we adding/deleting a doc through an already committed indexing batch?" +
+              " collection=" + collectionName + " shard=" + shardName + " core=" + coreName);
+    } else {
+      throwUnknownStateError();
+    }
+  }
+
+  @VisibleForTesting
+  protected void startIndexingBatch() {
+    // Following pull logic should only run once before the first add/delete of an indexing batch is processed by this processor
+
+    assert IndexingBatchState.NOT_STARTED.equals(state);
+
+    if (corePullLock != null) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "How come we already have a pull read lock?" +
+          " collection=" + collectionName + " shard=" + shardName + " core=" + core.getName());
+    }
+
+    String coreName = core.getName();
+    CoreContainer coreContainer = core.getCoreContainer();
+    SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController();
+    corePullLock = concurrencyController.getCorePullLock(collectionName, shardName, coreName);
+    // from this point onward we should always exit this method with read lock (no matter failure or what)
+    try {
+      concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreConcurrencyController.SharedCoreStage.INDEXING_BATCH_RECEIVED);
+      state = IndexingBatchState.STARTED;
+      SharedCoreConcurrencyController.SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName);
+      /**
+       * we only need to sync if there is no soft guarantee of being in sync.
+       * if there is one we will rely on that, and if we turned out to be wrong indexing will fail at push time
+       * and will remove this guarantee in {@link CorePusher#pushCoreToSharedStore(SolrCore, PushPullData)}
+       */
+      if (!coreVersionMetadata.isSoftGuaranteeOfEquality()) {
+        SharedShardMetadataController metadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController();
+        SharedShardMetadataController.SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
+        if (!concurrencyController.areVersionsEqual(coreVersionMetadata, shardVersionMetadata)) {
+          acquireWriteLockAndPull(collectionName, shardName, coreName, coreContainer);
+        }
+      }
+    } finally {
+      // acquire lock for the whole duration of update
+      // we should always leave with read lock acquired(failure or success), since it is the job of close method to release it
+      corePullLock.readLock().lock();
+      concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreConcurrencyController.SharedCoreStage.LOCAL_INDEXING_STARTED);
+    }
+  }
+
+  /**
+   * Brings the local core in sync with the shared store, pulling under the pull write lock when needed.
+   * <p>
+   * The write lock is acquired with a time-boxed {@code tryLock}. When the acquisition times out, the method
+   * checks whether some other thread has already completed the pull (soft guarantee of equality) and, if so,
+   * returns without pulling. Otherwise it retries, up to {@code MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK} attempts.
+   * The write lock is always released before this method returns.
+   *
+   * @param collectionName name of the collection the core belongs to
+   * @param shardName name of the shard the core belongs to
+   * @param coreName name of the local core
+   * @param coreContainer container used to look up the shared store controllers
+   * @throws SolrException if the write lock could not be acquired within the allowed attempts and nobody else
+   *         has pulled in the meantime, or if the thread is interrupted while waiting for the lock
+   */
+  private void acquireWriteLockAndPull(String collectionName, String shardName, String coreName, CoreContainer coreContainer) {
+    // There is a likelihood that many indexing requests came at once and realized we are out of sync.
+    // They all would try to acquire write lock. One of them makes progress to pull from shared store.
+    // After that regular indexing will see soft guarantee of equality and moves straight to indexing
+    // under read lock. Now it is possible that new indexing keeps coming in and read lock is never free.
+    // In that case the threads that came in earlier and wanted to pull will still be struggling(starving) to
+    // acquire write lock. Since we know that write lock is only needed by one to do the work, we will
+    // try time boxed acquisition and in case of failed acquisition we will see if some one else has already completed the pull.
+    // We will make few attempts before we bail out. Ideally bail out scenario should never happen.
+    // If it does then either we are too slow in pulling and can tune following parameters or something else is wrong.
+
+    // The controller lookup does not depend on the attempt, so it is resolved once before the retry loop.
+    SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController();
+    int attempt = 1;
+    while (true) {
+      try {
+        // try acquiring write lock
+        if (corePullLock.writeLock().tryLock(SECONDS_TO_WAIT_INDEXING_PULL_WRITE_LOCK, TimeUnit.SECONDS)) {
+          try {
+            // while acquiring write lock things might have updated, should reestablish if pull is still needed
+            SharedCoreConcurrencyController.SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName);
+            if (!coreVersionMetadata.isSoftGuaranteeOfEquality()) {
+              SharedShardMetadataController metadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController();
+              SharedShardMetadataController.SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
+              if (!concurrencyController.areVersionsEqual(coreVersionMetadata, shardVersionMetadata)) {
+                getCorePuller().pullCoreFromSharedStore(core, sharedShardName, shardVersionMetadata, /* isLeaderPulling */true);
+              }
+            }
+          } finally {
+            corePullLock.writeLock().unlock();
+          }
+          // write lock acquisition was successful and we are in sync with shared store
+          break;
+        } else {
+          // we could not acquire write lock but see if some other thread has already done the pulling
+          SharedCoreConcurrencyController.SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName);
+          if (coreVersionMetadata.isSoftGuaranteeOfEquality()) {
+            // parameterized logging: the message is only assembled when INFO is enabled
+            log.info("Indexing thread timed out trying to acquire the pull write lock. " +
+                "But some other thread has done the pulling so we are good. " +
+                "attempt={} collection={} shard={} core={}", attempt, collectionName, shardName, coreName);
+            break;
+          }
+          // no one else has pulled yet either, lets make another attempt ourselves
+          attempt++;
+          if (attempt > MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Indexing thread failed to acquire write lock for pull in " +
+                (SECONDS_TO_WAIT_INDEXING_PULL_WRITE_LOCK * MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK) + " seconds." +
+                " And no one other thread either has done the pull during that time. " +
+                " collection=" + collectionName + " shard=" + shardName + " core=" + coreName);
+          }
+        }
+      } catch (InterruptedException ie) {
+        // restore the interrupt flag for callers further up the stack before failing the request
+        Thread.currentThread().interrupt();
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Indexing thread interrupted while trying to acquire pull write lock." +
+            " collection=" + collectionName + " shard=" + shardName + " core=" + coreName, ie);
+      }
+    }
+  }
+
+  /**
+   * Accessor for the puller used to pull the core from the shared store; overridable so tests can substitute one.
+   *
+   * @return the {@link CorePuller} held by this processor
+   */
+  @VisibleForTesting
+  protected CorePuller getCorePuller() {
+    return this.corePuller;
+  }
+
+  /**
+   * Notification hook to be invoked once the SHARED core has been hard committed locally with success.
+   * It simply finishes the indexing batch via {@link #finishIndexingBatch()}, which pushes the updated core to
+   * the shared store, or skips the push when this processor did not add/delete any document locally.
+   */
+  public void hardCommitCompletedLocally() {
+    this.finishIndexingBatch();
+  }
+
+  protected void finishIndexingBatch() {
+    String coreName = core.getName();
+    if (IndexingBatchState.NOT_STARTED.equals(state)) {
+      // Since we did not added/deleted a single document therefore it is an isolated commit.
+      // Few ways isolated commit can manifest:
+      // 1. Client issuing a separate commit command after the update command.
+      // 2. SolrJ client issuing a separate follow up commit command to affected shards than actual indexing request
+      //    even when SolrJ client's caller issued a single update command with commit=true.
+      // 3. The replica that received the indexing batch first was either a follower replica or the leader of another 
+      //    shard and only did the job of forwarding the docs to their rightful leader. Therefore, at the end it has
+      //    nothing to commit.
+      // Shared replica has a hard requirement of processing each indexing batch with a hard commit(either explicit or
+      // implicit) because that is how, at the end of an indexing batch, synchronous push to shared store gets hold
+      // of the segment files on local disk.
+      // Therefore, isolated commits are meaningless for SHARED replicas and we can ignore writing to shared store. 
+      // If we ever need an isolated commit to write to shared store for some scenario, we should first understand if a
+      // pull from shared store was done or not(why not) and push should happen under corePullLock.readLock()
+      state = IndexingBatchState.COMMITTED;
+      log.info("Isolated commit encountered for a SHARED replica, ignoring writing to shared store." +
+          " collection=" + collectionName + " shard=" + shardName + " core=" + coreName);
+    } else if (IndexingBatchState.STARTED.equals(state)) {
+      if (corePullLock == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "How were we able to start an indexing batch without acquiring a pull read lock?" +
+                " collection=" + collectionName + " shard=" + shardName + " core=" + coreName);
+      }
+      SharedCoreConcurrencyController concurrencyController = core.getCoreContainer().getSharedStoreManager().getSharedCoreConcurrencyController();
+      concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreConcurrencyController.SharedCoreStage.LOCAL_INDEXING_FINISHED);
+      state = IndexingBatchState.COMMITTED;
+      getCorePusher().pushCoreToSharedStore(core, sharedShardName);
+    } else if (IndexingBatchState.COMMITTED.equals(state)) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Why are we committing an already committed indexing batch?" +
+              " collection=" + collectionName + " shard=" + shardName + " core=" + coreName);
+    } else {
+      throwUnknownStateError();
+    }
+  }
+
+  /**
+   * Fails with a server error reporting that {@code state} holds a value this class does not know how to handle.
+   *
+   * @throws SolrException always
+   */
+  private void throwUnknownStateError() {
+    // "=" keeps the state value separated from the label, consistent with the other key=value pairs in the message
+    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Programmer's error, unknown IndexingBatchState=" + state
+        + " collection=" + collectionName + " shard=" + shardName + " core=" + core.getName());
+  }
+
+  /**
+   * Accessor for the pusher used to push the core to the shared store; overridable so tests can substitute one.
+   *
+   * @return the {@link CorePusher} held by this processor
+   */
+  @VisibleForTesting
+  protected CorePusher getCorePusher() {
+    return this.corePusher;
+  }
+
+  @Override
+  public void close() {
+    if (!IndexingBatchState.NOT_STARTED.equals(state)) {
+      try {
+        SharedCoreConcurrencyController concurrencyController =
+            core.getCoreContainer().getSharedStoreManager().getSharedCoreConcurrencyController();
+        concurrencyController.recordState(collectionName, shardName, core.getName(),
+            SharedCoreConcurrencyController.SharedCoreStage.INDEXING_BATCH_FINISHED);
+      } catch (Exception ex) {
+        SolrException.log(log, "Error recording the finish of a SHARED core indexing batch", ex);
+      }
+    }
+    if (corePullLock != null) {
+      try {
+        // release read lock
+        corePullLock.readLock().unlock();
+      } catch (Exception ex) {
+        SolrException.log(log, "Error releasing pull read lock of a SHARED core", ex);
+      }
+    }
+  }
+
+  private enum IndexingBatchState {
+    NOT_STARTED,
+    STARTED,
+    COMMITTED
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 070d598..6977103 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -26,12 +26,10 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -61,13 +59,7 @@ import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.store.blob.process.CoreUpdateTracker;
-import org.apache.solr.store.blob.util.BlobStoreUtils;
-import org.apache.solr.store.shared.SharedCoreConcurrencyController;
-import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreStage;
-import org.apache.solr.store.shared.SharedCoreConcurrencyController.SharedCoreVersionMetadata;
-import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
-import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
+import org.apache.solr.store.shared.SharedCoreIndexingBatchProcessor;
 import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
@@ -92,8 +84,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   private Set<String> skippedCoreNodeNames;
   private final String collection;
   private boolean readOnlyCollection = false;
-  private CoreUpdateTracker sharedCoreTracker;
-  private ReentrantReadWriteLock corePullLock;
 
   // The cached immutable clusterState for the update... usually refreshed for each individual update.
   // Different parts of this class used to request current clusterState views, which lead to subtle bugs and race conditions
@@ -114,16 +104,34 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   private RollupRequestReplicationTracker rollupReplicationTracker;
   private LeaderRequestReplicationTracker leaderReplicationTracker;
 
+  /**
+   * TODO: This class is not an ideal place to run the start and end logic of an indexing batch. We are not sure if another
+   *       processor in the chain before this would also need to pull from shared store. Therefore we might want to
+   *       find a better place earlier in the call stack. The challenge would be to do it efficiently i.e. we might not 
+   *       know before this processor if a batch contains any documents that are meant for this replica. But sacrificing 
+   *       that efficiency for a more correct place might be fair trade off. 
+   * For {@link Replica.Type#SHARED} replica, it is necessary that we pull from the shared store at the start of
+   * an indexing batch (if the core is stale). And we push to the shared store at the end of a successfully committed
+   * indexing batch (we ensure that each batch has a hard commit). Details can be found in 
+   * {@link org.apache.solr.store.shared.SharedCoreConcurrencyController}.
+   * In other words, we would like to call {@link SharedCoreIndexingBatchProcessor#startIndexingBatch()} at the start of
+   * an indexing batch and {@link SharedCoreIndexingBatchProcessor#finishIndexingBatch()} at the end of a successfully
+   * committed indexing batch.
+   * For that, we rely on first {@link #processAdd(AddUpdateCommand)} or {@link #processDelete(DeleteUpdateCommand)}
+   * that is going to index the doc locally to identify the start of an indexing batch and 
+   * end of {@link #processCommit(CommitUpdateCommand)} to identify the end of a successfully committed indexing batch.
+   * Most of the logic is contained inside {@link SharedCoreIndexingBatchProcessor}. The only expectation from this class
+   * is to call {@link SharedCoreIndexingBatchProcessor#addOrDeleteGoingToBeIndexedLocally()} and 
+   * {@link SharedCoreIndexingBatchProcessor#hardCommitCompletedLocally()} appropriately.
+   * This logic is built upon the assumption/understanding that all the documents of an indexing batch are processed by 
+   * single instance of this class and that instance is consumed by a single thread. 
+   */
+  private SharedCoreIndexingBatchProcessor sharedCoreIndexingBatchProcessor;
+  
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public DistributedZkUpdateProcessor(SolrQueryRequest req,
                                       SolrQueryResponse rsp, UpdateRequestProcessor next) {
-    this(req,rsp,next, new CoreUpdateTracker(req.getCore().getCoreContainer()));
-  }
-
-  @VisibleForTesting
-  protected DistributedZkUpdateProcessor(SolrQueryRequest req,
-      SolrQueryResponse rsp, UpdateRequestProcessor next, CoreUpdateTracker sharedCoreTracker) {
     super(req, rsp, next);
     CoreContainer cc = req.getCore().getCoreContainer();
     cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor();
@@ -138,12 +146,11 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
       readOnlyCollection = coll.isReadOnly();
       if (coll.getSharedIndex() && replicaType != Replica.Type.SHARED) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-            String.format(Locale.ROOT, "Collections backed by shared store can only have replicas of type SHARED, " +
-                    "collection=%s, shard=%s core=%s replicaType=%s", 
-                collection, cloudDesc.getShardId(), req.getCore().getName(), replicaType.name()));
+            "Collections backed by shared store can only have replicas of type SHARED," +
+                " collection=" + collection + ", shard=" + cloudDesc.getShardId() +
+                " core=" + req.getCore().getName() + " replicaType=" + replicaType.name());
       }
     }
-    this.sharedCoreTracker = sharedCoreTracker;
   }
 
   private boolean isReadOnly() {
@@ -184,6 +191,35 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
     updateCommand = cmd;
 
+    // 1. SHARED replica has a hard requirement of processing each indexing batch with a hard commit(either explicit or
+    // implicit, HttpSolrCall#addCommitIfAbsent) because that is how, at the end of an indexing batch, synchronous push
+    // to shared store gets hold of the segment files on local disk. SHARED replica also does not support the notion of soft commit.
+    // Therefore unlike NRT replica type we do not need to broadcast commit to the leaders of all the shards of a collection.
+    // 
+    // This means we won't support a client explicitly sending commit=true to a replica of a shard and having it route to
+    // the leader for SHARED replicas or clients sending commit=true for the purpose of refreshing all of the searchers 
+    // in their collection. The former is not needed because an isolated commit is a no-op for a SHARED replica and the latter is not supported
+    // because SHARED replica has a different plan around opening of searchers https://issues.apache.org/jira/browse/SOLR-14339
+    //
+    // 2. <code>isLeader</code> is computed fresh each time an AddUpdateCommand/DeleteUpdateCommand belonging to the indexing
+    // batch is processed. And finally it is recomputed in this method. It is possible that at the beginning of a batch
+    // this replica was a leader and did process some AddUpdateCommand/DeleteUpdateCommand. But before reaching this
+    // method it lost the leadership. In that case we would still like to process the commit; otherwise the indexing batch can
+    // succeed without pushing the changes to the shared store (data loss). Therefore, we are not restricting the 
+    // following logic to leaders only. SharedCoreIndexingBatchProcessor already handles the case where it will
+    // skip the push to the shared store if there was no AddUpdateCommand/DeleteUpdateCommand processed by it.
+    //
+    // Conclusion: For SHARED replica we only need to handle a hard commit locally and then push changed core to 
+    //             the shared store.
+    if (Replica.Type.SHARED.equals(replicaType)) {
+      if (!cmd.softCommit) {
+        // TODO: when implementing SOLR-14339 (SHARED replica's freshness) consider how can we set cmd.openSearcher=false 
+        doLocalCommit(cmd);
+        getSharedCoreIndexingBatchProcessor().hardCommitCompletedLocally();
+      }
+      return;
+    }
+
     List<SolrCmdDistributor.Node> nodes = null;
     Replica leaderReplica = null;
     zkCheck();
@@ -195,12 +231,11 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     }
     isLeader = leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
 
-    nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT, Replica.Type.SHARED), true);
+    nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT), true);
     if (nodes == null) {
       // This could happen if there are only pull replicas
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Unable to distribute commit operation. No replicas available of types " + Replica.Type.TLOG + ", " + Replica.Type.NRT
-            + " or " + Replica.Type.SHARED);
+          "Unable to distribute commit operation. No replicas available of types " + Replica.Type.TLOG + " or " + Replica.Type.NRT);
     }
 
     nodes.removeIf((node) -> node.getNodeProps().getNodeName().equals(zkController.getNodeName())
@@ -211,13 +246,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         log.warn("Commit not supported on replicas of type " + Replica.Type.PULL);
       } else if (replicaType == Replica.Type.NRT) {
         doLocalCommit(cmd);
-      } else if (replicaType == Replica.Type.SHARED) {
-        // If a replica is Replica.Type.SHARED then all are for the shard. These do not forward, data flows though shared storage.
-        // We really want to know if this happens, as it most likely means something went wrong elsewhere.
-        String message = "Unexpected indexing forwarding from leader to replicas for type " + Replica.Type.SHARED
-            + " collection " + collection + " leader " + leaderReplica.getCoreUrl() + " on " + leaderReplica.getNodeName();
-        log.error(message); // Remove if exception below ends up being logged.
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, message);
       }
     } else {
       // zk
@@ -275,9 +303,11 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     // check if client has requested minimum replication factor information. will set replicationTracker to null if
     // we aren't the leader or subShardLeader
     checkReplicationTracker(cmd);
-    // Update the local cores if needed.
-    if (replicaType.equals(Replica.Type.SHARED)) {
-      readFromSharedStoreIfNecessary();
+
+    // this should be called after setupRequest(UpdateCommand) and before the doc is indexed locally
+    // because it needs to know whether current replica is leader or subShardLeader for the doc or not
+    if (isSharedCoreAddOrDeleteGoingToBeIndexedLocally()) {
+      getSharedCoreIndexingBatchProcessor().addOrDeleteGoingToBeIndexedLocally();
     }
 
     super.processAdd(cmd);
@@ -347,11 +377,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     if (isReadOnly()) {
       throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
     }
-    // Update the local cores if needed.
-    if (replicaType.equals(Replica.Type.SHARED)) {
-      readFromSharedStoreIfNecessary();
-    }
-
     super.processDelete(cmd);
   }
 
@@ -363,6 +388,12 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     // we aren't the leader or subShardLeader
     checkReplicationTracker(cmd);
 
+    // this should be called after setupRequest(UpdateCommand) and before the doc is indexed locally
+    // because it needs to know whether current replica is leader or subShardLeader for the doc or not
+    if (isSharedCoreAddOrDeleteGoingToBeIndexedLocally()) {
+      getSharedCoreIndexingBatchProcessor().addOrDeleteGoingToBeIndexedLocally();
+    }
+
     super.doDeleteById(cmd);
   }
 
@@ -497,6 +528,13 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     // check if client has requested minimum replication factor information. will set replicationTracker to null if
     // we aren't the leader or subShardLeader
     checkReplicationTracker(cmd);
+
+    // this should be called after setupRequest(UpdateCommand) and before the doc is indexed locally
+    // because it needs to know whether current replica is leader or subShardLeader for the doc or not
+    if (isSharedCoreAddOrDeleteGoingToBeIndexedLocally()) {
+      getSharedCoreIndexingBatchProcessor().addOrDeleteGoingToBeIndexedLocally();
+    }
+
     super.doDeleteByQuery(cmd, replicas, coll);
   }
 
@@ -1065,17 +1103,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
   @Override
   protected void doClose() {
-    if (corePullLock != null) {
-      try {
-        SharedCoreConcurrencyController concurrencyController =
-            req.getCore().getCoreContainer().getSharedStoreManager().getSharedCoreConcurrencyController();
-        concurrencyController.recordState(cloudDesc.getCollectionName(), cloudDesc.getShardId(), req.getCore().getName(),
-            SharedCoreStage.INDEXING_BATCH_FINISHED);
-      } catch (Exception ex) {
-        SolrException.log(log, "Error recording the finish of SHARED core indexing batch", ex);
-      }
-      // release read lock
-      corePullLock.readLock().unlock();
+    if (sharedCoreIndexingBatchProcessor != null) {
+      sharedCoreIndexingBatchProcessor.close();
     }
     if (cmdDistrib != null) {
       cmdDistrib.close();
@@ -1103,129 +1132,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     super.processRollback(cmd);
   }
 
-
-  // TODO: Refactor most of shared replica logic out into a separate class.
-  private void writeToSharedStore() {
-    String collectionName = cloudDesc.getCollectionName();
-    String shardName = cloudDesc.getShardId();
-    String coreName = req.getCore().getName();
-    SharedCoreConcurrencyController concurrencyController = req.getCore().getCoreContainer().getSharedStoreManager().getSharedCoreConcurrencyController();
-    concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.LOCAL_INDEXING_FINISHED);
-
-    log.info("Attempting to initiate index update write to shared store for collection=" + collectionName +
-        " and shard=" + shardName + " using core=" + coreName);
-
-    sharedCoreTracker.persistShardIndexToSharedStore(zkController.zkStateReader.getClusterState(),
-        collectionName,
-        shardName,
-        coreName);
-  }
-
-  private void readFromSharedStoreIfNecessary() {
-    String collectionName = cloudDesc.getCollectionName();
-    String shardName = cloudDesc.getShardId();
-    String coreName = req.getCore().getName();
-    assert Replica.Type.SHARED.equals(replicaType);
-    // Peers and subShardLeaders should only forward the update request to leader replica,
-    // hence not need to sync with the blob store at this point.
-    if (!isLeader || isSubShardLeader) {
-      return;
-    }
-
-    // this lock acquire/release logic is built on the assumption that one particular instance of this processor
-    // will solely be consumed by a single thread. And all the documents of indexing batch will be processed by this one instance. 
-    // Following pull logic should only run once before the first document of indexing batch(add/delete) is processed by this processor
-    if (corePullLock != null) {
-      // we already have a lock i.e. we have already read from the shared store (if needed)
-      return;
-    }
-
-    CoreContainer coreContainer = req.getCore().getCoreContainer();
-    SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController();
-    concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.INDEXING_BATCH_RECEIVED);
-    corePullLock = concurrencyController.getCorePullLock(collectionName, shardName, coreName);
-    // from this point on wards we should always exit this method with read lock (no matter failure or what)
-    try {
-      SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName);
-      /**
-       * we only need to sync if there is no soft guarantee of being in sync.
-       * if there is one we will rely on that, and if we turned out to be wrong indexing will fail at push time
-       * and will remove this guarantee in {@link CorePusher#pushCoreToBlob(PushPullData)}
-       */
-      if (!coreVersionMetadata.isSoftGuaranteeOfEquality()) {
-        SharedShardMetadataController metadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController();
-        SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
-        if (!concurrencyController.areVersionsEqual(coreVersionMetadata, shardVersionMetadata)) {
-          acquireWriteLockAndPull(collectionName, shardName, coreName, coreContainer);
-        }
-      }
-    } finally {
-      // acquire lock for the whole duration of update
-      // we should always leave with read lock acquired(failure or success), since it is the job of close method to release it
-      corePullLock.readLock().lock();
-      concurrencyController.recordState(collectionName, shardName, coreName, SharedCoreStage.LOCAL_INDEXING_STARTED);
-    }
-  }
-
-  private void acquireWriteLockAndPull(String collectionName, String shardName, String coreName, CoreContainer coreContainer) {
-    // There is a likelihood that many indexing requests came at once and realized we are out of sync.
-    // They all would try to acquire write lock. One of them makes progress to pull from shared store.
-    // After that regular indexing will see soft guarantee of equality and moves straight to indexing
-    // under read lock. Now it is possible that new indexing keeps coming in and read lock is never free.
-    // In that case the poor guys that came in earlier and wanted to pull will still be struggling(starving) to
-    // acquire write lock. Since we know that write lock is only needed by one to do the work, we will
-    // try time boxed acquisition and in case of failed acquisition we will see if some one else has already completed the pull.
-    // We will make few attempts before we bail out. Ideally bail out scenario should never happen.
-    // If it does then either we are too slow in pulling and can tune following parameters or something else is wrong.
-    int attempt = 1;
-    while (true) {
-      SharedCoreConcurrencyController concurrencyController = coreContainer.getSharedStoreManager().getSharedCoreConcurrencyController();
-      try {
-        // try acquiring write lock
-        if (corePullLock.writeLock().tryLock(SharedCoreConcurrencyController.SECONDS_TO_WAIT_INDEXING_PULL_WRITE_LOCK, TimeUnit.SECONDS)) {
-          try {
-            // while acquiring write lock things might have updated, should reestablish if pull is still needed
-            SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName);
-            if (!coreVersionMetadata.isSoftGuaranteeOfEquality()) {
-              SharedShardMetadataController metadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController();
-              SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
-              if (!concurrencyController.areVersionsEqual(coreVersionMetadata, shardVersionMetadata)) {
-                BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, coreName, shardName, coreContainer, shardVersionMetadata, /* isLeaderSyncing */true);
-              }
-            }
-          } finally {
-            corePullLock.writeLock().unlock();
-          }
-          // write lock acquisition was successful and we are in sync with shared store
-          break;
-        } else {
-          // we could not acquire write lock but see if some other thread has already done the pulling
-          SharedCoreVersionMetadata coreVersionMetadata = concurrencyController.getCoreVersionMetadata(collectionName, shardName, coreName);
-          if (coreVersionMetadata.isSoftGuaranteeOfEquality()) {
-            log.info(String.format(Locale.ROOT, "Indexing thread waited to acquire to write lock and could not. " +
-                    "But someone else has done the pulling so we are good. attempt=%s collection=%s shard=%s core=%s",
-                attempt, collectionName, shardName, coreName));
-            break;
-          }
-          // no one else has pulled yet either, lets make another attempt ourselves
-          attempt++;
-          if (attempt > SharedCoreConcurrencyController.MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK) {
-            throw new SolrException(ErrorCode.SERVER_ERROR, String.format(Locale.ROOT, 
-                    "Indexing thread failed to acquire write lock for pull in %s seconds. " +
-                    "And no one else either has done the pull during that time. collection=%s shard=%s core=%s",
-                Integer.toString(SharedCoreConcurrencyController.SECONDS_TO_WAIT_INDEXING_PULL_WRITE_LOCK * SharedCoreConcurrencyController.MAX_ATTEMPTS_INDEXING_PULL_WRITE_LOCK),
-                collectionName, shardName, coreName));
-          }
-        }
-      } catch (InterruptedException ie) {
-        Thread.currentThread().interrupt();
-        throw new SolrException(ErrorCode.SERVER_ERROR, String.format(Locale.ROOT,
-            "Indexing thread interrupted while trying to acquire pull write lock." +
-            " collection=%s shard=%s core=%s", collectionName, shardName, coreName), ie);
-      }
-    }
-  }
-
   // TODO: optionally fail if n replicas are not reached...
   protected void doDistribFinish() {
     clusterState = zkController.getClusterState();
@@ -1238,50 +1144,10 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
       }
       zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero();
     }
-
-    /**
-     *  Track the updated core for push to Blob store.
-     *
-     *  Only, the leader node pushes the updates to blob store but the leader can be change mid update,
-     *  so we don't stop peers from pushing updates to the blob store.
-     *
-     *  We also need to check for isLeader here because a peer can also receive commit message if the request was directly send to the peer.
-     */
-    if (updateCommand != null &&
-        updateCommand.getClass() == CommitUpdateCommand.class &&
-        isLeader && replicaType.equals(Replica.Type.SHARED)
-        && !((CommitUpdateCommand) updateCommand).softCommit) {
-      if (corePullLock == null) {
-        // Since corePullLock is null, this update request has no document to add or delete i.e. it is an isolated commit.
-        // Few ways isolated commit can manifest:
-        // 1. Client does indexing for while before issuing a separate commit.
-        // 2. SolrJ client issuing a separate follow up commit command to affected shards than actual indexing request
-        //    even when SolrJ client's caller issued a single update command with commit=true.
-        // Shared replica has a hard requirement of processing each indexing batch with a hard commit(either explicit or
-        // implicit) because that is how, at the end of an indexing batch, synchronous push to shared store gets hold
-        // of the segment files on local disk.
-        // Therefore, isolated commits are meaningless for SHARED replicas and we can ignore writing to shared store. 
-        // If we ever need an isolated commit to write to shared store for some scenario, we should first understand if a 
-        // pull from shared store was done or not(why not) and push should happen under corePullLock.readLock()
-        log.info("Isolated commit encountered for a SHARED replica, ignoring writing to shared store. collection="
-            + cloudDesc.getCollectionName() + " shard=" + cloudDesc.getShardId() + " core=" + req.getCore().getName());
-      } else {
-        /*
-         * TODO SPLITSHARD triggers soft commits.
-         * We don't persist on softCommit because there is nothing to so we should ignore those kinds of commits.
-         * Configuring behavior based on soft/hard commit seems like we're getting into an abstraction deeper then
-         * what the DUP is concerned about so we may want to consider moving this code somewhere more appropriate
-         * in the future (deeper in the stack)
-         */
-        writeToSharedStore();
-      }
-    }
-
     // TODO: if not a forward and replication req is not specified, we could
     // send in a background thread
 
     cmdDistrib.finish();
-
     List<SolrCmdDistributor.Error> errors = cmdDistrib.getErrors();
     // TODO - we may need to tell about more than one error...
 
@@ -1465,4 +1331,30 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
     throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot talk to ZooKeeper - Updates are disabled.");
   }
+
+  private boolean isSharedCoreAddOrDeleteGoingToBeIndexedLocally() {
+    // forwardToLeader: if true, then the update is going to be forwarded to its rightful leader.
+    //                  The doc being added or deleted might not even belong to the current core's (req.getCore()) shard.
+    // isLeader: if true, then the current core (req.getCore()) is the leader of the shard to which the doc being added or deleted belongs.
+    //           For SHARED replicas, only leader replicas do local indexing. Follower SHARED replicas do not do any local
+    //           indexing and their only job is to forward the add/delete updates to the leader replica.
+    // isSubShardLeader: if true, then the current core (req.getCore()) is the leader of a sub shard being built.
+    //                   Sub shard leaders only buffer the updates locally and apply them towards the end of a successful
+    //                   split before the sub shard is declared active. It is at that point the sub shard is pushed to
+    //                   the shared store (org.apache.solr.handler.admin.RequestApplyUpdatesOp).
+    // Therefore, only the leader replicas of the SHARED active shards will process docs locally and thus need to pull/push
+    // from the shared store during indexing.
+    return Replica.Type.SHARED.equals(replicaType) && !forwardToLeader && isLeader && !isSubShardLeader;
+  }
+
+  @VisibleForTesting
+  public SharedCoreIndexingBatchProcessor getSharedCoreIndexingBatchProcessor() {
+    assert Replica.Type.SHARED.equals(replicaType);
+    // Lazily create the batch processor the first time it is requested; otherwise reuse the instance already stored in the field.
+    if(sharedCoreIndexingBatchProcessor == null) {
+      sharedCoreIndexingBatchProcessor = new SharedCoreIndexingBatchProcessor(req.getCore(), clusterState);
+    }
+
+    return sharedCoreIndexingBatchProcessor;
+  }
 }
diff --git a/solr/core/src/test-files/solr/configsets/shared-distrib-indexing/conf/schema.xml b/solr/core/src/test-files/solr/configsets/shared-distrib-indexing/conf/schema.xml
new file mode 100644
index 0000000..4124fea
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/shared-distrib-indexing/conf/schema.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+  <fieldType name="string" class="solr.StrField"/>
+  <fieldType name="int" class="${solr.tests.IntegerFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <fieldType name="long" class="${solr.tests.LongFieldType}" docValues="${solr.tests.numeric.dv}" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <dynamicField name="*" type="string" indexed="true" stored="true"/>
+  <!-- for versioning -->
+  <field name="_version_" type="long" indexed="true" stored="true"/>
+  <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+  <field name="id" type="string" indexed="true" stored="true"/>
+  <dynamicField name="*_s"  type="string"  indexed="true"  stored="true" />
+  <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/solr/core/src/test-files/solr/configsets/shared-distrib-indexing/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/shared-distrib-indexing/conf/solrconfig.xml
new file mode 100644
index 0000000..2238fe8
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/shared-distrib-indexing/conf/solrconfig.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <updateRequestProcessorChain name="test-update-factory" default="true">
+    <processor class="solr.LogUpdateProcessorFactory" />
+    <processor class="org.apache.solr.store.shared.SharedStoreDistributedIndexingTest$TestDistributedUpdateProcessorFactory" />
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+  <indexConfig>
+    <mergeScheduler class="${solr.mscheduler:org.apache.lucene.index.ConcurrentMergeScheduler}"/>
+  </indexConfig>
+</config>
+
diff --git a/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java b/solr/core/src/test/org/apache/solr/store/blob/process/CorePullerTest.java
similarity index 63%
copy from solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java
copy to solr/core/src/test/org/apache/solr/store/blob/process/CorePullerTest.java
index 653ed07..e41e5de 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/process/CorePullerTest.java
@@ -15,14 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.solr.store.blob.util;
-
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Path;
-import java.util.Map;
-import java.util.Properties;
+package org.apache.solr.store.blob.process;
+
 import java.util.UUID;
 
 import org.apache.solr.client.solrj.SolrClient;
@@ -31,6 +25,7 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -40,7 +35,6 @@ import org.apache.solr.core.SolrCore;
 import org.apache.solr.store.blob.client.CoreStorageClient;
 import org.apache.solr.store.shared.SolrCloudSharedStoreTestCase;
 import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
-import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -51,33 +45,32 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 /**
- * Unit tests for {@link BlobStoreUtils}
+ * Tests for {@link CorePuller}
  */
-public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase { 
-  
-  private static Path sharedStoreRootPath;
-  
+public class CorePullerTest extends SolrCloudSharedStoreTestCase {
+
   private String collectionName;
   private String shardName;
   private Replica newReplica;
   private CoreContainer cc;
-  
+  private SolrCore core;
+
   private static CoreStorageClient storageClient;
-  
+
   @BeforeClass
   public static void setupTestClass() throws Exception {
     assumeWorkingMockito();
-    sharedStoreRootPath = createTempDir("tempDir");
-    storageClient = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
-    
-    assumeWorkingMockito();
+    storageClient = setupLocalBlobStoreClient(blobDir, DEFAULT_BLOB_DIR_NAME);
   }
-  
+
   @After
   public void doAfter() throws Exception {
+    if (core != null) {
+      core.close();
+    }
     shutdownCluster();
   }
-  
+
   /**
    * testSyncLocalCoreWithSharedStore_syncSkipOnDefault checks that syncLocalCoreWithSharedStore 
    * will skip sync if metadataSuffix is set to default in the ZK.
@@ -86,26 +79,29 @@ public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
   public void testSyncLocalCoreWithSharedStore_syncSkipOnDefault() throws Exception {
     setupCluster(1);
     setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient), cluster.getJettySolrRunner(0));
-    
+
     collectionName = "sharedCol" + UUID.randomUUID();
     shardName = "shard" + UUID.randomUUID();
     CloudSolrClient cloudClient = cluster.getSolrClient();
     setupSharedCollectionWithShardNames(collectionName, 1, 1, shardName);
-        
-    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
+
+    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+    DocCollection collection = clusterState.getCollection(collectionName);
+    String sharedShareName = (String) collection.getSlicesMap().get(shardName).get(ZkStateReader.SHARED_SHARD_NAME);
     newReplica = collection.getReplicas().get(0);
     cc = getCoreContainer(newReplica.getNodeName());
-    
-    CoreStorageClient blobClientSpy = Mockito.spy(storageClient);    
+    core = cc.getCore(newReplica.getCoreName());
+
+    CoreStorageClient blobClientSpy = Mockito.spy(storageClient);
     try {
-      SharedShardVersionMetadata shardVersionMetadata = new SharedShardVersionMetadata(0, SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE);
-      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, newReplica.getCoreName(), shardName, cc, shardVersionMetadata, true);
+      SharedShardMetadataController.SharedShardVersionMetadata shardVersionMetadata = new SharedShardMetadataController.SharedShardVersionMetadata(0, SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE);
+      new CorePuller().pullCoreFromSharedStore(core, sharedShareName, shardVersionMetadata, true);
       verify(blobClientSpy, never()).pullCoreMetadata(anyString(), anyString());
     } catch (Exception ex){
       fail("syncLocalCoreWithSharedStore failed with exception: " + ex.getMessage());
-    } 
+    }
   }
-  
+
   /**
    * testSyncLocalCoreWithSharedStore_missingBlob checks that syncLocalCoreWithSharedStore 
    * will throw exception if core.metadata file is missing from the sharedStore.
@@ -114,26 +110,29 @@ public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
   public void testSyncLocalCoreWithSharedStore_missingBlob() throws Exception {
     setupCluster(1);
     setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient), cluster.getJettySolrRunner(0));
-    
+
     collectionName = "sharedCol" + UUID.randomUUID();
     shardName = "shard" + UUID.randomUUID();
     CloudSolrClient cloudClient = cluster.getSolrClient();
     setupSharedCollectionWithShardNames(collectionName, 1, 1, shardName);
-        
-    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
+
+    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+    DocCollection collection = clusterState.getCollection(collectionName);
+    String sharedShareName = (String) collection.getSlicesMap().get(shardName).get(ZkStateReader.SHARED_SHARD_NAME);
     newReplica = collection.getReplicas().get(0);
     cc = getCoreContainer(newReplica.getNodeName());
-    
+    core = cc.getCore(newReplica.getCoreName());
+
     try {
-      SharedShardVersionMetadata shardVersionMetadata = new SharedShardVersionMetadata(0, UUID.randomUUID().toString());
-      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, newReplica.getCoreName(), shardName, cc, shardVersionMetadata, true);
+      SharedShardMetadataController.SharedShardVersionMetadata shardVersionMetadata = new SharedShardMetadataController.SharedShardVersionMetadata(0, UUID.randomUUID().toString());
+      new CorePuller().pullCoreFromSharedStore(core, sharedShareName, shardVersionMetadata, true);
       fail("syncLocalCoreWithSharedStore should throw exception if shared store doesn't have the core.metadata file.");
     } catch (Exception ex){
       String expectedException = "cannot get core.metadata file from shared store";
       assertTrue(ex.getCause().getMessage().contains(expectedException));
-    } 
+    }
   }
-  
+
   /**
    * testSyncLocalCoreWithSharedStore_syncEquivalent checks that syncLocalCoreWithSharedStore 
    * doesn't throw an exception if shared store and local files, already are in sync.
@@ -142,17 +141,20 @@ public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
   public void testSyncLocalCoreWithSharedStore_syncEquivalent() throws Exception {
     setupCluster(1);
     setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient), cluster.getJettySolrRunner(0));
-    
+
     CloudSolrClient cloudClient = cluster.getSolrClient();
-    
+
     collectionName = "sharedCol" + UUID.randomUUID();
     shardName = "shard" + UUID.randomUUID();
     setupSharedCollectionWithShardNames(collectionName, 1, 1, shardName);
-    
-    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
+
+    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+    DocCollection collection = clusterState.getCollection(collectionName);
+    String sharedShareName = (String) collection.getSlicesMap().get(shardName).get(ZkStateReader.SHARED_SHARD_NAME);
     newReplica = collection.getReplicas().get(0);
     cc = getCoreContainer(newReplica.getNodeName());
-    
+    core = cc.getCore(newReplica.getCoreName()); 
+
     CoreStorageClient blobClientSpy = Mockito.spy(storageClient);
     // Add a document.
     SolrInputDocument doc = new SolrInputDocument();
@@ -163,15 +165,15 @@ public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
     req.commit(cloudClient, collectionName);
     try {
       SharedShardMetadataController metadataController = cc.getSharedStoreManager().getSharedShardMetadataController();
-      SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
+      SharedShardMetadataController.SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
       // we push and already have the latest updates so we should not pull here
-      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, newReplica.getCoreName(), shardName, cc, shardVersionMetadata, true);
+      new CorePuller().pullCoreFromSharedStore(core, sharedShareName, shardVersionMetadata, true);
       verify(blobClientSpy, never()).pullCoreMetadata(anyString(), anyString());
-    } catch (Exception ex) { 
+    } catch (Exception ex) {
       fail("syncLocalCoreWithSharedStore failed with exception: " + ex.getMessage());
     }
   }
-  
+
   /**
    * testSyncLocalCoreWithSharedStore_syncSuccess checks that syncLocalCoreWithSharedStore 
    * pulls index files from blob if missing locally and present in blob
@@ -179,18 +181,18 @@ public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
   @Test
   public void testSyncLocalCoreWithSharedStore_syncSuccess() throws Exception {
     setupCluster(2);
-    
+
     // configure same client for each runner, this isn't a concurrency test so this is fine
     for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
       setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient), runner);
     }
-    
+
     // set up two nodes with one shard and two replicas 
     collectionName = "sharedCol" + UUID.randomUUID();
     shardName = "shard" + UUID.randomUUID();
     CloudSolrClient cloudClient = cluster.getSolrClient();
     setupSharedCollectionWithShardNames(collectionName, 1, 2, shardName);
-    
+
     CoreStorageClient blobClientSpy = Mockito.spy(storageClient);
     // Add a document.
     SolrInputDocument doc = new SolrInputDocument();
@@ -199,9 +201,11 @@ public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
     UpdateRequest req = new UpdateRequest();
     req.add(doc);
     req.commit(cluster.getSolrClient(), collectionName);
-    
-    // get the follower replica    
-    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
+
+    // get the follower replica
+    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+    DocCollection collection = clusterState.getCollection(collectionName);
+    String sharedShareName = (String) collection.getSlicesMap().get(shardName).get(ZkStateReader.SHARED_SHARD_NAME);
     Replica leaderReplica = collection.getLeader(shardName);
     Replica follower = null;
     for (Replica replica : collection.getReplicas()) {
@@ -210,73 +214,34 @@ public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
         break;
       }
     }
-    
+
     // verify this last update didn't happen on the follower, it should only have its default segment file
     cc = getCoreContainer(follower.getNodeName());
-    SolrCore core = cc.getCore(follower.getCoreName());    
+    core = cc.getCore(follower.getCoreName());
     assertEquals(1, core.getDeletionPolicy().getLatestCommit().getFileNames().size());
-    
+
     try {
       SharedShardMetadataController metadataController = cc.getSharedStoreManager().getSharedShardMetadataController();
-      SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
+      SharedShardMetadataController.SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
       // we pushed on the leader, try sync on the follower
-      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, follower.getCoreName(), shardName, cc, shardVersionMetadata, true);
-      
+      new CorePuller().pullCoreFromSharedStore(core, sharedShareName, shardVersionMetadata, true);
+
       // did we pull?
       assertTrue(core.getDeletionPolicy().getLatestCommit().getFileNames().size() > 1);
-      
+
       // query just the replica we pulled on
-      try (SolrClient directClient = getHttpSolrClient(follower.getBaseUrl() + "/" + follower.getCoreName())) { 
+      try (SolrClient directClient = getHttpSolrClient(follower.getBaseUrl() + "/" + follower.getCoreName())) {
         ModifiableSolrParams params = new ModifiableSolrParams();
         params
-          .set("q", "*:*")
-          .set("distrib", "false");
+            .set("q", "*:*")
+            .set("distrib", "false");
         QueryResponse resp = directClient.query(params);
         assertEquals(1, resp.getResults().getNumFound());
         assertEquals("cat123", (String) resp.getResults().get(0).getFieldValue("cat"));
       }
-    } catch (Exception ex) { 
+    } catch (Exception ex) {
       fail("syncLocalCoreWithSharedStore failed with exception: " + ex.getMessage());
-    } finally {
-      core.close();
     }
   }
 
-  /**
-   * Tests that the core properties returned by {@link BlobStoreUtils#getSharedCoreProperties(ZkStateReader, DocCollection, Replica)}
-   * match the core properties of a core created in normal flow i.e. create collection, add shard or add replica. They all
-   * should essentially produce same set of properties. Here we are using create collection. 
-   * 
-   * These properties are used to create a missing core against a SHARED replica. 
-   */
-  @Test
-  public void testMissingSharedCoreProperties() throws Exception {
-    setupCluster(1);
-    collectionName = "sharedCol" + UUID.randomUUID();
-    shardName = "shard" + UUID.randomUUID();
-    setupSharedCollectionWithShardNames(collectionName, 1, 1, shardName);
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-    DocCollection coll = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
-    Replica rep = coll.getLeader(shardName);
-    cc = getCoreContainer(rep.getNodeName());
-
-    Path corePropertiesPath = cc.getCoreRootDirectory().resolve(rep.getCoreName()).resolve(CORE_PROPERTIES_FILENAME);
-    Properties expectedCoreProperties = new Properties();
-    try (InputStreamReader is = new InputStreamReader(new FileInputStream(corePropertiesPath.toFile()), StandardCharsets.UTF_8)) {
-      expectedCoreProperties.load(is);
-    }
-
-    Map<String, String> coreProperties = BlobStoreUtils.getSharedCoreProperties(cloudClient.getZkStateReader(), coll, rep);
-
-    // name is separately passed as core name, therefore, it is not part of the core properties
-    expectedCoreProperties.remove("name");
-    /** see comment inside {@link BlobStoreUtils#getSharedCoreProperties(ZkStateReader, DocCollection, Replica)}*/
-    expectedCoreProperties.remove("numShards");
-
-    assertEquals("wrong number of core properties", expectedCoreProperties.size(), coreProperties.size());
-    for (Object key : expectedCoreProperties.keySet()) {
-      assertTrue(key + " is missing", coreProperties.containsKey(key));
-      assertEquals(key + "'s value is wrong", expectedCoreProperties.get(key), coreProperties.get(key));
-    }
-  }
 }
diff --git a/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java b/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java
index 653ed07..5665a34 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java
@@ -23,242 +23,43 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.Map;
 import java.util.Properties;
-import java.util.UUID;
 
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.store.blob.client.CoreStorageClient;
 import org.apache.solr.store.shared.SolrCloudSharedStoreTestCase;
-import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
-import org.apache.solr.store.shared.metadata.SharedShardMetadataController.SharedShardVersionMetadata;
 import org.junit.After;
-import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
-
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
 
 /**
  * Unit tests for {@link BlobStoreUtils}
  */
-public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase { 
-  
-  private static Path sharedStoreRootPath;
-  
-  private String collectionName;
-  private String shardName;
-  private Replica newReplica;
-  private CoreContainer cc;
-  
-  private static CoreStorageClient storageClient;
-  
-  @BeforeClass
-  public static void setupTestClass() throws Exception {
-    assumeWorkingMockito();
-    sharedStoreRootPath = createTempDir("tempDir");
-    storageClient = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
-    
-    assumeWorkingMockito();
-  }
-  
+public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
+
   @After
   public void doAfter() throws Exception {
     shutdownCluster();
   }
-  
-  /**
-   * testSyncLocalCoreWithSharedStore_syncSkipOnDefault checks that syncLocalCoreWithSharedStore 
-   * will skip sync if metadataSuffix is set to default in the ZK.
-   */
-  @Test
-  public void testSyncLocalCoreWithSharedStore_syncSkipOnDefault() throws Exception {
-    setupCluster(1);
-    setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient), cluster.getJettySolrRunner(0));
-    
-    collectionName = "sharedCol" + UUID.randomUUID();
-    shardName = "shard" + UUID.randomUUID();
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-    setupSharedCollectionWithShardNames(collectionName, 1, 1, shardName);
-        
-    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
-    newReplica = collection.getReplicas().get(0);
-    cc = getCoreContainer(newReplica.getNodeName());
-    
-    CoreStorageClient blobClientSpy = Mockito.spy(storageClient);    
-    try {
-      SharedShardVersionMetadata shardVersionMetadata = new SharedShardVersionMetadata(0, SharedShardMetadataController.METADATA_NODE_DEFAULT_VALUE);
-      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, newReplica.getCoreName(), shardName, cc, shardVersionMetadata, true);
-      verify(blobClientSpy, never()).pullCoreMetadata(anyString(), anyString());
-    } catch (Exception ex){
-      fail("syncLocalCoreWithSharedStore failed with exception: " + ex.getMessage());
-    } 
-  }
-  
-  /**
-   * testSyncLocalCoreWithSharedStore_missingBlob checks that syncLocalCoreWithSharedStore 
-   * will throw exception if core.metadata file is missing from the sharedStore.
-   */
-  @Test
-  public void testSyncLocalCoreWithSharedStore_missingBlob() throws Exception {
-    setupCluster(1);
-    setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient), cluster.getJettySolrRunner(0));
-    
-    collectionName = "sharedCol" + UUID.randomUUID();
-    shardName = "shard" + UUID.randomUUID();
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-    setupSharedCollectionWithShardNames(collectionName, 1, 1, shardName);
-        
-    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
-    newReplica = collection.getReplicas().get(0);
-    cc = getCoreContainer(newReplica.getNodeName());
-    
-    try {
-      SharedShardVersionMetadata shardVersionMetadata = new SharedShardVersionMetadata(0, UUID.randomUUID().toString());
-      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, newReplica.getCoreName(), shardName, cc, shardVersionMetadata, true);
-      fail("syncLocalCoreWithSharedStore should throw exception if shared store doesn't have the core.metadata file.");
-    } catch (Exception ex){
-      String expectedException = "cannot get core.metadata file from shared store";
-      assertTrue(ex.getCause().getMessage().contains(expectedException));
-    } 
-  }
-  
-  /**
-   * testSyncLocalCoreWithSharedStore_syncEquivalent checks that syncLocalCoreWithSharedStore 
-   * doesn't throw an exception if shared store and local files, already are in sync.
-   */
-  @Test
-  public void testSyncLocalCoreWithSharedStore_syncEquivalent() throws Exception {
-    setupCluster(1);
-    setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient), cluster.getJettySolrRunner(0));
-    
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-    
-    collectionName = "sharedCol" + UUID.randomUUID();
-    shardName = "shard" + UUID.randomUUID();
-    setupSharedCollectionWithShardNames(collectionName, 1, 1, shardName);
-    
-    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
-    newReplica = collection.getReplicas().get(0);
-    cc = getCoreContainer(newReplica.getNodeName());
-    
-    CoreStorageClient blobClientSpy = Mockito.spy(storageClient);
-    // Add a document.
-    SolrInputDocument doc = new SolrInputDocument();
-    doc.setField("id", "1");
-    doc.setField("cat", "cat123");
-    UpdateRequest req = new UpdateRequest();
-    req.add(doc);
-    req.commit(cloudClient, collectionName);
-    try {
-      SharedShardMetadataController metadataController = cc.getSharedStoreManager().getSharedShardMetadataController();
-      SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
-      // we push and already have the latest updates so we should not pull here
-      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, newReplica.getCoreName(), shardName, cc, shardVersionMetadata, true);
-      verify(blobClientSpy, never()).pullCoreMetadata(anyString(), anyString());
-    } catch (Exception ex) { 
-      fail("syncLocalCoreWithSharedStore failed with exception: " + ex.getMessage());
-    }
-  }
-  
-  /**
-   * testSyncLocalCoreWithSharedStore_syncSuccess checks that syncLocalCoreWithSharedStore 
-   * pulls index files from blob if missing locally and present in blob
-   */
-  @Test
-  public void testSyncLocalCoreWithSharedStore_syncSuccess() throws Exception {
-    setupCluster(2);
-    
-    // configure same client for each runner, this isn't a concurrency test so this is fine
-    for (JettySolrRunner runner : cluster.getJettySolrRunners()) {
-      setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient), runner);
-    }
-    
-    // set up two nodes with one shard and two replicas 
-    collectionName = "sharedCol" + UUID.randomUUID();
-    shardName = "shard" + UUID.randomUUID();
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-    setupSharedCollectionWithShardNames(collectionName, 1, 2, shardName);
-    
-    CoreStorageClient blobClientSpy = Mockito.spy(storageClient);
-    // Add a document.
-    SolrInputDocument doc = new SolrInputDocument();
-    doc.setField("id", "1");
-    doc.setField("cat", "cat123");
-    UpdateRequest req = new UpdateRequest();
-    req.add(doc);
-    req.commit(cluster.getSolrClient(), collectionName);
-    
-    // get the follower replica    
-    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
-    Replica leaderReplica = collection.getLeader(shardName);
-    Replica follower = null;
-    for (Replica replica : collection.getReplicas()) {
-      if (!replica.getName().equals(leaderReplica.getName())) {
-        follower = replica;
-        break;
-      }
-    }
-    
-    // verify this last update didn't happen on the follower, it should only have its default segment file
-    cc = getCoreContainer(follower.getNodeName());
-    SolrCore core = cc.getCore(follower.getCoreName());    
-    assertEquals(1, core.getDeletionPolicy().getLatestCommit().getFileNames().size());
-    
-    try {
-      SharedShardMetadataController metadataController = cc.getSharedStoreManager().getSharedShardMetadataController();
-      SharedShardVersionMetadata shardVersionMetadata = metadataController.readMetadataValue(collectionName, shardName);
-      // we pushed on the leader, try sync on the follower
-      BlobStoreUtils.syncLocalCoreWithSharedStore(collectionName, follower.getCoreName(), shardName, cc, shardVersionMetadata, true);
-      
-      // did we pull?
-      assertTrue(core.getDeletionPolicy().getLatestCommit().getFileNames().size() > 1);
-      
-      // query just the replica we pulled on
-      try (SolrClient directClient = getHttpSolrClient(follower.getBaseUrl() + "/" + follower.getCoreName())) { 
-        ModifiableSolrParams params = new ModifiableSolrParams();
-        params
-          .set("q", "*:*")
-          .set("distrib", "false");
-        QueryResponse resp = directClient.query(params);
-        assertEquals(1, resp.getResults().getNumFound());
-        assertEquals("cat123", (String) resp.getResults().get(0).getFieldValue("cat"));
-      }
-    } catch (Exception ex) { 
-      fail("syncLocalCoreWithSharedStore failed with exception: " + ex.getMessage());
-    } finally {
-      core.close();
-    }
-  }
 
   /**
    * Tests that the core properties returned by {@link BlobStoreUtils#getSharedCoreProperties(ZkStateReader, DocCollection, Replica)}
    * match the core properties of a core created in normal flow i.e. create collection, add shard or add replica. They all
    * should essentially produce same set of properties. Here we are using create collection. 
-   * 
+   *
    * These properties are used to create a missing core against a SHARED replica. 
    */
   @Test
   public void testMissingSharedCoreProperties() throws Exception {
     setupCluster(1);
-    collectionName = "sharedCol" + UUID.randomUUID();
-    shardName = "shard" + UUID.randomUUID();
+    String collectionName = "sharedCollection";
+    String shardName = "shard1";
     setupSharedCollectionWithShardNames(collectionName, 1, 1, shardName);
     CloudSolrClient cloudClient = cluster.getSolrClient();
     DocCollection coll = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
     Replica rep = coll.getLeader(shardName);
-    cc = getCoreContainer(rep.getNodeName());
+    CoreContainer cc = getCoreContainer(rep.getNodeName());
 
     Path corePropertiesPath = cc.getCoreRootDirectory().resolve(rep.getCoreName()).resolve(CORE_PROPERTIES_FILENAME);
     Properties expectedCoreProperties = new Properties();
diff --git a/solr/core/src/test/org/apache/solr/store/shared/SharedCoreConcurrencyTest.java b/solr/core/src/test/org/apache/solr/store/shared/SharedCoreConcurrencyTest.java
index d305120..f7f08e9 100644
--- a/solr/core/src/test/org/apache/solr/store/shared/SharedCoreConcurrencyTest.java
+++ b/solr/core/src/test/org/apache/solr/store/shared/SharedCoreConcurrencyTest.java
@@ -595,7 +595,7 @@ public class SharedCoreConcurrencyTest extends SolrCloudSharedStoreTestCase {
       public void recordState(String collectionName, String shardName, String coreName, SharedCoreStage stage) {
         super.recordState(collectionName, shardName, coreName, stage);
         ConcurrentLinkedQueue<String> coreConcurrencyStages = coreConcurrencyStagesMap.computeIfAbsent(coreName, k -> new ConcurrentLinkedQueue<>());
-        coreConcurrencyStages.add(Thread.currentThread().getId() + "." + stage.name());
+        coreConcurrencyStages.add(Thread.currentThread().getName() + "." + stage.name());
       }
     };
     setupTestSharedConcurrencyControllerForNode(concurrencyController, solrProcess);
diff --git a/solr/core/src/test/org/apache/solr/store/shared/SharedCoreIndexingBatchProcessorTest.java b/solr/core/src/test/org/apache/solr/store/shared/SharedCoreIndexingBatchProcessorTest.java
new file mode 100644
index 0000000..ee725a5
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/store/shared/SharedCoreIndexingBatchProcessorTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.store.shared;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.store.blob.process.CorePuller;
+import org.apache.solr.store.blob.process.CorePusher;
+import org.apache.solr.store.shared.metadata.SharedShardMetadataController;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit tests for {@link SharedCoreIndexingBatchProcessor}
+ */
+public class SharedCoreIndexingBatchProcessorTest extends  SolrCloudSharedStoreTestCase {
+
+  private static final String COLLECTION_NAME = "sharedCollection";
+  private static final String SHARD_NAME = "shard1";
+
+  private SolrCore core;
+  private CorePuller corePuller;
+  private CorePusher corePusher;
+  private ReentrantReadWriteLock corePullLock;
+  private SharedCoreIndexingBatchProcessor processor;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    assumeWorkingMockito();
+    setupCluster(1);
+  }
+
+  @Before
+  public void setupTest() throws Exception {
+    assertEquals("wrong number of nodes", 1, cluster.getJettySolrRunners().size());
+    CoreContainer cc = cluster.getJettySolrRunner(0).getCoreContainer();
+
+    int maxShardsPerNode = 1;
+    int numReplicas = 1;
+    setupSharedCollectionWithShardNames(COLLECTION_NAME, maxShardsPerNode, numReplicas, SHARD_NAME);
+    DocCollection collection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+
+    assertEquals("wrong number of replicas", 1, collection.getReplicas().size());
+    core = cc.getCore(collection.getReplicas().get(0).getCoreName());
+
+    assertNotNull("core is null", core);
+
+    corePuller = Mockito.spy(new CorePuller());
+    corePusher = Mockito.spy(new CorePusher());
+    processor = new SharedCoreIndexingBatchProcessor(core, core.getCoreContainer().getZkController().getClusterState()) {
+      @Override
+      protected CorePuller getCorePuller() {
+        return corePuller;
+      }
+
+      @Override
+      protected CorePusher getCorePusher() {
+        return corePusher;
+      }
+    };
+    processor = Mockito.spy(processor);
+    corePullLock = core.getCoreContainer().getSharedStoreManager().getSharedCoreConcurrencyController().getCorePullLock(
+        COLLECTION_NAME, SHARD_NAME, core.getName());
+  }
+
+  @After
+  public void teardownTest() throws Exception {
+    if (core != null) {
+      core.close();
+    }
+    if (processor != null) {
+      processor.close();
+      assertEquals("read lock count is wrong", 0, corePullLock.getReadLockCount());
+    }
+    if (cluster != null) {
+      cluster.deleteAllCollections();
+    }
+  }
+
+  /**
+   * Tests that the first add/delete starts an indexing batch.
+   */
+  @Test
+  public void testAddOrDeleteStart() throws Exception {
+    verify(processor, never()).startIndexingBatch();
+    processAddOrDelete();
+    verify(processor).startIndexingBatch();
+  }
+
+  /**
+   * Tests that two adds/deletes only start an indexing batch once.
+   */
+  @Test
+  public void testTwoAddOrDeleteOnlyStartOnce() throws Exception {
+    verify(processor, never()).startIndexingBatch();
+    processAddOrDelete();
+    verify(processor).startIndexingBatch();
+    processAddOrDelete();
+    verify(processor).startIndexingBatch();
+  }
+
+  /**
+   * Tests that commit does finish an indexing batch.
+   */
+  @Test
+  public void testCommitDoesFinish() throws Exception {
+    verify(processor, never()).finishIndexingBatch();
+    processCommit();
+    verify(processor).finishIndexingBatch();
+  }
+
+  /**
+   * Tests that a stale core is pulled at the start of an indexing batch.
+   */
+  @Test
+  public void testStaleCoreIsPulledAtStart() throws Exception {
+    verify(processor, never()).startIndexingBatch();
+    verify(corePuller, never()).pullCoreFromSharedStore(any(), any(), any(), anyBoolean());
+    processAddOrDelete();
+    verify(processor).startIndexingBatch();
+    verify(corePuller).pullCoreFromSharedStore(any(), any(), any(), anyBoolean());
+  }
+
+  /**
+   * Tests that an up-to-date core is not pulled at the start of an indexing batch.
+   */
+  @Test
+  public void testUpToDateCoreIsNotPulledAtStart() throws Exception {
+    SharedShardMetadataController.SharedShardVersionMetadata shardVersionMetadata = core.getCoreContainer()
+        .getSharedStoreManager().getSharedShardMetadataController().readMetadataValue(COLLECTION_NAME, SHARD_NAME);
+    ClusterState clusterState = core.getCoreContainer().getZkController().getClusterState();
+    DocCollection collection = clusterState.getCollection(COLLECTION_NAME);
+    String sharedShardName = (String) collection.getSlicesMap().get(SHARD_NAME).get(ZkStateReader.SHARED_SHARD_NAME);
+    corePuller.pullCoreFromSharedStore(core, sharedShardName, shardVersionMetadata, true);
+    verify(corePuller).pullCoreFromSharedStore(any(), any(), any(), anyBoolean());
+    verify(processor, never()).startIndexingBatch();
+    processAddOrDelete();
+    verify(processor).startIndexingBatch();
+    verify(corePuller).pullCoreFromSharedStore(any(), any(), any(), anyBoolean());
+  }
+
+  /**
+   * Tests that a read lock is acquired even when the start encounters an error.
+   */
+  @Test
+  public void testReadLockIsAcquiredEvenStartEncountersError() throws Exception {
+    doThrow(new SolrException(SolrException.ErrorCode.SERVER_ERROR, "pull failed"))
+        .when(corePuller).pullCoreFromSharedStore(any(), any(), any(), anyBoolean());
+    verify(processor, never()).startIndexingBatch();
+    verify(corePuller, never()).pullCoreFromSharedStore(any(), any(), any(), anyBoolean());
+    assertEquals("wrong pull read lock count", 0, corePullLock.getReadLockCount());
+    boolean errorOccurred = false;
+    try {
+      processor.startIndexingBatch();
+      fail("No exception thrown");
+    } catch (Exception ex) {
+      assertTrue("Wrong exception thrown", ex.getMessage().contains("pull failed"));
+    }
+    assertEquals("wrong pull read lock count", 1, corePullLock.getReadLockCount());
+    verify(processor).startIndexingBatch();
+    verify(corePuller).pullCoreFromSharedStore(any(), any(), any(), anyBoolean());
+  }
+
+  /**
+   * Tests that an indexing batch with some work does push to the shared store.
+   */
+  @Test
+  public void testCommitAfterAddOrDeleteDoesPush() throws Exception {
+    processAddOrDelete();
+    processCommit();
+    verify(corePusher).pushCoreToSharedStore(any(), any());
+  }
+
+  /**
+   * Tests that an indexing batch with no work does not push to the shared store.
+   */
+  @Test
+  public void testIsolatedCommitDoesNotPush() throws Exception {
+    processCommit();
+    verify(corePusher, never()).pushCoreToSharedStore(any(), any());
+  }
+
+  /**
+   * Tests that an already committed indexing batch throws if a doc is added/deleted again.
+   */
+  @Test
+  public void testAddOrDeleteAfterCommitThrows() throws Exception {
+    processAddOrDelete();
+    commitAndThenAddOrDeleteDoc();
+  }
+
+  /**
+   * Tests that an indexing batch already committed in isolation (without any add/delete) throws if a doc is added/deleted again.
+   */
+  @Test
+  public void testAddOrDeleteAfterIsolatedCommitThrows() throws Exception {
+    commitAndThenAddOrDeleteDoc();
+  }
+
+  private void commitAndThenAddOrDeleteDoc() {
+    processCommit();
+    try {
+      processAddOrDelete();
+      fail("No exception thrown");
+    } catch (Exception ex) {
+      assertTrue( "wrong exception thrown", 
+          ex.getMessage().contains("Why are we adding/deleting a doc through an already committed indexing batch?"));
+    }
+  }
+
+  /**
+   * Tests that an already committed indexing batch throws if committed again.
+   */
+  @Test
+  public void testCommitAfterCommitThrows() throws Exception {
+    processAddOrDelete();
+    doDoubleCommit();
+  }
+
+  /**
+   * Tests that an indexing batch already committed in isolation (without any add/delete) throws if committed again.
+   */
+  @Test
+  public void testCommitAfterIsolatedCommitThrows() throws Exception {
+    doDoubleCommit();
+  }
+
+  private void doDoubleCommit() {
+    processCommit();
+    try {
+      processCommit();
+      fail("No exception thrown");
+    } catch (Exception ex) {
+      assertTrue( "wrong exception thrown",
+          ex.getMessage().contains("Why are we committing an already committed indexing batch?"));
+    }
+  }
+
+
+  private void processAddOrDelete() {
+    processor.addOrDeleteGoingToBeIndexedLocally();
+  }
+
+  private void processCommit() {
+    processor.hardCommitCompletedLocally();
+  }
+
+}
diff --git a/solr/core/src/test/org/apache/solr/store/shared/SharedStoreDistributedIndexingTest.java b/solr/core/src/test/org/apache/solr/store/shared/SharedStoreDistributedIndexingTest.java
new file mode 100644
index 0000000..8ff8e81
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/store/shared/SharedStoreDistributedIndexingTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.store.shared;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.store.blob.client.LocalStorageClient;
+import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
+import org.apache.solr.update.processor.DistributedZkUpdateProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests that indexing requests for {@link Replica.Type#SHARED} are distributed correctly through {@link DistributedZkUpdateProcessor}.
+ */
+public class SharedStoreDistributedIndexingTest extends SolrCloudSharedStoreTestCase {
+  private static Map<String, List<DistributedZkUpdateProcessor>> zkUpdateProcessors = new HashMap<>();
+
+  private static final String COLLECTION_NAME = "sharedCollection";
+  private static final String SHARD1_NAME = "shard1";
+  private static final String SHARD2_NAME = "shard2";
+
+  private Replica shard1LeaderReplica;
+  private Replica shard2LeaderReplica;
+  private Replica shard1FollowerReplica;
+  private Replica shard2FollowerReplica;
+
+  private SolrInputDocument[] shard1Docs = new SolrInputDocument[2];
+  private SolrInputDocument[] shard2Docs = new SolrInputDocument[2];
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    assumeWorkingMockito();
+    System.setProperty(LocalStorageClient.BLOB_STORE_LOCAL_FS_ROOT_DIR_PROPERTY,
+        blobDir.resolve(DEFAULT_BLOB_DIR_NAME).toString());
+
+    configureCluster(1)
+        .withSolrXml(TEST_PATH().resolve("solr-sharedstore.xml"))
+        .addConfig("conf", configset("shared-distrib-indexing"))
+        .configure();
+  }
+
+  @Before
+  public void setupTest() throws Exception {
+    assertEquals("wrong number of nodes", 1, cluster.getJettySolrRunners().size());
+
+    String shardNames = SHARD1_NAME + "," + SHARD2_NAME;
+    CollectionAdminRequest.Create create = CollectionAdminRequest
+        .createCollectionWithImplicitRouter(COLLECTION_NAME, "conf", shardNames, 0)
+        .setRouterField("shardName")
+        .setMaxShardsPerNode(4)
+        .setSharedIndex(true)
+        .setSharedReplicas(2);
+    create.process(cluster.getSolrClient());
+    // Verify that collection was created
+    waitForState("Timed-out wait for collection to be created", COLLECTION_NAME, clusterShape(2, 4));
+    DocCollection collection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+    Slice shard1 = collection.getSlice(SHARD1_NAME);
+    assertEquals("wrong number of replicas", 2, shard1.getReplicas().size());
+    Slice shard2 = collection.getSlice(SHARD2_NAME);
+    assertEquals("wrong number of replicas", 2, shard2.getReplicas().size());
+
+    shard1LeaderReplica = shard1.getLeader();
+    shard2LeaderReplica = shard2.getLeader();
+    shard1FollowerReplica = shard1.getReplicas(r -> !r.getName().equals(shard1.getLeader().getName())).get(0);
+    shard2FollowerReplica = shard2.getReplicas(r -> !r.getName().equals(shard2.getLeader().getName())).get(0);
+
+    generateDocsForShard(SHARD1_NAME, shard1Docs);
+    generateDocsForShard(SHARD2_NAME, shard2Docs);
+
+    zkUpdateProcessors.clear();
+  }
+
+  private void generateDocsForShard(String shardName, SolrInputDocument[] docArray) {
+    for (int i = 1; i <= docArray.length ; i++) {
+      SolrInputDocument document = new SolrInputDocument();
+      String id = i + "." + shardName;
+      document.addField("id", id);
+      document.addField("shardName", shardName);
+      docArray[i - 1] = document;
+    }
+  }
+
+  @After
+  public void teardownTest() throws Exception {
+    if (cluster != null) {
+      cluster.deleteAllCollections();
+    }
+  }
+
+  /**
+   * Tests that an update request sent to the leader is handled correctly.
+   */
+  @Test
+  public void testIndexingBatchSentToLeader() throws Exception {
+
+    SolrClient shard1LeaderDirectClient  = getHttpSolrClient(shard1LeaderReplica.getBaseUrl() + "/" + shard1LeaderReplica.getCoreName());
+    try {
+
+      UpdateRequest req = new UpdateRequest();
+      req.add(shard1Docs[0]);
+      req.add(shard1Docs[1]);
+      req.deleteById("dummyId");
+      req.process(shard1LeaderDirectClient, null);
+
+      assertEquals("wrong number of zk processors instances used for shard1 leader", 1,
+          zkUpdateProcessors.get(shard1LeaderReplica.getCoreName()).size());
+      assertNull("wrong number of zk processors instances used for shard2 leader", 
+          zkUpdateProcessors.get(shard2LeaderReplica.getCoreName()));
+      assertNull("wrong number of zk processors instances used for shard1 follower", 
+          zkUpdateProcessors.get(shard1FollowerReplica.getCoreName()));
+      assertNull("wrong number of zk processors instances used for shard2 follower", 
+          zkUpdateProcessors.get(shard2FollowerReplica.getCoreName()));
+
+      // two adds, one delete and a commit for shard1 leader
+      SharedCoreIndexingBatchProcessor processor = zkUpdateProcessors.get(shard1LeaderReplica.getCoreName()).get(0).getSharedCoreIndexingBatchProcessor();
+      verify(processor, times(3)).addOrDeleteGoingToBeIndexedLocally();
+      verify(processor, times(1)).hardCommitCompletedLocally();
+    } finally {
+      shard1LeaderDirectClient.close();
+    }
+  }
+
+  /**
+   * Tests that an update request sent to a follower is handled correctly.
+   */
+  @Test
+  public void testIndexingBatchSentToFollower() throws Exception {
+
+    SolrClient shard1FollowerDirectClient  = getHttpSolrClient(shard1FollowerReplica.getBaseUrl() + "/" + shard1FollowerReplica.getCoreName());
+    try {
+
+      UpdateRequest req = new UpdateRequest();
+      req.add(shard1Docs[0]);
+      req.add(shard1Docs[1]);
+      req.process(shard1FollowerDirectClient, null);
+
+      assertEquals("wrong number of zk processors instances used for shard1 leader", 1,
+          zkUpdateProcessors.get(shard1LeaderReplica.getCoreName()).size());
+      assertNull("wrong number of zk processors instances used for shard2 leader",
+          zkUpdateProcessors.get(shard2LeaderReplica.getCoreName()));
+      assertEquals("wrong number of zk processors instances used for shard1 follower", 1,
+          zkUpdateProcessors.get(shard1FollowerReplica.getCoreName()).size());
+      assertNull("wrong number of zk processors instances used for shard2 follower",
+          zkUpdateProcessors.get(shard2FollowerReplica.getCoreName()));
+
+      // two adds and a commit for shard1 leader
+      SharedCoreIndexingBatchProcessor processor = zkUpdateProcessors.get(shard1LeaderReplica.getCoreName()).get(0).getSharedCoreIndexingBatchProcessor();
+      verify(processor, times(2)).addOrDeleteGoingToBeIndexedLocally();
+      verify(processor, times(1)).hardCommitCompletedLocally();
+      // isolated commit for shard1 follower
+      processor = zkUpdateProcessors.get(shard1FollowerReplica.getCoreName()).get(0).getSharedCoreIndexingBatchProcessor();
+      verify(processor, never()).addOrDeleteGoingToBeIndexedLocally();
+      verify(processor, times(1)).hardCommitCompletedLocally();
+    } finally {
+      shard1FollowerDirectClient.close();
+    }
+  }
+
+  /**
+   * Tests that an update request sent to other shard's leader is handled correctly.
+   */
+  @Test
+  public void testIndexingBatchSentToOtherShardsLeader() throws Exception {
+
+    SolrClient shard2LeaderDirectClient  = getHttpSolrClient(shard2LeaderReplica.getBaseUrl() + "/" + shard2LeaderReplica.getCoreName());
+    try {
+      UpdateRequest req = new UpdateRequest();
+      req.add(shard1Docs[0]);
+      req.add(shard1Docs[1]);
+      req.process(shard2LeaderDirectClient, null);
+
+      assertEquals("wrong number of zk processors instances used for shard1 leader", 1,
+          zkUpdateProcessors.get(shard1LeaderReplica.getCoreName()).size());
+      assertEquals("wrong number of zk processors instances used for shard2 leader", 1,
+          zkUpdateProcessors.get(shard2LeaderReplica.getCoreName()).size());
+      assertNull("wrong number of zk processors instances used for shard1 follower",
+          zkUpdateProcessors.get(shard1FollowerReplica.getCoreName()));
+      assertNull("wrong number of zk processors instances used for shard2 follower",
+          zkUpdateProcessors.get(shard2FollowerReplica.getCoreName()));
+
+      // two adds and a commit for shard1 leader
+      SharedCoreIndexingBatchProcessor processor = zkUpdateProcessors.get(shard1LeaderReplica.getCoreName()).get(0).getSharedCoreIndexingBatchProcessor();
+      verify(processor, times(2)).addOrDeleteGoingToBeIndexedLocally();
+      verify(processor, times(1)).hardCommitCompletedLocally();
+      // isolated commit for shard2 leader
+      processor = zkUpdateProcessors.get(shard2LeaderReplica.getCoreName()).get(0).getSharedCoreIndexingBatchProcessor();
+      verify(processor, never()).addOrDeleteGoingToBeIndexedLocally();
+      verify(processor, times(1)).hardCommitCompletedLocally();
+    } finally {
+      shard2LeaderDirectClient.close();
+    }
+  }
+
+  /**
+   * Tests that in a single request documents belonging to two different shards are handled correctly.
+   */
+  @Test
+  public void testMultiShardIndexingBatch() throws Exception {
+
+    SolrClient shard1LeaderDirectClient  = getHttpSolrClient(shard1LeaderReplica.getBaseUrl() + "/" + shard1LeaderReplica.getCoreName());
+    try {
+
+      UpdateRequest req = new UpdateRequest();
+      req.add(shard1Docs[0]);
+      req.add(shard2Docs[0]);
+      req.add(shard1Docs[1]);
+      req.add(shard2Docs[1]);
+      req.process(shard1LeaderDirectClient, null);
+      
+      assertEquals("wrong number of zk processors instances used for shard1 leader", 1,
+          zkUpdateProcessors.get(shard1LeaderReplica.getCoreName()).size());
+      // DistributedZkUpdateProcessor forwards the documents to other nodes through StreamingSolrClients#getSolrClient
+      // so that they are streamed through a single update request
+      assertEquals("wrong number of zk processors instances used for shard2 leader", 1,
+          zkUpdateProcessors.get(shard2LeaderReplica.getCoreName()).size());
+      assertNull("wrong number of zk processors instances used for shard1 follower",
+          zkUpdateProcessors.get(shard1FollowerReplica.getCoreName()));
+      assertNull("wrong number of zk processors instances used for shard2 follower",
+          zkUpdateProcessors.get(shard2FollowerReplica.getCoreName()));
+
+      // two adds and a commit for shard1
+      SharedCoreIndexingBatchProcessor processor = zkUpdateProcessors.get(shard1LeaderReplica.getCoreName()).get(0).getSharedCoreIndexingBatchProcessor();
+      verify(processor, times(2)).addOrDeleteGoingToBeIndexedLocally();
+      verify(processor, times(1)).hardCommitCompletedLocally();
+      // two adds and a commit for shard2
+      processor = zkUpdateProcessors.get(shard2LeaderReplica.getCoreName()).get(0).getSharedCoreIndexingBatchProcessor();
+      verify(processor, times(2)).addOrDeleteGoingToBeIndexedLocally();
+      verify(processor, times(1)).hardCommitCompletedLocally();
+    } finally {
+      shard1LeaderDirectClient.close();
+    }
+  }
+
+  /**
+   * The purpose of this test {@link DistributedUpdateProcessorFactory} is to produce a {@link DistributedZkUpdateProcessor}
+   * that can be spied upon. The spied instance's implementation is otherwise left unchanged.
+   */
+  public static class TestDistributedUpdateProcessorFactory extends DistributedUpdateProcessorFactory {
+    @Override
+    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+      DistributedZkUpdateProcessor zkUpdateProcessor = (DistributedZkUpdateProcessor) super.getInstance(req, rsp, next);
+      SharedCoreIndexingBatchProcessor sharedCoreIndexingBatchProcessor = zkUpdateProcessor.getSharedCoreIndexingBatchProcessor();
+      sharedCoreIndexingBatchProcessor = spy(sharedCoreIndexingBatchProcessor);
+      zkUpdateProcessor = spy(zkUpdateProcessor);
+      doReturn(sharedCoreIndexingBatchProcessor).when(zkUpdateProcessor).getSharedCoreIndexingBatchProcessor();
+      String coreName = req.getCore().getName();
+      zkUpdateProcessors.computeIfAbsent(coreName, k -> new ArrayList<>()).add(zkUpdateProcessor);
+      return zkUpdateProcessor;
+    }
+  }
+
+}
diff --git a/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java b/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java
index edb6fec..83c1c25 100644
--- a/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java
+++ b/solr/core/src/test/org/apache/solr/store/shared/SolrCloudSharedStoreTestCase.java
@@ -37,7 +37,7 @@ import org.apache.solr.store.blob.process.CorePullTask.PullCoreCallback;
 import org.apache.solr.store.blob.process.CorePullerFeeder;
 import org.apache.solr.store.blob.process.CoreSyncStatus;
 import org.apache.solr.store.blob.provider.BlobStorageProvider;
-import org.junit.After;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
@@ -70,8 +70,8 @@ public class SolrCloudSharedStoreTestCase extends SolrCloudTestCase {
     blobDir = createTempDir("tempDir");
   }
   
-  @After
-  public void cleanupBlobDirectory() throws Exception {
+  @AfterClass
+  public static void cleanupBlobDirectory() throws Exception {
     if (blobDir != null) {
       FileUtils.cleanDirectory(blobDir.toFile());
     }
diff --git a/solr/core/src/test/org/apache/solr/update/processor/DistributedZkUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DistributedZkUpdateProcessorTest.java
deleted file mode 100644
index eba8819..0000000
--- a/solr/core/src/test/org/apache/solr/update/processor/DistributedZkUpdateProcessorTest.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.update.processor;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.store.blob.process.CoreUpdateTracker;
-import org.apache.solr.store.shared.SolrCloudSharedStoreTestCase;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- * Test for the {@link DistributedZkUpdateProcessor}.
- */
-public class DistributedZkUpdateProcessorTest extends SolrCloudSharedStoreTestCase {    
-  
-  @BeforeClass
-  public static void setupTestClass() throws Exception {
-    assumeWorkingMockito();
-  }
-  
-  @After
-  public void teardownTest() throws Exception {
-    shutdownCluster();
-  }
-  
-  /**
-   * Test that shared replicas can only route update requests from followers to leaders
-   * by verifying that non leaders fail on forwarded updates. 
-   * Note we assume shared replicas always route update requests such that commit=true 
-   */
-  @Test
-  public void testNonLeaderSharedReplicaFailsOnForwardedCommit() throws Exception {
-    setupCluster(1);
-
-    String collectionName = "sharedCollection";
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-
-    setupSharedCollectionWithShardNames(collectionName, 2, 2, "shard1");
-
-    // get the replica that's not the leader for the shard
-    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
-    Slice slice = collection.getSlice("shard1");
-    Replica leaderReplica = collection.getLeader("shard1");
-    Replica follower = null;
-    for (Replica repl : slice.getReplicas()) {
-      if (!repl.getName().equals(leaderReplica.getName())) {
-        follower = repl;
-        break;
-      }
-    }
-
-    if (follower == null) {
-      fail("This test has been misconfigured");
-    }
-
-    DistributedZkUpdateProcessor processor = null;
-    SolrCore core = getCoreContainer(follower.getNodeName()).getCore(follower.getCoreName());
-    try {
-      // Setup request such that the COMMIT_END_POINT is set to replicas. This indicates that
-      // a leader has forwarded an update to its replicas which isn't possible but a safety 
-      // check exists nonetheless. 
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set(DistributedZkUpdateProcessor.COMMIT_END_POINT, "replicas");
-      SolrQueryRequest req = new LocalSolrQueryRequest(core, params);
-
-      // have the follower process the command as if it was forwarded it
-      SolrQueryResponse rsp = new SolrQueryResponse();
-      CoreUpdateTracker tracker = Mockito.mock(CoreUpdateTracker.class);
-      processor = new DistributedZkUpdateProcessor(req, rsp, null, tracker);
-      CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
-
-      processor.processCommit(cmd);
-      fail("Exception should have been thrown on processCommit");
-    } catch (SolrException ex) {
-      assertTrue(ex.getMessage().contains("Unexpected indexing forwarding"));
-    } finally {
-      if (processor != null) {
-        processor.finish();
-        processor.close();
-      }
-      core.close();
-    }
-  }
-  
-  /**
-   * Create shared collection, create SHARED replica, index data, and confirm that commit writes to shared store.
-   */
-  @Test
-  public void testSharedReplicaUpdate() throws Exception {
-    boolean isUpdateAnIsolatedCommit = false;
-    // if update has something to add or delete(i.e. not an isolated commit)
-    // we expect a write to shared store
-    boolean isWriteToSharedStoreExpected = true;
-    testReplicaUpdatesZk(Replica.Type.SHARED, isUpdateAnIsolatedCommit, isWriteToSharedStoreExpected);
-  }
-
-  /**
-   * Create shared collection, create SHARED replica, and confirm that isolated commit does not write to shared store.
-   */
-  @Test
-  public void testSharedReplicaIsolatedCommit() throws Exception {
-    boolean isUpdateAnIsolatedCommit = true;
-    // if update has nothing to add or delete(i.e. just an isolated commit)
-    // we expect no write to shared store
-    boolean isWriteToSharedStoreExpected = false;
-    testReplicaUpdatesZk(Replica.Type.SHARED, isUpdateAnIsolatedCommit, isWriteToSharedStoreExpected);
-  }
-
-  /**
-   * Creates a collection with desired ({@code replicaType}), makes a desired {@code isUpdateAnIsolatedCommit} update
-   * to the collection and ensure that write to shared store happened as expected {@code isWriteToSharedStoreExpected}.
-   * 
-   * @param replicaType only SHARED and NRT are supported
-   * @param isUpdateAnIsolatedCommit if true add one document and then commit, otherwise, just an isolated commit
-   *                                 Few ways isolated commit can manifest in actual usage:
-   *                                 1. Client does indexing for while before issuing a separate commit.
-   *                                 2. SolrJ client issuing a separate follow up commit command to affected shards than
-   *                                    actual indexing request even when SolrJ client's caller issued a single update
-   *                                    command with commit=true.
-   * @param isWriteToSharedStoreExpected whether write to shared store is expected or not
-   */
-  private void testReplicaUpdatesZk(Replica.Type replicaType, boolean isUpdateAnIsolatedCommit, boolean isWriteToSharedStoreExpected) throws Exception {
-    setupCluster(1);
-
-    // Set collection name and create client
-    String collectionName = "testCollection";
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-
-    // Create collection
-    if (replicaType == Replica.Type.SHARED) {
-      setupSharedCollectionWithShardNames(collectionName, 1, 1, "shard1");
-    } else if (replicaType == Replica.Type.NRT) {
-      CollectionAdminRequest.Create create = CollectionAdminRequest
-          .createCollectionWithImplicitRouter(collectionName, "conf", "shard2", 0)
-          .setSharedIndex(false)
-          .setNrtReplicas(1);
-      create.process(cloudClient);
-    } else {
-      throw new IllegalArgumentException(replicaType.name() + " replica type is not supported.");
-    }
-
-    // Verify that collection was created
-    waitForState("Timed-out wait for collection to be created", collectionName, clusterShape(1, 1));
-    assertTrue(cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName, false));
-    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
-
-    DistributedZkUpdateProcessor processor = null;
-    CoreUpdateTracker tracker = Mockito.mock(CoreUpdateTracker.class);
-    // Get core from collection
-    Replica newReplica = collection.getReplicas().get(0);
-    SolrCore core = getCoreContainer(newReplica.getNodeName()).getCore(newReplica.getCoreName());
-    try {
-      // Verify that replica type is as expected
-      assertEquals("wrong replica type", core.getCoreDescriptor().getCloudDescriptor().getReplicaType(), replicaType);
-
-      // Mock out DistributedZkUpdateProcessor
-      SolrQueryResponse rsp = new SolrQueryResponse();
-      rsp.addResponseHeader(new SimpleOrderedMap<>());
-      SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
-      processor = new DistributedZkUpdateProcessor(req, rsp, null, tracker);
-      if (!isUpdateAnIsolatedCommit) {
-        // index a doc
-        AddUpdateCommand cmd = new AddUpdateCommand(req);
-        cmd.solrDoc = new SolrInputDocument();
-        cmd.solrDoc.addField("id", "1");
-        processor.processAdd(cmd);
-      }
-
-      // Make a commit
-      processor.processCommit(new CommitUpdateCommand(req, false));
-    } finally {
-      if (processor != null) {
-        processor.finish();
-        processor.close();
-      }
-      core.close();
-    }
-
-    if (isWriteToSharedStoreExpected) {
-      // Verify that core tracker was updated
-      verify(tracker).persistShardIndexToSharedStore(any(), any(), any(), any());
-    } else {
-      // Verify that core tracker was not updated
-      verify(tracker, never()).persistShardIndexToSharedStore(any(), any(), any(), any());
-    }
-  }
-
-  /**
-   * Create collection, create NRT replica, index data, and confirm that commit does not write to shared store.
-   */
-  @Test
-  public void testNRTReplicaUpdate() throws Exception {
-    boolean isIsolatedCommit = false;
-    // update to non-SHARED replica should not write to shared store
-    boolean isWriteToSharedStoreExpected = false;
-    testReplicaUpdatesZk(Replica.Type.NRT, isIsolatedCommit, isWriteToSharedStoreExpected);
-  }
-
-  /**
-   * Test update only happens on leader replica for a 1 shard shared collection
-   */
-  @Test
-  public void testSharedReplicaSimpleUpdateOnLeaderSuccess() throws Exception {
-    setupCluster(3);
-    
-    String collectionName = "sharedCollection";
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-    
-    setupSharedCollectionWithShardNames(collectionName, 1, 2, "shard1");
-    
-    // do an update
-    UpdateRequest req = new UpdateRequest();
-    req.add("id", "1");
-    req.commit(cluster.getSolrClient(), collectionName);
-    
-    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
-    Replica leaderReplica = collection.getLeader("shard1");
-    
-    Replica followerReplica = null;
-    for (Replica replica : collection.getReplicas()) {
-      if (!replica.getName().equals(leaderReplica.getName())) {
-        followerReplica = replica;
-        break;
-      }
-    }
-    
-    SolrCore leaderReplicaCore = null;
-    SolrCore followerReplicaCore = null;
-    try {
-      // verify this last update didn't happen on the follower
-      CoreContainer ccLeader = getCoreContainer(leaderReplica.getNodeName());
-      leaderReplicaCore = ccLeader.getCore(leaderReplica.getCoreName());
-      
-      CoreContainer ccFollower = getCoreContainer(followerReplica.getNodeName());
-      followerReplicaCore = ccFollower.getCore(followerReplica.getCoreName());
-      
-      // the follower's core should only have its default segment file from creation
-      assertEquals(1, followerReplicaCore.getDeletionPolicy().getLatestCommit().getFileNames().size());
-      // the commit should have only happened on the leader and it should have more index files than the default 
-      assertTrue(leaderReplicaCore.getDeletionPolicy().getLatestCommit().getFileNames().size() > 1);
-    } finally {
-      if (leaderReplica != null) {
-        leaderReplicaCore.close();
-      }
-      if (followerReplica != null) {
-        followerReplicaCore.close();
-      }
-    }
-  }
-}
\ No newline at end of file