Posted to commits@lucene.apache.org by yo...@apache.org on 2019/10/30 20:45:08 UTC

[lucene-solr] 03/11: @W-6587409 Make sure newly pulled blob contents are considered source… (#389)

This is an automated email from the ASF dual-hosted git repository.

yonik pushed a commit to branch jira/SOLR-13101
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit d63c03224df89015d41721ea686de212791714ca
Author: Bilal Waheed <mw...@salesforce.com>
AuthorDate: Mon Sep 30 17:21:34 2019 -0400

    @W-6587409 Make sure newly pulled blob contents are considered source… (#389)
    
    @W-6587409 Make sure newly pulled blob contents are considered source of truth
    
    - When local index contents conflict with contents to be pulled from blob, we move the core to a new index dir when pulling blob contents.
    Two cases:
    1. the local index is at a higher generation number than blob's generation number
    2. the same index file exists in both places with a different size/checksum
---
 .../solr/store/blob/client/BlobCoreMetadata.java   |  84 +++++----
 .../store/blob/client/BlobCoreMetadataBuilder.java |  16 +-
 .../solr/store/blob/metadata/CorePushPull.java     | 185 +++++++++++++------
 .../store/blob/metadata/ServerSideMetadata.java    | 118 +++++++++---
 .../blob/metadata/SharedStoreResolutionUtil.java   |  94 +++++++---
 .../store/blob/client/CoreStorageClientTest.java   |   2 +-
 .../store/blob/metadata/BlobCoreMetadataTest.java  |  13 +-
 .../solr/store/blob/metadata/CorePushPullTest.java | 107 ++++++++++-
 .../blob/metadata/DeleteBlobStrategyTest.java      |  20 +-
 .../metadata/SharedStoreResolutionUtilTest.java    | 203 ++++++++++++++++-----
 .../solr/store/blob/metadata/ToFromJsonTest.java   |  17 +-
 11 files changed, 645 insertions(+), 214 deletions(-)
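
For illustration only (not part of this commit), a minimal sketch of the conflict rule described in the commit message above, using a simplified stand-in (a map of file name to {size, checksum}) for the real metadata types; the actual checks live in SharedStoreResolutionUtil.resolveMetadata below:

    import java.util.Arrays;
    import java.util.Map;

    class ConflictRuleSketch {
      // Hypothetical helper: decide whether a pull must land in a new index directory.
      static boolean isLocalConflictingWithBlob(long localGen, long blobGen,
                                                Map<String, long[]> localFiles,
                                                Map<String, long[]> blobFiles) {
        // Case 1: local index at a higher generation than blob's; blob contents
        // cannot establish their legitimacy over a newer local index.
        if (localGen > blobGen) return true;
        // Case 2: the same file name exists in both places with a different size/checksum.
        for (Map.Entry<String, long[]> e : blobFiles.entrySet()) {
          long[] local = localFiles.get(e.getKey());
          if (local != null && !Arrays.equals(local, e.getValue())) return true;
        }
        return false;
      }
    }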

diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadata.java b/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadata.java
index b234550..8bf2f2a 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadata.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadata.java
@@ -2,6 +2,7 @@ package org.apache.solr.store.blob.client;
 
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 
@@ -20,6 +21,14 @@ public class BlobCoreMetadata {
     private final String sharedBlobName;
 
     /**
+     * Generation number of index represented by this metadata.
+     * This generation number is only meant to identify a scenario where the local index generation number is higher than
+     * what we have in blob. In that scenario we would switch the index to a new directory when pulling contents from blob,
+     * because in the presence of a higher local generation number, blob contents cannot establish their legitimacy.
+     */
+    private final long generation;
+
+    /**
      * Unique identifier of this metadata, that changes on every update to the metadata (except generating a new corrupt metadata
      * through {@link #getCorruptOf}).
      */
@@ -57,16 +66,17 @@ public class BlobCoreMetadata {
      * It always builds non "isCorrupt" and non "isDeleted" metadata. 
      * The only way to build an instance of "isCorrupt" metadata is to use {@link #getCorruptOf} and for "isDeleted" use {@link #getDeletedOf()}
      */
-    BlobCoreMetadata(String sharedBlobName, BlobFile[] blobFiles, BlobFileToDelete[] blobFilesToDelete) {
-        this(sharedBlobName, blobFiles, blobFilesToDelete, UUID.randomUUID().toString(), false,
+    BlobCoreMetadata(String sharedBlobName, BlobFile[] blobFiles, BlobFileToDelete[] blobFilesToDelete, long generation) {
+        this(sharedBlobName, blobFiles, blobFilesToDelete, generation, UUID.randomUUID().toString(), false,
                 false);
     }
 
-    private BlobCoreMetadata(String sharedBlobName, BlobFile[] blobFiles, BlobFileToDelete[] blobFilesToDelete, 
+    private BlobCoreMetadata(String sharedBlobName, BlobFile[] blobFiles, BlobFileToDelete[] blobFilesToDelete, long generation,
         String uniqueIdentifier, boolean isCorrupt, boolean isDeleted) {
         this.sharedBlobName = sharedBlobName;
         this.blobFiles = blobFiles;
         this.blobFilesToDelete = blobFilesToDelete;
+        this.generation = generation;
         this.uniqueIdentifier = uniqueIdentifier;
         this.isCorrupt = isCorrupt;
         this.isDeleted = isDeleted;
@@ -78,7 +88,7 @@ public class BlobCoreMetadata {
      */
     public BlobCoreMetadata getCorruptOf() {
         assert !isCorrupt;
-        return new BlobCoreMetadata(sharedBlobName, blobFiles, blobFilesToDelete, uniqueIdentifier, true, isDeleted);
+        return new BlobCoreMetadata(sharedBlobName, blobFiles, blobFilesToDelete, generation, uniqueIdentifier, true, isDeleted);
     }
 
     /**
@@ -88,7 +98,7 @@ public class BlobCoreMetadata {
      */
     public BlobCoreMetadata getDeletedOf() {
         assert !isDeleted;
-        return new BlobCoreMetadata(sharedBlobName, blobFiles, blobFilesToDelete, uniqueIdentifier, isCorrupt, true);
+        return new BlobCoreMetadata(sharedBlobName, blobFiles, blobFilesToDelete, generation, uniqueIdentifier, isCorrupt, true);
     }
 
     /**
@@ -123,6 +133,10 @@ public class BlobCoreMetadata {
         return uniqueIdentifier;
     }
 
+    public long getGeneration() {
+        return this.generation;
+    }
+
     public BlobFile[] getBlobFiles() {
         return blobFiles;
     }
@@ -138,6 +152,7 @@ public class BlobCoreMetadata {
 
         BlobCoreMetadata that = (BlobCoreMetadata) o;
 
+        if (this.generation != that.generation) return false;
         if (this.isCorrupt != that.isCorrupt) return false;
         if (this.isDeleted != that.isDeleted) return false;
         if (!this.uniqueIdentifier.equals(that.uniqueIdentifier)) return false;
@@ -156,19 +171,17 @@ public class BlobCoreMetadata {
 
     @Override
     public int hashCode() {
-        int result = sharedBlobName.hashCode();
-        result = 31 * result + uniqueIdentifier.hashCode();
-        // The array of files is not ordered so need to compare as a set
-        result = 31 * result + new HashSet<>(Arrays.asList(this.blobFiles)).hashCode();
-        result = 31 * result + new HashSet<>(Arrays.asList(this.blobFilesToDelete)).hashCode();
-        result = 31 * result + (isCorrupt ? 1 : 0);
-        result = 31 * result + (isDeleted ? 1 : 0);
-        return result;
+        return Objects.hash(sharedBlobName, uniqueIdentifier, generation,
+            // The array of files is not ordered, so hash as a set
+            new HashSet<>(Arrays.asList(this.blobFiles)).hashCode(),
+            new HashSet<>(Arrays.asList(this.blobFilesToDelete)).hashCode(),
+            isCorrupt, isDeleted);
     }
 
     @Override
     public String toString() {
-        return "sharedBlobName=" + sharedBlobName + " isCorrupt=" + isCorrupt + " uniqueIdentifier=" + uniqueIdentifier;
+        return "sharedBlobName=" + sharedBlobName + " generation=" + generation
+            + " isCorrupt=" + isCorrupt + " uniqueIdentifier=" + uniqueIdentifier;
     }
 
     /**
@@ -187,14 +200,18 @@ public class BlobCoreMetadata {
          */
         private final String blobName;
 
-        // TODO add some checksum here to verify blob files are not corrupt
-
         private final long fileSize;
 
-        public BlobFile(String solrFileName, String blobName, long fileSize) {
+        /**
+         * Lucene-generated checksum of the file. It is used in addition to file size to compare local and blob files.
+         */
+        private final long checksum;
+
+        public BlobFile(String solrFileName, String blobName, long fileSize, long checksum) {
             this.solrFileName = solrFileName;
             this.blobName = blobName;
             this.fileSize = fileSize;
+            this.checksum = checksum;
         }
 
         @Override
@@ -202,19 +219,17 @@ public class BlobCoreMetadata {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
 
-            BlobFile blobFile = (BlobFile) o;
+            BlobFile other = (BlobFile) o;
 
-            if (fileSize != blobFile.fileSize) return false;
-            if (!solrFileName.equals(blobFile.solrFileName)) return false;
-            return blobName.equals(blobFile.blobName);
+            return Objects.equals(solrFileName, other.solrFileName) &&
+                Objects.equals(blobName, other.blobName) &&
+                Objects.equals(checksum, other.checksum) &&
+                Objects.equals(fileSize, other.fileSize);
         }
 
         @Override
         public int hashCode() {
-            int result = solrFileName.hashCode();
-            result = 31 * result + blobName.hashCode();
-            result = 31 * result + (int) (fileSize ^ (fileSize >>> 32));
-            return result;
+            return Objects.hash(solrFileName, blobName, fileSize, checksum);
         }
 
         public String getSolrFileName() {
@@ -228,7 +243,10 @@ public class BlobCoreMetadata {
         public long getFileSize() {
             return this.fileSize;
         }
-        
+
+        public long getChecksum() {
+            return this.checksum;
+        }
     }
 
     /**
@@ -247,14 +265,14 @@ public class BlobCoreMetadata {
      // likelihood of a really really slow update by another server causing a race is low).
       private final long deletedAt;
 
-      public BlobFileToDelete(String solrFileName, String blobName, long fileSize, long deletedAt) {
-        super(solrFileName, blobName, fileSize);
+      public BlobFileToDelete(String solrFileName, String blobName, long fileSize, long checksum, long deletedAt) {
+        super(solrFileName, blobName, fileSize, checksum);
 
         this.deletedAt = deletedAt;
       }
 
       public BlobFileToDelete(BlobFile bf, long deletedAt) {
-        super(bf.solrFileName, bf.blobName, bf.fileSize);
+        super(bf.solrFileName, bf.blobName, bf.fileSize, bf.checksum);
 
         this.deletedAt = deletedAt;
       }
@@ -265,16 +283,14 @@ public class BlobCoreMetadata {
         if (o == null || getClass() != o.getClass()) return false;
         if (!super.equals(o)) return false;
 
-        BlobFileToDelete that = (BlobFileToDelete) o;
+        BlobFileToDelete other = (BlobFileToDelete) o;
 
-        return deletedAt == that.deletedAt;
+        return deletedAt == other.deletedAt;
       }
 
       @Override
       public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + (int) (deletedAt ^ (deletedAt >>> 32));
-        return result;
+          return Objects.hash(super.hashCode(), deletedAt);
       }
 
       public long getDeletedAt() {
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadataBuilder.java b/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadataBuilder.java
index a8c2676..bc43464 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadataBuilder.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadataBuilder.java
@@ -6,22 +6,30 @@ import java.util.*;
  * Builder for {@link BlobCoreMetadata}.
  */
 public class BlobCoreMetadataBuilder {
+    /**
+     * Generation number used in the metadata of cores that do not exist on the Blob Store.
+     */
+    public static final long UNDEFINED_VALUE = -1L;
 
     final private String sharedBlobName;
+    final private long generation;
     final private Set<BlobCoreMetadata.BlobFile> blobFiles;
     final private Set<BlobCoreMetadata.BlobFileToDelete> blobFilesToDelete;
 
-    public BlobCoreMetadataBuilder(String sharedBlobName) {
+    public BlobCoreMetadataBuilder(String sharedBlobName, long generation) {
         this.sharedBlobName = sharedBlobName;
+        this.generation = generation;
         this.blobFiles = new HashSet<>();
         this.blobFilesToDelete = new HashSet<>();
     }
 
     /**
      * Builder used for "cloning" then modifying an existing instance of {@link BlobCoreMetadata}.
+     * The new generation has to be passed in because it is final and can't be set later.
      */
-    public BlobCoreMetadataBuilder(BlobCoreMetadata bcm) {
+    public BlobCoreMetadataBuilder(BlobCoreMetadata bcm, long generation) {
         this.sharedBlobName = bcm.getSharedBlobName();
+        this.generation = generation;
         this.blobFiles = new HashSet<>(Arrays.asList(bcm.getBlobFiles()));
         this.blobFilesToDelete = new HashSet<>(Arrays.asList(bcm.getBlobFilesToDelete()));
     }
@@ -34,7 +42,7 @@ public class BlobCoreMetadataBuilder {
     * Builds a {@link BlobCoreMetadata} for a non-existing core of a given name.
      */
     static public BlobCoreMetadata buildEmptyCoreMetadata(String sharedBlobName) {
-        return (new BlobCoreMetadataBuilder(sharedBlobName)).build();
+        return (new BlobCoreMetadataBuilder(sharedBlobName, UNDEFINED_VALUE)).build();
     }
 
     /**
@@ -90,6 +98,6 @@ public class BlobCoreMetadataBuilder {
         BlobCoreMetadata.BlobFile[] blobFilesArray = this.blobFiles.toArray(new BlobCoreMetadata.BlobFile[this.blobFiles.size()]);
         BlobCoreMetadata.BlobFileToDelete[] blobFilesToDeleteArray = this.blobFilesToDelete.toArray(new BlobCoreMetadata.BlobFileToDelete[this.blobFilesToDelete.size()]);
 
-        return new BlobCoreMetadata(this.sharedBlobName, blobFilesArray, blobFilesToDeleteArray);
+        return new BlobCoreMetadata(this.sharedBlobName, blobFilesArray, blobFilesToDeleteArray, generation);
     }
 }
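
As a usage note, a hypothetical call of the builder with the new generation parameter (the name, generation, size, and checksum values here are made up; the constructor and addFile signature are the ones added above):

    BlobCoreMetadata bcm = new BlobCoreMetadataBuilder("collectionName_shardName", 7L)
        .addFile(new BlobCoreMetadata.BlobFile("_0.cfs", "blobFileName", 1234L, 987654321L))
        .build();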
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java
index 8245651..a96c5a5 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/CorePushPull.java
@@ -13,6 +13,7 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
@@ -110,7 +111,7 @@ public class CorePushPull {
 
         try {
           // Creating the new BlobCoreMetadata as a modified clone of the existing one
-          BlobCoreMetadataBuilder bcmBuilder = new BlobCoreMetadataBuilder(blobMetadata);
+          BlobCoreMetadataBuilder bcmBuilder = new BlobCoreMetadataBuilder(blobMetadata, solrServerMetadata.getGeneration());
 
           // First copy index files over to a temp directory and then push to blob store from there. 
           // This is to avoid holding a lock over index directory involving network operation.
@@ -132,14 +133,14 @@ public class CorePushPull {
           // that can potentially be stopped in the middle because of a concurrent merge deleting the segment files being pushed. 
 
           // create a temp directory (within the core local folder).
-          String tempIndexDirName = solrCore.getDataDir() + "index.push." + System.nanoTime();
-          Directory tempIndexDir = solrCore.getDirectoryFactory().get(tempIndexDirName, DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
+          String tempIndexDirPath = solrCore.getDataDir() + "index.push." + System.nanoTime();
+          Directory tempIndexDir = solrCore.getDirectoryFactory().get(tempIndexDirPath, DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
           try {
             Directory indexDir = solrCore.getDirectoryFactory().get(solrCore.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
             try {
               // copy index files to the temp directory
               for (CoreFileData cfd : resolvedMetadataResult.getFilesToPush()) {
-                copyFileToDirectory(indexDir, cfd.fileName, tempIndexDir);
+                copyFileToDirectory(indexDir, cfd.getFileName(), tempIndexDir);
               }
             } finally {
               solrCore.getDirectoryFactory().release(indexDir);
@@ -176,7 +177,8 @@ public class CorePushPull {
               
               String blobCoreMetadataName = BlobStoreUtils.buildBlobStoreMetadataName(pushPullData.getLastReadMetadataSuffix());
               String coreMetadataPath = blobMetadata.getSharedBlobName() + "/" + blobCoreMetadataName;
-              BlobCoreMetadata.BlobFileToDelete bftd = new BlobCoreMetadata.BlobFileToDelete("", coreMetadataPath, bcmSize, System.currentTimeMillis());
+              // So far, checksum is not used for the metadata file
+              BlobCoreMetadata.BlobFileToDelete bftd = new BlobCoreMetadata.BlobFileToDelete("", coreMetadataPath, bcmSize, BlobCoreMetadataBuilder.UNDEFINED_VALUE, System.currentTimeMillis());
               bcmBuilder.addFileToDelete(bftd);
             }
             
@@ -184,13 +186,21 @@ public class CorePushPull {
             // But this is untrue/totally false/misleading. SnapPuller has File all over.
             for (CoreFileData cfd : resolvedMetadataResult.getFilesToPush()) {
               // Sanity check that we're talking about the same file (just sanity, Solr doesn't update files so should never be different)
-              assert cfd.fileSize == tempIndexDir.fileLength(cfd.fileName);
+              assert cfd.getFileSize() == tempIndexDir.fileLength(cfd.getFileName());
 
-              String blobPath = pushFileToBlobStore(coreStorageClient, tempIndexDir, cfd.fileName, cfd.fileSize);
-              bcmBuilder.addFile(new BlobCoreMetadata.BlobFile(cfd.fileName, blobPath, cfd.fileSize));
+              String blobPath = pushFileToBlobStore(coreStorageClient, tempIndexDir, cfd.getFileName(), cfd.getFileSize());
+              bcmBuilder.addFile(new BlobCoreMetadata.BlobFile(cfd.getFileName(), blobPath, cfd.getFileSize(), cfd.getChecksum()));
             }
           } finally {
-            removeTempDirectory(solrCore, tempIndexDirName, tempIndexDir);
+            try {
+              // Remove temp directory
+              solrCore.getDirectoryFactory().doneWithDirectory(tempIndexDir);
+              solrCore.getDirectoryFactory().remove(tempIndexDir);
+            } catch (Exception e) {
+              log.warn("Cannot remove temp directory " + tempIndexDirPath, e);
+            } finally {
+              solrCore.getDirectoryFactory().release(tempIndexDir);
+            }
           }
           
           // delete what we need
@@ -206,7 +216,7 @@ public class CorePushPull {
         }
       } finally {
         long filesAffected = resolvedMetadataResult.getFilesToPush().size();
-        long bytesTransferred = resolvedMetadataResult.getFilesToPush().stream().mapToLong(cfd -> cfd.fileSize).sum();
+        long bytesTransferred = resolvedMetadataResult.getFilesToPush().stream().mapToLong(cfd -> cfd.getFileSize()).sum();
         
         // todo correctness stuff
         logBlobAction("PUSH", filesAffected, bytesTransferred, requestQueuedTimeMs, attempt, startTimeMs);
@@ -226,7 +236,6 @@ public class CorePushPull {
 
     /**
      * We're doing here what replication does in {@link org.apache.solr.handler.IndexFetcher#fetchLatestIndex}.<p>
-     *     TODO: check changes in Solr.7's IndexFetcher. Core reloading needed?
      *
      * This method will work in 2 cases:
      * <ol>
@@ -255,58 +264,72 @@ public class CorePushPull {
           if (solrCore == null) {
             throw new Exception("Can't find core " + pushPullData.getCoreName());
           }
-
+          // If there is a conflict between local and blob contents, we will move the core to a new index directory
+          final boolean createNewIndexDir = resolvedMetadataResult.isLocalConflictingWithBlob();
+          boolean coreSwitchedToNewIndexDir = false;
           try {
-            // We're here because we identified no conflicts in downloading from the Blob store the files missing locally.
-            // In order to make sure there still are no conflicts (local Solr server on which we run might have updated the
-            // core since we checked or might do so as we download files shortly), we'll download the needed files to a temp
-            // dir and before moving them to the core directory, we will check the directory hasn't changed locally.
-
             // Create temp directory (within the core local folder).
-            String tmpIdxDirName = solrCore.getDataDir() + "index.pull." + System.nanoTime();
-            Directory tmpIndexDir = solrCore.getDirectoryFactory().get(tmpIdxDirName, DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
+            // If we are moving the index to a new directory because of a conflict, then this will be that new directory.
+            // Even if we are not moving to a new directory, we will first download files from the blob store into this temp directory.
+            // Then we will take a lock over the index directory and move files from the temp directory to the index directory. This is to avoid
+            // performing a network operation while holding the index directory lock.
+            String tempIndexDirName = "index.pull." + System.nanoTime();
+            String tempIndexDirPath = solrCore.getDataDir() + tempIndexDirName;
+            Directory tempIndexDir = solrCore.getDirectoryFactory().get(tempIndexDirPath, DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
             try {
-              downloadFilesFromBlob(tmpIndexDir, resolvedMetadataResult.getFilesToPull());
+              String indexDirPath = solrCore.getIndexDir();
+              Collection<BlobFile> filesToDownload;
+              if (createNewIndexDir) {
+                // This is an optimization to avoid downloading everything from blob if possible
+                // This made sense for some rolling start scenario in TLOG replicas and makes sense here too
+                // https://issues.apache.org/jira/browse/SOLR-11920
+                // https://issues.apache.org/jira/browse/SOLR-11815
+                // TODO: We might want to skip this optimization when healing a locally corrupt core
+                Directory indexDir = solrCore.getDirectoryFactory().get(indexDirPath, DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
+                try {
+                  filesToDownload = initializeNewIndexDirWithLocallyAvailableFiles(indexDir, tempIndexDir);
+                } finally {
+                  solrCore.getDirectoryFactory().release(indexDir);
+                }
+              } else {
+                filesToDownload = resolvedMetadataResult.getFilesToPull();
+              }
+              downloadFilesFromBlob(tempIndexDir, filesToDownload);
 
-              String indexDir = solrCore.getIndexDir();
-              Directory dir = solrCore.getDirectoryFactory().get(indexDir, DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
-              
+              Directory indexDir = solrCore.getDirectoryFactory().get(indexDirPath, DirectoryFactory.DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
               try {
-                // Close the index writer to stop changes to this core
-                solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(solrCore, true);
+                if (!createNewIndexDir) {
+                  // Close the index writer to stop changes to this core
+                  solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(solrCore, true);
+                }
 
                 boolean thrownException = false;
                 try {
                   // Make sure Solr core directory content hasn't changed since we decided what we want to pull from Blob
-                  if (!solrServerMetadata.isSameDirectoryContent(dir)) {
+                  if (!solrServerMetadata.isSameDirectoryContent(indexDir)) {
                     // Maybe return something less aggressive than throwing an exception? TBD once we end up calling this method :)
-                    throw new Exception("Local Directory content " + indexDir + " has changed since Blob pull started. Aborting pull.");
+                    throw new Exception("Local Directory content " + indexDirPath + " has changed since Blob pull started. Aborting pull.");
                   }
 
-                  // Copy all files into the Solr directory (there are no naming conflicts since we're doing an Action.PULL)
-                  // Move the segments_N file last once all other are ok.
-                  String segmentsN = null;
-                  for (BlobCoreMetadata.BlobFile bf : resolvedMetadataResult.getFilesToPull()) {
-                    if (SharedStoreResolutionUtil.isSegmentsNFilename(bf)) {
-                      assert segmentsN == null;
-                      segmentsN = bf.getSolrFileName();
-                    } else {
-                      // Copy all non segments_N files
-                      moveFileToDirectory(solrCore, tmpIndexDir, bf.getSolrFileName(), dir);
-                    }
+                  if (createNewIndexDir) {
+                    // point index to the new directory
+                    coreSwitchedToNewIndexDir = solrCore.modifyIndexProps(tempIndexDirName);
+                  } else {
+                    moveFilesFromTempToIndexDir(solrCore, tempIndexDir, indexDir);
                   }
-                  assert segmentsN != null;
-                  // Copy segments_N file. From this point on the local core might be accessed and is up to date with Blob content
-                  moveFileToDirectory(solrCore, tmpIndexDir, segmentsN, dir);
                 } catch (Exception e) {
                   // Used in the finally below to not mask an exception thrown from the try block above
                   thrownException = true;
                   throw e;
                 } finally {
                   try {
-                    // The closed index writer must be opened back (in the finally bloc)
                     // TODO this has been observed to throw org.apache.lucene.index.CorruptIndexException on certain types of corruptions in Blob Store. We need to handle this correctly (maybe we already do).
-                    solrCore.getUpdateHandler().getSolrCoreState().openIndexWriter(solrCore);
+                    if (!createNewIndexDir) {
+                      // The closed index writer must be reopened (in the finally block)
+                      solrCore.getUpdateHandler().getSolrCoreState().openIndexWriter(solrCore);
+                    } else if (coreSwitchedToNewIndexDir) {
+                      solrCore.getUpdateHandler().newIndexWriter(true);
+                    }
                   } catch (IOException ioe) {
                     // TODO corrupt core handling happened here
                     // CorruptCoreHandler.notifyBlobPullFailure(container, coreName, blobMetadata);
@@ -317,10 +340,28 @@ public class CorePushPull {
                   }
                 }
               } finally {
-                  solrCore.getDirectoryFactory().release(dir);
+                try {
+                  if (coreSwitchedToNewIndexDir) {
+                    solrCore.getDirectoryFactory().doneWithDirectory(indexDir);
+                    solrCore.getDirectoryFactory().remove(indexDir);
+                  }
+                } catch (Exception e) {
+                  log.warn("Cannot remove previous index directory " + indexDir, e);
+                } finally {
+                  solrCore.getDirectoryFactory().release(indexDir);
+                }
               }
             } finally {
-                removeTempDirectory(solrCore, tmpIdxDirName, tmpIndexDir);
+              try {
+                if (!coreSwitchedToNewIndexDir) {
+                  solrCore.getDirectoryFactory().doneWithDirectory(tempIndexDir);
+                  solrCore.getDirectoryFactory().remove(tempIndexDir);
+                }
+              } catch (Exception e) {
+                log.warn("Cannot remove temp directory " + tempIndexDirPath, e);
+              } finally {
+                solrCore.getDirectoryFactory().release(tempIndexDir);
+              }
             }
             
             try {
@@ -353,7 +394,44 @@ public class CorePushPull {
         }
     }
 
-    /**
+  private void moveFilesFromTempToIndexDir(SolrCore solrCore, Directory tmpIndexDir, Directory dir) throws IOException {
+    // Move all files into the Solr index directory,
+    // moving the segments_N file last once all others are in place.
+    String segmentsN = null;
+    for (BlobFile bf : resolvedMetadataResult.getFilesToPull()) {
+      if (SharedStoreResolutionUtil.isSegmentsNFilename(bf)) {
+        assert segmentsN == null;
+        segmentsN = bf.getSolrFileName();
+      } else {
+        // Move all non segments_N files
+        moveFileToDirectory(solrCore, tmpIndexDir, bf.getSolrFileName(), dir);
+      }
+    }
+    assert segmentsN != null;
+    // Move the segments_N file. From this point on the local core might be accessed and is up to date with Blob content
+    moveFileToDirectory(solrCore, tmpIndexDir, segmentsN, dir);
+  }
+
+  private Collection<BlobFile> initializeNewIndexDirWithLocallyAvailableFiles(Directory indexDir, Directory newIndexDir) {
+    Collection<BlobFile> filesToDownload = new HashSet<>();
+    for (BlobFile blobFile : resolvedMetadataResult.getFilesToPull()) {
+      try (final IndexInput indexInput = indexDir.openInput(blobFile.getSolrFileName(), IOContext.READONCE)) {
+        long length = indexInput.length();
+        long checksum = CodecUtil.retrieveChecksum(indexInput);
+        if (length == blobFile.getFileSize() && checksum == blobFile.getChecksum()) {
+          copyFileToDirectory(indexDir, blobFile.getSolrFileName(), newIndexDir);
+        } else {
+          filesToDownload.add(blobFile);
+        }
+      } catch (Exception ex) {
+        // Either the file does not exist locally or the copy did not succeed; we will download from the blob store
+        filesToDownload.add(blobFile);
+      }
+    }
+    return filesToDownload;
+  }
+
+  /**
      * Pushes a local file to blob store and returns a unique path to newly created blob  
      */
     @VisibleForTesting
@@ -366,13 +444,13 @@ public class CorePushPull {
         return blobPath;
     }
 
-    private void removeTempDirectory(SolrCore solrCore, String tempDirName, Directory tempDir) throws IOException {
+    private void removeTempDirectory(SolrCore solrCore, String tempDirPath, Directory tempDir) throws IOException {
         try {
             // Remove temp directory
             solrCore.getDirectoryFactory().doneWithDirectory(tempDir);
             solrCore.getDirectoryFactory().remove(tempDir);
         } catch (Exception e) {
-            log.warn("Cannot remove temp directory " + tempDirName, e);
+            log.warn("Cannot remove temp directory " + tempDirPath, e);
         } finally {
             solrCore.getDirectoryFactory().release(tempDir);
         }
@@ -390,11 +468,12 @@ public class CorePushPull {
       long now = System.currentTimeMillis();
       long runTime = now - startTimeMs;
       long startLatency = now - requestQueuedTimeMs;
- 
+
       String message = String.format("PushPullData=[%s] action=%s storageProvider=%s bucketRegion=%s bucketName=%s "
-        + "runTime=%s startLatency=%s bytesTransferred=%s attempt=%s filesAffected=%s",
-        pushPullData.toString(), action, coreStorageClient.getStorageProvider().name(), coreStorageClient.getBucketRegion(), 
-        coreStorageClient.getBucketName(), runTime, startLatency, bytesTransferred, attempt, filesAffected);
+              + "runTime=%s startLatency=%s bytesTransferred=%s attempt=%s filesAffected=%s localGenerationNumber=%s blobGenerationNumber=%s",
+          pushPullData.toString(), action, coreStorageClient.getStorageProvider().name(), coreStorageClient.getBucketRegion(),
+          coreStorageClient.getBucketName(), runTime, startLatency, bytesTransferred, attempt, filesAffected,
+          solrServerMetadata.getGeneration(), blobMetadata.getGeneration());
       log.info(message);
     }
 
@@ -423,6 +502,7 @@ public class CorePushPull {
      * Copies {@code fileName} from {@code fromDir} to {@code toDir}
      */
     private void copyFileToDirectory(Directory fromDir, String fileName, Directory toDir) throws IOException {
+      // TODO: Consider optimizing with org.apache.lucene.store.HardlinkCopyDirectoryWrapper
       toDir.copyFrom(fromDir, fileName, fileName, DirectoryFactory.IOCONTEXT_NO_CACHE);
     }
 
@@ -431,6 +511,7 @@ public class CorePushPull {
      */
     private void moveFileToDirectory(SolrCore solrCore, Directory fromDir, String fileName, Directory toDir) throws IOException {
       // We don't need to keep the original files so we move them over.
+      // TODO: Consider optimizing with org.apache.lucene.store.HardlinkCopyDirectoryWrapper
       solrCore.getDirectoryFactory().move(fromDir, toDir, fileName, DirectoryFactory.IOCONTEXT_NO_CACHE);
     }
     
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/ServerSideMetadata.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/ServerSideMetadata.java
index 7e2d3e0..bd3562b 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/ServerSideMetadata.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/ServerSideMetadata.java
@@ -5,11 +5,16 @@ import java.io.IOException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 
 import org.apache.commons.codec.binary.Hex;
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
@@ -34,7 +39,13 @@ public class ServerSideMetadata {
     * Files composing the core. They are referenced from the core's current commit point's segments_N file
      * which is ALSO included in this collection.
      */
-    private final ImmutableCollection<CoreFileData> files;
+    private final ImmutableCollection<CoreFileData> latestCommitFiles;
+
+    /**
+     * Index files related to current and previous commit points (if any).
+     * These files do not matter when pushing contents to blob but they do matter if blob content being pulled conflicts with them.
+     */
+    private final ImmutableCollection<CoreFileData> allCommitsFiles;
 
     /**
      * Hash of the directory content used to make sure the content doesn't change as we proceed to pull new files from Blob
@@ -42,6 +53,13 @@ public class ServerSideMetadata {
      */
     private final String directoryHash;
 
+    /**
+     * Generation number of the local index.
+     * This generation number is only meant to identify a scenario where the local index generation number is higher than
+     * what we have in blob. In that scenario we would switch the index to a new directory when pulling contents from blob,
+     * because in the presence of a higher local generation number, blob contents cannot establish their legitimacy.
+     */
+    private final long generation;
     private final SolrCore core;
     private final String coreName;
     private final CoreContainer container;
@@ -62,14 +80,17 @@ public class ServerSideMetadata {
         }
 
         try {
-            IndexCommit commit = core.getDeletionPolicy().getLatestCommit();
-            if (commit == null) {
+            IndexCommit latestCommit = core.getDeletionPolicy().getLatestCommit();
+            if (latestCommit == null) {
                 throw new BlobException("Core " + coreName + " has no available commit point");
             }
 
+            generation = latestCommit.getGeneration();
+
             // Work around possible bug returning same file multiple times by using a set here
             // See org.apache.solr.handler.ReplicationHandler.getFileList()
-            ImmutableCollection.Builder<CoreFileData> builder = new ImmutableSet.Builder<>();
+            ImmutableCollection.Builder<CoreFileData> latestCommitBuilder = new ImmutableSet.Builder<>();
+            ImmutableCollection.Builder<CoreFileData> allCommitsBuilder;
 
             Directory coreDir = core.getDirectoryFactory().get(core.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
             try {
@@ -78,20 +99,45 @@ public class ServerSideMetadata {
                 // with the download.
                 directoryHash = getSolrDirectoryHash(coreDir);
 
-                for (String fileName : commit.getFileNames()) {
-                    // Note we add here all segment related files as well as the commit point's segments_N file
-                    // Note commit points do not contain lock (write.lock) files.
-                    builder.add(new CoreFileData(fileName, coreDir.fileLength(fileName)));
+                buildCommitFiles(coreDir, latestCommit, latestCommitBuilder);
+
+                // A note on listCommits says that it does not guarantee consistent results if a commit is in progress.
+                // But in the blob context we serialize commits and pulls with proper locking, therefore we should be good here.
+                List<IndexCommit> allCommits = DirectoryReader.listCommits(coreDir);
+
+                // Optimization: normally we would only be dealing with one commit point. In that case just reuse the latest commit files builder.
+                if (allCommits.size() > 1) {
+                    allCommitsBuilder = new ImmutableSet.Builder<>();
+                    for (IndexCommit commit: allCommits) {
+                        buildCommitFiles(coreDir, commit, allCommitsBuilder);
+                    }
+                } else {
+                    // We should always have a commit point, as verified at the beginning of this method.
+                    assert allCommits.size() == 1 && allCommits.get(0).equals(latestCommit);
+                    allCommitsBuilder = latestCommitBuilder;
                 }
             } finally {
                 core.getDirectoryFactory().release(coreDir);
             }
-            files = builder.build();
+            latestCommitFiles = latestCommitBuilder.build();
+            allCommitsFiles = allCommitsBuilder.build();
         } finally {
             core.close();
         }
     }
 
+    private void buildCommitFiles(Directory coreDir, IndexCommit commit, ImmutableCollection.Builder<CoreFileData> builder) throws IOException {
+        for (String fileName : commit.getFileNames()) {
+            // Note we add here all segment related files as well as the commit point's segments_N file
+            // Note commit points do not contain lock (write.lock) files.
+            try (final IndexInput indexInput = coreDir.openInput(fileName, IOContext.READONCE)) {
+                long length = indexInput.length();
+                long checksum = CodecUtil.retrieveChecksum(indexInput);
+                builder.add(new CoreFileData(fileName, length, checksum));
+            }
+        }
+    }
+
     public String getCoreName() {
         return this.coreName;
     }
@@ -100,12 +146,20 @@ public class ServerSideMetadata {
         return this.container;
     }
 
+    public long getGeneration() {
+        return this.generation;
+    }
+
     public String getDirectoryHash() {
         return this.directoryHash;
     }
 
-    public ImmutableCollection<CoreFileData> getFiles(){
-        return this.files;
+    public ImmutableCollection<CoreFileData> getLatestCommitFiles(){
+        return this.latestCommitFiles;
+    }
+
+    public ImmutableCollection<CoreFileData> getAllCommitsFiles() {
+        return this.allCommitsFiles;
     }
 
     /**
@@ -149,18 +203,28 @@ public class ServerSideMetadata {
         return hash;
     }
 
+    @Override
+    public String toString() {
+        return "collectionName=" + core.getCoreDescriptor().getCollectionName() +
+            " shardName=" + core.getCoreDescriptor().getCloudDescriptor().getShardId() +
+            " coreName=" + core.getName() +
+            " generation=" + generation;
+    }
+
     /**
      * Information we capture per local core file (segments_N file *included*)
      */
     public static class CoreFileData {
         /** Local file name, no path */
-        public final String fileName;
+        private final String fileName;
         /** Size in bytes */
-        public final long fileSize;
+        private final long fileSize;
+        private final long checksum;
 
-        CoreFileData(String fileName, long fileSize) {
+        CoreFileData(String fileName, long fileSize, long checksum) {
             this.fileName = fileName;
             this.fileSize = fileSize;
+            this.checksum = checksum;
         }
 
         @Override
@@ -170,21 +234,27 @@ public class ServerSideMetadata {
 
             CoreFileData other = (CoreFileData) o;
 
-            return Objects.equals(fileSize, other.fileSize) && 
-                    Objects.equals(fileName, other.fileName);
+            return Objects.equals(fileName, other.fileName) &&
+                Objects.equals(fileSize, other.fileSize) &&
+                Objects.equals(checksum, other.checksum);
+        }
+
+        public String getFileName() {
+            return fileName;
+        }
+
+        public long getFileSize() {
+            return fileSize;
+        }
+
+        public long getChecksum() {
+            return checksum;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(fileName, fileSize);
+            return Objects.hash(fileName, fileSize, checksum);
         }
-
     }
 
-    @Override
-    public String toString() {
-        return "collectionName=" + core.getCoreDescriptor().getCollectionName() +
-            " shardName=" + core.getCoreDescriptor().getCloudDescriptor().getShardId() + 
-            " coreName=" + core.getName();
-    }
 }
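
For context on the checksum handling above, a minimal standalone sketch (assuming a plain Lucene FSDirectory; not part of this commit) of reading a file's length and footer checksum, which is what buildCommitFiles() and CorePushPull's initializeNewIndexDirWithLocallyAvailableFiles() do per file:

    import java.io.IOException;
    import java.nio.file.Paths;
    import org.apache.lucene.codecs.CodecUtil;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FSDirectory;
    import org.apache.lucene.store.IOContext;
    import org.apache.lucene.store.IndexInput;

    class ChecksumProbe {
      public static void main(String[] args) throws IOException {
        // args[0]: index directory path, args[1]: index file name
        try (Directory dir = FSDirectory.open(Paths.get(args[0]));
             IndexInput in = dir.openInput(args[1], IOContext.READONCE)) {
          long length = in.length();
          // Returns the checksum Lucene stored in the file footer
          long checksum = CodecUtil.retrieveChecksum(in);
          System.out.println(args[1] + " length=" + length + " checksum=" + checksum);
        }
      }
    }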
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtil.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtil.java
index f96463d..2bc8054 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtil.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtil.java
@@ -1,5 +1,7 @@
 package org.apache.solr.store.blob.metadata;
 
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -9,13 +11,18 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.store.blob.client.BlobCoreMetadata;
 import org.apache.solr.store.blob.client.BlobCoreMetadata.BlobFile;
 import org.apache.solr.store.blob.metadata.ServerSideMetadata.CoreFileData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utility class used to compare local {@link ServerSideMetadata} and remote 
  * {@link BlobCoreMetadata}, metadata of shard index data on the local solr node
  * and remote shared store (blob).
  */
-public class SharedStoreResolutionUtil {  
+public class SharedStoreResolutionUtil {
+
+  private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private static String SEGMENTS_N_PREFIX = "segments_";
 
   public static class SharedMetadataResolutionResult {
@@ -23,25 +30,29 @@ public class SharedStoreResolutionUtil {
     private final Collection<CoreFileData> filesToPush;
     // blob files needed to be pulled
     private final Collection<BlobFile> filesToPull;
+    // Whether the local index contents conflict with contents to be pulled from blob. If they do, we will move the
+    // core to a new index dir when pulling blob contents.
+    // Two cases:
+    //  1. the local index is at a higher generation number than blob's generation number
+    //  2. the same index file exists in both places with a different size/checksum
+    private final boolean localConflictingWithBlob;
     
-    public SharedMetadataResolutionResult(Collection<CoreFileData> localFilesMissingOnBlob, 
-        Collection<BlobFile> blobFilesMissingLocally) {
-      if (localFilesMissingOnBlob == null) {
+    
+    public SharedMetadataResolutionResult(Collection<CoreFileData> filesToPush, 
+        Collection<BlobFile> filesToPull, boolean localConflictingWithBlob) {
+      if (filesToPush == null) {
         this.filesToPush = Collections.emptySet();
       } else {
-        this.filesToPush = localFilesMissingOnBlob;
+        this.filesToPush = filesToPush;
       }
       
-      if (blobFilesMissingLocally == null) {
+      if (filesToPull == null) {
         this.filesToPull = Collections.emptySet();
       } else {
-        this.filesToPull = blobFilesMissingLocally;
+        this.filesToPull = filesToPull;
       }
-    }
-    
-    public SharedMetadataResolutionResult(Map<String, CoreFileData> localFilesMissingOnBlob, 
-        Map<String, BlobFile> blobFilesMissingLocally) {
-      this(localFilesMissingOnBlob.values(), blobFilesMissingLocally.values());
+
+      this.localConflictingWithBlob = localConflictingWithBlob;
     }
     
     public Collection<CoreFileData> getFilesToPush() {
@@ -51,6 +62,10 @@ public class SharedStoreResolutionUtil {
     public Collection<BlobFile> getFilesToPull() {
       return filesToPull;
     }
+    
+    public boolean isLocalConflictingWithBlob() {
+      return localConflictingWithBlob;
+    }
   }
   
   private SharedStoreResolutionUtil() {}
@@ -67,6 +82,7 @@ public class SharedStoreResolutionUtil {
   public static SharedMetadataResolutionResult resolveMetadata(ServerSideMetadata local, BlobCoreMetadata distant) {
     Map<String, CoreFileData> localFilesMissingOnBlob = new HashMap<>();
     Map<String, BlobFile> blobFilesMissingLocally = new HashMap<>();
+    Map<String, CoreFileData> allLocalFiles = new HashMap<>();
     
     if (local == null && distant == null) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Cannot resolve if both local and remote metadata is null"); 
@@ -74,8 +90,15 @@ public class SharedStoreResolutionUtil {
     
     if (local != null) {
       // Prepare local files for lookup by file name
-      for (CoreFileData cfd : local.getFiles()) {
-          localFilesMissingOnBlob.put(cfd.fileName, cfd);
+      
+      // For files to push, only the current commit point matters
+      for (CoreFileData cfd : local.getLatestCommitFiles()) {
+        localFilesMissingOnBlob.put(cfd.getFileName(), cfd);
+      }
+
+      // For files to pull, all the index files present locally matter
+      for (CoreFileData cfd : local.getAllCommitsFiles()) {
+        allLocalFiles.put(cfd.getFileName(), cfd);
       }
       // TODO we're not dealing here with local core on Solr server being corrupt. Not part of PoC at this stage but eventually need a solution
       // (fetch from Blob unless Blob corrupt as well...)
@@ -85,7 +108,7 @@ public class SharedStoreResolutionUtil {
         || distant.getBlobFiles().length == 0) {
       // The shard index data does not exist on the shared store. All we can do is push. 
       // We've computed localFilesMissingOnBlob above, and blobFilesMissingLocally is empty as it should be.
-      return new SharedMetadataResolutionResult(localFilesMissingOnBlob, blobFilesMissingLocally);
+      return new SharedMetadataResolutionResult(localFilesMissingOnBlob.values(), blobFilesMissingLocally.values(), false);
     }
     
     // Verify we find one and only one segments_N file to download from Blob.
@@ -111,30 +134,43 @@ public class SharedStoreResolutionUtil {
     }
     
     if (local == null) {
-      // The shard index data does not exist on the shared store. All we can do is pull.  
+      // The shard index data does not exist locally. All we can do is pull.  
       // We've computed blobFilesMissingLocally and localFilesMissingOnBlob is empty as it should be.
-      return new SharedMetadataResolutionResult(localFilesMissingOnBlob, blobFilesMissingLocally);
+      return new SharedMetadataResolutionResult(localFilesMissingOnBlob.values(), blobFilesMissingLocally.values(), false);
     }
-    
+
+    boolean localConflictingWithBlob = false;
     // Verify there are no inconsistencies between local index and blob index files
     for (BlobFile bf : distant.getBlobFiles()) {
       // We remove from map of local files those already present remotely since they don't have to be pushed.
-      CoreFileData cf = localFilesMissingOnBlob.remove(bf.getSolrFileName());
+      localFilesMissingOnBlob.remove(bf.getSolrFileName());
+      CoreFileData cf = allLocalFiles.get(bf.getSolrFileName());
       if (cf != null) {
-        // The blob file is present locally (by virtue of having been in localFilesMissingOnBlob initialized with
-        // all local files). Check if there is a conflict between local and distant (blob) versions of that file.
+        // The blob file is present locally. Check if there is a conflict between local and distant (blob) versions of that file.
         blobFilesMissingLocally.remove(bf.getSolrFileName());
-        // Later we could add checksum verification etc. here
-        if (cf.fileSize != bf.getFileSize()) {
-          // TODO - for now just log and propagate the error up
-          String errorMessage = "Size conflict for shared shard name: " + distant.getSharedBlobName() +". File " + 
-              bf.getSolrFileName() + " (Blob name " + bf.getBlobName() + ") local size " + 
-              cf.fileSize + " blob size " + bf.getFileSize() + " (core " + local.getCoreName() + ")";
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, errorMessage);
+        if (cf.getFileSize() != bf.getFileSize() || cf.getChecksum() != bf.getChecksum()) {
+          String message = String.format("Size/Checksum conflict sharedShardName=%s coreName=%s fileName=%s blobName=%s" +
+                  " localSize=%s blobSize=%s localChecksum=%s blobChecksum=%s",
+              distant.getSharedBlobName(), local.getCoreName(), bf.getSolrFileName(), bf.getBlobName(),
+              cf.getFileSize(), bf.getFileSize(), cf.getChecksum(), bf.getChecksum());
+          logger.info(message);
+          localConflictingWithBlob = true;
         }
       }
     }
-    return new SharedMetadataResolutionResult(localFilesMissingOnBlob, blobFilesMissingLocally);
+
+    if (!localConflictingWithBlob) {
+      // If the local index generation number is higher than blob's, even then we will declare a conflict,
+      // since in the presence of a higher local generation number, blob contents cannot establish their legitimacy.
+      localConflictingWithBlob = local.getGeneration() > distant.getGeneration();
+    }
+    // If there is a conflict we will switch the index to a new directory and pull all blob files.
+    // Later in the pipeline, at the actual time of pull (CorePushPull#pullUpdateFromBlob), there is an optimization to make use of the local index directory
+    // for already available files instead of downloading from blob. It was possible to design that into the contract of this
+    // resolver, producing a list of files to be pulled from blob and a list of files to be pulled (read: copied) from the local index directory.
+    // But that would have unnecessarily convoluted the design of this resolver.
+    Collection<BlobFile> filesToPull = localConflictingWithBlob ? Arrays.asList(distant.getBlobFiles()) : blobFilesMissingLocally.values();
+    return new SharedMetadataResolutionResult(localFilesMissingOnBlob.values(), filesToPull, localConflictingWithBlob);
   }
   
   /** Identify the segments_N file in Blob files. */
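
A brief hypothetical usage sketch of the resolver (the types and methods are the ones changed above; the metadata arguments are assumed to be already built):

    // Decide what to push/pull and whether the pull needs a fresh index directory.
    SharedMetadataResolutionResult result = SharedStoreResolutionUtil.resolveMetadata(local, blob);
    if (result.isLocalConflictingWithBlob()) {
      // On conflict, getFilesToPull() already lists every blob file and the pull
      // switches the core to a new index directory (see CorePushPull#pullUpdateFromBlob).
    }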
diff --git a/solr/core/src/test/org/apache/solr/store/blob/client/CoreStorageClientTest.java b/solr/core/src/test/org/apache/solr/store/blob/client/CoreStorageClientTest.java
index 4e2fea2..1593f2d 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/client/CoreStorageClientTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/client/CoreStorageClientTest.java
@@ -171,7 +171,7 @@ public class CoreStorageClientTest extends SolrTestCaseJ4 {
   @Test
   public void testPushPullCoreMetadata() throws Exception {
     blobClient.deleteCore(TEST_CORE_NAME_1);
-    BlobCoreMetadata pushedBcm = new BlobCoreMetadataBuilder(TEST_CORE_NAME_1).build();
+    BlobCoreMetadata pushedBcm = new BlobCoreMetadataBuilder(TEST_CORE_NAME_1, 19L).build();
     Assert.assertNull(blobClient.pullCoreMetadata(TEST_CORE_NAME_1, "core.metadata"));
     
     blobClient.pushCoreMetadata(TEST_CORE_NAME_1, "core.metadata", pushedBcm);
diff --git a/solr/core/src/test/org/apache/solr/store/blob/metadata/BlobCoreMetadataTest.java b/solr/core/src/test/org/apache/solr/store/blob/metadata/BlobCoreMetadataTest.java
index f989c2d..9a3cd62 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/metadata/BlobCoreMetadataTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/metadata/BlobCoreMetadataTest.java
@@ -13,23 +13,30 @@ import org.apache.solr.store.blob.client.BlobCoreMetadataBuilder;
 public class BlobCoreMetadataTest extends Assert {
 
     final String SHARED_BLOB_NAME = "collectionName_shardNameTest";
+    final long GENERATION_NUMBER = 456;
+    final long FILE_SIZE = 123000;
+    final long CHECKSUM = 100;
 
     @Test
     public void buildCoreMetadataNoFiles() throws Exception {
-        BlobCoreMetadata bcm = new BlobCoreMetadataBuilder(SHARED_BLOB_NAME).build();
+        BlobCoreMetadata bcm = new BlobCoreMetadataBuilder(SHARED_BLOB_NAME, GENERATION_NUMBER).build();
 
         assertEquals("Blob metadata without any files should not have any files", 0, bcm.getBlobFiles().length);
         assertEquals("Blob metadata should have specified shared blob name", SHARED_BLOB_NAME, bcm.getSharedBlobName());
+        assertEquals("Blob metadata should have specified generation", GENERATION_NUMBER, bcm.getGeneration());
     }
 
     @Test
     public void buildCoreMetadataWithFile() throws Exception {
-        BlobCoreMetadata bcm = new BlobCoreMetadataBuilder(SHARED_BLOB_NAME)
-            .addFile(new BlobCoreMetadata.BlobFile("solrFilename", "blobFilename", 123000L)).build();
+        BlobCoreMetadata bcm = new BlobCoreMetadataBuilder(SHARED_BLOB_NAME, GENERATION_NUMBER)
+            .addFile(new BlobCoreMetadata.BlobFile("solrFilename", "blobFilename", FILE_SIZE, CHECKSUM)).build();
 
         assertEquals("Blob metadata should have specified shared blob name", SHARED_BLOB_NAME, bcm.getSharedBlobName());
+        assertEquals("Blob metadata should have specified generation", GENERATION_NUMBER, bcm.getGeneration());
         assertEquals("Blob metadata should have the correct number of added files", 1, bcm.getBlobFiles().length);
         assertEquals("Blob metadata file should have correct solr filename", "solrFilename", bcm.getBlobFiles()[0].getSolrFileName());
         assertEquals("Blob metadata file should have correct blob store filename", "blobFilename", bcm.getBlobFiles()[0].getBlobName());
+        assertEquals("Blob metadata file should have correct file size", FILE_SIZE, bcm.getBlobFiles()[0].getFileSize());
+        assertEquals("Blob metadata file should have correct checksum", CHECKSUM, bcm.getBlobFiles()[0].getChecksum());
     }
 }
diff --git a/solr/core/src/test/org/apache/solr/store/blob/metadata/CorePushPullTest.java b/solr/core/src/test/org/apache/solr/store/blob/metadata/CorePushPullTest.java
index 71e187a..6121199 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/metadata/CorePushPullTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/metadata/CorePushPullTest.java
@@ -171,7 +171,95 @@ public class CorePushPullTest extends SolrTestCaseJ4 {
     assertU(commit());
     assertQ(req("*:*"), "//*[@numFound='2']");
   }
-  
+
+  /**
+   * Tests that a pull in the presence of a higher local generation number is successful
+   */
+  @Test
+  public void testLocalHigherGenerationConflictingPullSucceeds() throws Exception {
+    SolrCore core = h.getCore();
+
+    // add a doc that would be pushed to blob
+    assertU(adoc("id", "1"));
+    assertU(commit());
+
+    // the doc should be present
+    assertQ(req("*:*"),  xpathMatches("1"));
+
+    // do a push via CorePushPull, the returned BlobCoreMetadata is what we'd expect to find
+    // on the blob store
+    BlobCoreMetadata returnedBcm = doPush(core);
+
+    // add another doc that will not be pushed to blob
+    assertU(adoc("id", "2"));
+    assertU(commit());
+
+    // the doc should be present
+    assertQ(req("*:*"),  xpathMatches("1", "2"));
+
+    long localGeneration = core.getDeletionPolicy().getLatestCommit().getGeneration();
+    assertTrue("Local generation is incorrectly not greater than blob generation", localGeneration > returnedBcm.getGeneration());
+
+    // now perform a pull; since blob is the source of truth, this pull should undo the addition of doc 2
+    SharedMetadataResolutionResult resResult = doPull(core, returnedBcm);
+
+    assertTrue("Pull is incorrectly not identified as conflicting", resResult.isLocalConflictingWithBlob());
+
+    // doc 1 should be present but not doc 2
+    assertQ(req("*:*"), xpathMatches("1"));
+    // for sanity, index another doc
+    assertU(adoc("id", "3"));
+    assertU(commit());
+    assertQ(req("*:*"), xpathMatches("1", "3"));
+  }
+
+  /**
+   * Tests that a pull in the presence of conflicting files succeeds.
+   */
+  @Test
+  public void testConflictingFilesPullSucceeds() throws Exception {
+    SolrCore core = h.getCore();
+
+    // add a doc that would be pushed to blob
+    assertU(adoc("id", "1"));
+    assertU(commit());
+
+    // the doc should be present
+    assertQ(req("*:*"), xpathMatches("1"));
+
+    // do a push via CorePushPull, the returned BlobCoreMetadata is what we'd expect to find
+    // on the blob store
+    BlobCoreMetadata returnedBcm = doPush(core);
+
+    // Delete the core to clear the index data and then re-create it 
+    deleteCore();
+    initCore("solrconfig.xml", "schema-minimal.xml");
+    core = h.getCore();
+
+    // add a different doc, we will not push this to blob
+    assertU(adoc("id", "2"));
+    assertU(commit());
+
+    // the doc should be present
+    assertQ(req("*:*"), xpathMatches("2"));
+
+    // now blob and local should be at the same generation number but with different contents (conflicting files)
+    long localGeneration = core.getDeletionPolicy().getLatestCommit().getGeneration();
+    assertEquals("Local generation is not equal to blob generation", localGeneration, returnedBcm.getGeneration());
+
+    // now perform a pull
+    SharedMetadataResolutionResult resResult = doPull(core, returnedBcm);
+
+    assertTrue("Pull is not identified as conflicting", resResult.isLocalConflictingWithBlob());
+
+    // blob should prevail as the source of truth, i.e. we go back to doc 1 only
+    assertQ(req("*:*"), xpathMatches("1"));
+    // add another doc for sanity
+    assertU(adoc("id", "3"));
+    assertU(commit());
+    assertQ(req("*:*"), xpathMatches("1", "3"));
+  }
+
   private BlobCoreMetadata doPush(SolrCore core) throws Exception {
     // build the required metadata
     ServerSideMetadata solrServerMetadata = new ServerSideMetadata(core.getName(), h.getCoreContainer());
@@ -202,7 +290,7 @@ public class CorePushPullTest extends SolrTestCaseJ4 {
     return pushPull.pushToBlobStore();
   }
   
-  private void doPull(SolrCore core, BlobCoreMetadata bcm) throws Exception {
+  private SharedMetadataResolutionResult doPull(SolrCore core, BlobCoreMetadata bcm) throws Exception {
     // build the required metadata
     ServerSideMetadata solrServerMetadata = new ServerSideMetadata(core.getName(), h.getCoreContainer());
     
@@ -220,5 +308,18 @@ public class CorePushPullTest extends SolrTestCaseJ4 {
     
     CorePushPull pushPull = new CorePushPull(storageClient, deleteManager, ppd, resResult, solrServerMetadata, bcm);
     pushPull.pullUpdateFromBlob(true);
-  }  
+    return resResult;
+  }
+
+  private String[] xpathMatches(String... docIds) {
+    String[] tests = new String[docIds != null ? docIds.length + 1 : 1];
+    tests[0] = "*[count(//doc)=" + (tests.length-1) + "]";
+    if (docIds != null && docIds.length > 0) {
+      int i = 1;
+      for (String docId : docIds) {
+        tests[i++] = "//result/doc/str[@name='id'][.='" + docId + "']";
+      }
+    }
+    return tests;
+  }
 }
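
The two new tests above map to the two conflict cases named in the commit message: a local index at a higher generation number than blob's, and a same-generation index whose files differ in size/checksum. As an illustration only (this is not the CorePushPull implementation), the decision they verify reduces to a check like the following:

    // Sketch, not repository code: the two conditions under which a pull is
    // treated as conflicting and blob contents go to a new index directory.
    public class ConflictSketch {
        static boolean isConflicting(long localGen, long blobGen, boolean sameFileDiffers) {
            return localGen > blobGen || sameFileDiffers;
        }

        public static void main(String[] args) {
            System.out.println(isConflicting(3L, 2L, false)); // true: local generation ahead of blob
            System.out.println(isConflicting(2L, 2L, true));  // true: same file, different size/checksum
            System.out.println(isConflicting(2L, 3L, false)); // false: ordinary pull, no conflict
        }
    }
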
diff --git a/solr/core/src/test/org/apache/solr/store/blob/metadata/DeleteBlobStrategyTest.java b/solr/core/src/test/org/apache/solr/store/blob/metadata/DeleteBlobStrategyTest.java
index f4d9fb1d..ade1381 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/metadata/DeleteBlobStrategyTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/metadata/DeleteBlobStrategyTest.java
@@ -51,18 +51,18 @@ public class DeleteBlobStrategyTest {
     };
 
     // Now create a metadata builder with a set of files to delete and verify the right ones are deleted.
-    BlobCoreMetadataBuilder bcmBuilder = new BlobCoreMetadataBuilder("randomSharedName");
+    BlobCoreMetadataBuilder bcmBuilder = new BlobCoreMetadataBuilder("randomSharedName", 0L);
 
     // file 1 deleted at 5000 (should be removed)
-    BlobCoreMetadata.BlobFileToDelete blobFile1 = new BlobCoreMetadata.BlobFileToDelete("solrFile1", "BlobFile1", 1234L, 5000L);
+    BlobCoreMetadata.BlobFileToDelete blobFile1 = new BlobCoreMetadata.BlobFileToDelete("solrFile1", "BlobFile1", 1234L, 100L, 5000L);
     // file 2 deleted at 15000 (should NOT be removed)
-    BlobCoreMetadata.BlobFileToDelete blobFile2 = new BlobCoreMetadata.BlobFileToDelete("solrFile2", "BlobFile2", 1234L, 15000L);
+    BlobCoreMetadata.BlobFileToDelete blobFile2 = new BlobCoreMetadata.BlobFileToDelete("solrFile2", "BlobFile2", 1234L, 100L, 15000L);
     // file 3 deleted at 1000 (should be removed)
-    BlobCoreMetadata.BlobFileToDelete blobFile3 = new BlobCoreMetadata.BlobFileToDelete("solrFile3", "BlobFile3", 1234L, 1000L);
+    BlobCoreMetadata.BlobFileToDelete blobFile3 = new BlobCoreMetadata.BlobFileToDelete("solrFile3", "BlobFile3", 1234L, 100L, 1000L);
     // file 4 deleted at 1000000000 (should not be removed)
-    BlobCoreMetadata.BlobFileToDelete blobFile4 = new BlobCoreMetadata.BlobFileToDelete("solrFile4", "BlobFile4", 1234L, 1000000000L);
+    BlobCoreMetadata.BlobFileToDelete blobFile4 = new BlobCoreMetadata.BlobFileToDelete("solrFile4", "BlobFile4", 1234L, 100L, 1000000000L);
     // file 5 deleted at 1000000000 (should not be removed)
-    BlobCoreMetadata.BlobFileToDelete blobFile5 = new BlobCoreMetadata.BlobFileToDelete("solrFile5", "BlobFile5", 1234L, 1000000000L);
+    BlobCoreMetadata.BlobFileToDelete blobFile5 = new BlobCoreMetadata.BlobFileToDelete("solrFile5", "BlobFile5", 1234L, 100L, 1000000000L);
       
     bcmBuilder.addFileToDelete(blobFile1);
     bcmBuilder.addFileToDelete(blobFile2);
@@ -123,11 +123,11 @@ public class DeleteBlobStrategyTest {
     };
 
     // Now create a metadata builder with a set of files to delete
-    BlobCoreMetadataBuilder bcmBuilder = new BlobCoreMetadataBuilder("randomSharedName");
+    BlobCoreMetadataBuilder bcmBuilder = new BlobCoreMetadataBuilder("randomSharedName", 0L);
 
-    bcmBuilder.addFileToDelete(new BlobCoreMetadata.BlobFileToDelete("solrFile1", "BlobFile1", 1234L, 123456L));
-    bcmBuilder.addFileToDelete(new BlobCoreMetadata.BlobFileToDelete("solrFile2", "BlobFile2", 1234L, 234567L));
-    bcmBuilder.addFileToDelete(new BlobCoreMetadata.BlobFileToDelete("solrFile3", "BlobFile3", 1234L, 987654321L));
+    bcmBuilder.addFileToDelete(new BlobCoreMetadata.BlobFileToDelete("solrFile1", "BlobFile1", 1234L, 100L, 123456L));
+    bcmBuilder.addFileToDelete(new BlobCoreMetadata.BlobFileToDelete("solrFile2", "BlobFile2", 1234L, 100L, 234567L));
+    bcmBuilder.addFileToDelete(new BlobCoreMetadata.BlobFileToDelete("solrFile3", "BlobFile3", 1234L, 100L, 987654321L));
 
     // Call the delete code
     deleteButFailToEnqueue.enqueueForHardDelete(bcmBuilder);
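
For reference, the updated delete-queue construction in isolation. This sketch assumes only the five-argument BlobFileToDelete signature used above, where the fourth argument appears to be the new checksum and the fifth the deleted-at timestamp that the hard-delete strategy compares against its cutoff:

    import org.apache.solr.store.blob.client.BlobCoreMetadata;
    import org.apache.solr.store.blob.client.BlobCoreMetadataBuilder;

    public class DeleteQueueSketch {
        public static void main(String[] args) {
            BlobCoreMetadataBuilder builder = new BlobCoreMetadataBuilder("randomSharedName", 0L);
            // Arguments: solr file name, blob file name, size, checksum, deleted-at time.
            builder.addFileToDelete(new BlobCoreMetadata.BlobFileToDelete(
                "solrFile1", "BlobFile1", 1234L, 100L, 5000L));
        }
    }
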
diff --git a/solr/core/src/test/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtilTest.java b/solr/core/src/test/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtilTest.java
index fcdd104..acc07ea 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtilTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtilTest.java
@@ -51,7 +51,7 @@ public class SharedStoreResolutionUtilTest extends SolrCloudSharedStoreTestCase
    * Test that passing both local and blob as null to {@link SharedStoreResolutionUtil} throws exception.
    */
   @Test
-  public void testResolveMetadata() throws Exception {
+  public void testBothLocalAndBlobNullThrows() throws Exception {
     try {
       SharedStoreResolutionUtil.resolveMetadata(null, null);
       fail("SharedStoreResolutionUtil did not throw IllegalStateException");
@@ -67,12 +67,11 @@ public class SharedStoreResolutionUtilTest extends SolrCloudSharedStoreTestCase
   @Test
   public void testNoOpWhenBCMEmpty() throws Exception {
     String sharedShardName = "sharedShardName";
-    final BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName)
-        .build();
+    final BlobCoreMetadata blobMetadata = BlobCoreMetadataBuilder.buildEmptyCoreMetadata(sharedShardName);
       
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new 
-        SharedMetadataResolutionResult(Collections.emptySet(), Collections.emptySet());
+        SharedMetadataResolutionResult(Collections.emptySet(), Collections.emptySet(), false);
     
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(null, blobMetadata);
@@ -87,14 +86,15 @@ public class SharedStoreResolutionUtilTest extends SolrCloudSharedStoreTestCase
   public void testPushNeededWhenBlobNull() throws Exception {
     String coreName = "localReplica";
     final long localFileSize = 10;
+    final long localFileChecksum = 100;
     // local core metadata
-    final ServerSideMetadata serverMetadata = new ServerSideCoreMetadataBuilder(coreName)
-      .addFile(new CoreFileData(getSolrSegmentFileName(1), localFileSize))
+    final ServerSideMetadata serverMetadata = new ServerSideCoreMetadataBuilder(coreName, 1L)
+      .addLatestCommitFile(new CoreFileData(getSolrSegmentFileName(1), localFileSize, localFileChecksum))
       .build();
       
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new 
-        SharedMetadataResolutionResult(serverMetadata.getFiles(), Collections.emptySet());
+        SharedMetadataResolutionResult(serverMetadata.getLatestCommitFiles(), Collections.emptySet(), false);
     
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, null);
@@ -109,14 +109,15 @@ public class SharedStoreResolutionUtilTest extends SolrCloudSharedStoreTestCase
   public void testPullNeededWhenLocalNull() throws Exception {
     String sharedShardName = "sharedShardName";
     final long blobFileSize = 10;
+    final long blobFileChecksum = 100;
     // blob core metadata
-    final BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName)
-        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(1), blobFileSize))
+    final BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName, 1L)
+        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(1), blobFileSize, blobFileChecksum))
         .build();
 
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new 
-        SharedMetadataResolutionResult(Collections.emptySet(), Arrays.asList(blobMetadata.getBlobFiles()));
+        SharedMetadataResolutionResult(Collections.emptySet(), Arrays.asList(blobMetadata.getBlobFiles()), false);
     
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(null, blobMetadata);
@@ -130,10 +131,11 @@ public class SharedStoreResolutionUtilTest extends SolrCloudSharedStoreTestCase
   public void testSegmentNCorruptionThrowsError() throws Exception {
     String sharedShardName = "sharedShardName";
     final long blobFileSize = 10;
+    final long blobFileChecksum = 100;
     // blob core metadata with duplicate segment_n files
-    BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName)
-        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(1), blobFileSize))
-        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(2), blobFileSize))
+    BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName, 1L)
+        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(1), blobFileSize, blobFileChecksum))
+        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(2), blobFileSize, blobFileChecksum))
         .build();
     
     // do resolution
@@ -145,8 +147,8 @@ public class SharedStoreResolutionUtilTest extends SolrCloudSharedStoreTestCase
     }
     
     // blob core metadata with missing segment_n file while we have other segment files
-    blobMetadata = new BlobCoreMetadataBuilder(sharedShardName)
-        .addFile(new BlobFile(getSolrFileName(1), getBlobFileName(1), blobFileSize))
+    blobMetadata = new BlobCoreMetadataBuilder(sharedShardName, 1L)
+        .addFile(new BlobFile(getSolrFileName(1), getBlobFileName(1), blobFileSize, blobFileChecksum))
         .build();
     // do resolution
     try {
@@ -160,25 +162,27 @@ public class SharedStoreResolutionUtilTest extends SolrCloudSharedStoreTestCase
   /**
   * Tests that resolving both {@link BlobCoreMetadata} and {@link ServerSideMetadata} yields only files to push
    */
+  @Test
   public void testFilesToPushResolution() throws Exception {
     String coreName = "coreName";
     String sharedShardName = "sharedShardName";
     final long fileSize = 10;
+    final long checksum = 100;
+
+    CoreFileData expectedFileToPush = new CoreFileData(getSolrFileName(1), fileSize, checksum);
     
-    CoreFileData expectedFileToPush = new CoreFileData(getSolrFileName(1), fileSize);
-    
-    final ServerSideMetadata serverMetadata = new ServerSideCoreMetadataBuilder(coreName)
-        .addFile(new CoreFileData(getSolrSegmentFileName(1), fileSize))
-        .addFile(expectedFileToPush)
+    final ServerSideMetadata serverMetadata = new ServerSideCoreMetadataBuilder(coreName, 1L)
+        .addLatestCommitFile(new CoreFileData(getSolrSegmentFileName(1), fileSize, checksum))
+        .addLatestCommitFile(expectedFileToPush)
         .build();
     
-    final BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName)
-        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(1), fileSize))
+    final BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName, 1L)
+        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(1), fileSize, checksum))
         .build();
     
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new 
-        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Collections.emptySet());
+        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Collections.emptySet(), false);
     
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
@@ -188,25 +192,27 @@ public class SharedStoreResolutionUtilTest extends SolrCloudSharedStoreTestCase
   /**
   * Tests that resolving both {@link BlobCoreMetadata} and {@link ServerSideMetadata} yields only files to pull
    */
+  @Test
   public void testFilesToPullResolution() throws Exception {
     String coreName = "coreName";
     String sharedShardName = "sharedShardName";
     final long fileSize = 10;
+    final long checksum = 100;
+
+    BlobFile expectedFileToPull = new BlobFile(getSolrFileName(1), getBlobFileName(1), fileSize, checksum);
     
-    BlobFile expectedFileToPull = new BlobFile(getSolrFileName(1),  getBlobFileName(1), fileSize);
-    
-    final ServerSideMetadata serverMetadata = new ServerSideCoreMetadataBuilder(coreName)
-        .addFile(new CoreFileData(getSolrSegmentFileName(1), fileSize))
+    final ServerSideMetadata serverMetadata = new ServerSideCoreMetadataBuilder(coreName, 1L)
+        .addLatestCommitFile(new CoreFileData(getSolrSegmentFileName(1), fileSize, checksum))
         .build();
     
-    final BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName)
-        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(1), fileSize))
+    final BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName, 1L)
+        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(1), fileSize, checksum))
         .addFile(expectedFileToPull)
         .build();
     
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new 
-        SharedMetadataResolutionResult(Collections.emptySet(), Arrays.asList(expectedFileToPull));
+        SharedMetadataResolutionResult(Collections.emptySet(), Arrays.asList(expectedFileToPull), false);
     
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
@@ -217,36 +223,129 @@ public class SharedStoreResolutionUtilTest extends SolrCloudSharedStoreTestCase
   * Tests that resolving both {@link BlobCoreMetadata} and {@link ServerSideMetadata} yields both files to
   * push and files to pull
    */
+  @Test
   public void testFilesToPushPullResolution() throws Exception {
     String coreName = "coreName";
     String sharedShardName = "sharedShardName";
     final long fileSize = 10;
+    final long checksum = 100;
+
+    CoreFileData expectedFileToPush = new CoreFileData(getSolrFileName(3), fileSize, checksum);
+    BlobFile expectedFileToPull = new BlobFile(getSolrFileName(2), getBlobFileName(2), fileSize, checksum);
     
-    CoreFileData expectedFileToPush = new CoreFileData(getSolrFileName(1), fileSize);
-    BlobFile expectedFileToPull = new BlobFile(getSolrFileName(1),  getBlobFileName(1), fileSize);
-    
-    final ServerSideMetadata serverMetadata = new ServerSideCoreMetadataBuilder(coreName)
-        .addFile(new CoreFileData(getSolrSegmentFileName(1), fileSize))
-        .addFile(expectedFileToPush)
+    final ServerSideMetadata serverMetadata = new ServerSideCoreMetadataBuilder(coreName, 1L)
+        .addLatestCommitFile(new CoreFileData(getSolrSegmentFileName(1), fileSize, checksum))
+        .addLatestCommitFile(expectedFileToPush)
         .build();
     
-    final BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName)
-        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(1), fileSize))
+    final BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName, 1L)
+        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(1), fileSize, checksum))
         .addFile(expectedFileToPull)
         .build();
     
     // expected resolution
     SharedMetadataResolutionResult expectedResult = new 
-        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Arrays.asList(expectedFileToPull));
+        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Arrays.asList(expectedFileToPull), false);
     
     // do resolution
     SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
     assertSharedStoreResolutionResult(expectedResult, actual);
   }
-  
+
+  /**
+   * Tests that a local generation number higher than blob's resolves into a conflict.
+   */
+  @Test
+  public void testHigherLocalGenerationResolution() throws Exception {
+    String coreName = "coreName";
+    String sharedShardName = "sharedShardName";
+    final long fileSize = 10;
+    final long checksum = 100;
+
+    CoreFileData expectedFileToPush = new CoreFileData(getSolrSegmentFileName(2), fileSize, checksum);
+
+    final ServerSideMetadata serverMetadata = new ServerSideCoreMetadataBuilder(coreName, 2L)
+        .addLatestCommitFile(expectedFileToPush)
+        .addLatestCommitFile(new CoreFileData(getSolrFileName(1), fileSize, checksum))
+        .build();
+
+    final BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName, 1L)
+        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(1), fileSize, checksum))
+        .addFile(new BlobFile(getSolrFileName(1), getBlobFileName(1), fileSize, checksum))
+        .build();
+
+    // expected resolution
+    SharedMetadataResolutionResult expectedResult = new
+        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Arrays.asList(blobMetadata.getBlobFiles()), true);
+
+    // do resolution
+    SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
+    assertSharedStoreResolutionResult(expectedResult, actual);
+  }
+
+  /**
+   * Tests that a file belonging to the latest commit point, present both locally and in blob but with
+   * different checksums, resolves into a conflict.
+   */
+  @Test
+  public void testFileFromLatestCommitConflictsResolution() throws Exception {
+    String coreName = "coreName";
+    String sharedShardName = "sharedShardName";
+    final long fileSize = 10;
+
+    final ServerSideMetadata serverMetadata = new ServerSideCoreMetadataBuilder(coreName, 1L)
+        .addLatestCommitFile(new CoreFileData(getSolrSegmentFileName(1), fileSize, 99))
+        .build();
+
+    final BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName, 1L)
+        .addFile(new BlobFile(getSolrSegmentFileName(1), getBlobFileName(1), fileSize, 100))
+        .build();
+
+    // expected resolution
+    SharedMetadataResolutionResult expectedResult = new
+        SharedMetadataResolutionResult(Collections.emptySet(), Arrays.asList(blobMetadata.getBlobFiles()), true);
+
+    // do resolution
+    SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
+    assertSharedStoreResolutionResult(expectedResult, actual);
+  }
+
+  /**
+   * Tests that a file belonging to a previous commit point, present both locally and in blob but with
+   * different checksums, resolves into a conflict.
+   */
+  @Test
+  public void testFileFromPreviousCommitConflictsResolution() throws Exception {
+    String coreName = "coreName";
+    String sharedShardName = "sharedShardName";
+    final long fileSize = 10;
+    final long checksum = 100;
+
+    CoreFileData expectedFileToPush = new CoreFileData(getSolrSegmentFileName(2), fileSize, checksum);
+
+    final ServerSideMetadata serverMetadata = new ServerSideCoreMetadataBuilder(coreName, 2L)
+        .addLatestCommitFile(expectedFileToPush)
+        .addAllCommitsFile(new CoreFileData(getSolrFileName(1), fileSize, 99))
+        .build();
+
+    final BlobCoreMetadata blobMetadata = new BlobCoreMetadataBuilder(sharedShardName, 3L)
+        .addFile(new BlobFile(getSolrSegmentFileName(3), getBlobFileName(3), fileSize, checksum))
+        .addFile(new BlobFile(getSolrFileName(1), getBlobFileName(1), fileSize, 100))
+        .build();
+
+    // expected resolution
+    SharedMetadataResolutionResult expectedResult = new
+        SharedMetadataResolutionResult(Arrays.asList(expectedFileToPush), Arrays.asList(blobMetadata.getBlobFiles()), true);
+
+    // do resolution
+    SharedMetadataResolutionResult actual = SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
+    assertSharedStoreResolutionResult(expectedResult, actual);
+  }
+
   private static void assertSharedStoreResolutionResult(SharedMetadataResolutionResult expected, SharedMetadataResolutionResult actual) {
     assertCollections("filesToPull", expected.getFilesToPull(), actual.getFilesToPull());
     assertCollections("filesToPush", expected.getFilesToPush(), actual.getFilesToPush());
+    assertEquals("localConflictingWithBlob", expected.isLocalConflictingWithBlob(), actual.isLocalConflictingWithBlob());
   }
   
   /**
@@ -284,22 +383,34 @@ public class SharedStoreResolutionUtilTest extends SolrCloudSharedStoreTestCase
    */
   private static class ServerSideCoreMetadataBuilder {
     final private String coreName;
-    final private ImmutableSet.Builder<CoreFileData> files;
+    final private long generation;
+    final private ImmutableSet.Builder<CoreFileData> latestCommitFiles;
+    final private ImmutableSet.Builder<CoreFileData> allCommitsFiles;
 
-    ServerSideCoreMetadataBuilder(String coreName) {
+    ServerSideCoreMetadataBuilder(String coreName, long generation) {
       this.coreName = coreName;
-      files = new ImmutableSet.Builder();
+      this.generation = generation;
+      latestCommitFiles = new ImmutableSet.Builder();
+      allCommitsFiles = new ImmutableSet.Builder();
+    }
+
+    ServerSideCoreMetadataBuilder addLatestCommitFile(CoreFileData file) {
+      this.latestCommitFiles.add(file);
+      addAllCommitsFile(file);
+      return this;
     }
 
-    ServerSideCoreMetadataBuilder addFile(CoreFileData file) {
-      this.files.add(file);
+    ServerSideCoreMetadataBuilder addAllCommitsFile(CoreFileData file) {
+      this.allCommitsFiles.add(file);
       return this;
     }
 
     public ServerSideMetadata build() {
       ServerSideMetadata serverMetadata = mock(ServerSideMetadata.class);
       when(serverMetadata.getCoreName()).thenReturn(coreName);
-      when(serverMetadata.getFiles()).thenReturn(files.build());
+      when(serverMetadata.getGeneration()).thenReturn(generation);
+      when(serverMetadata.getLatestCommitFiles()).thenReturn(latestCommitFiles.build());
+      when(serverMetadata.getAllCommitsFiles()).thenReturn(allCommitsFiles.build());
       return serverMetadata;
     }
   }
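
Taken together, the resolution tests above pin down the new contract: SharedMetadataResolutionResult gained a third constructor argument, and when isLocalConflictingWithBlob() is true the pull set is the complete list of blob files rather than the delta. A sketch of a caller acting on a result, using only the accessors exercised in the assertions above (surrounding class and imports omitted):

    // Sketch, not repository code: acting on a resolution result.
    static void applyResolution(ServerSideMetadata serverMetadata, BlobCoreMetadata blobMetadata) {
        SharedMetadataResolutionResult result =
            SharedStoreResolutionUtil.resolveMetadata(serverMetadata, blobMetadata);
        if (result.isLocalConflictingWithBlob()) {
            // Local index cannot establish legitimacy (higher generation, or a
            // shared file differing in size/checksum): pull every blob file
            // into a new index directory.
        }
        // Otherwise getFilesToPush()/getFilesToPull() describe the incremental sync.
    }
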
diff --git a/solr/core/src/test/org/apache/solr/store/blob/metadata/ToFromJsonTest.java b/solr/core/src/test/org/apache/solr/store/blob/metadata/ToFromJsonTest.java
index 42775c1..6c1c359 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/metadata/ToFromJsonTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/metadata/ToFromJsonTest.java
@@ -35,28 +35,28 @@ public class ToFromJsonTest extends Assert {
 
     @Test
     public void jsonCoreMetadataNoFiles() throws Exception {
-        BlobCoreMetadata bcm = new BlobCoreMetadataBuilder(CORE_NAME).build();
+        BlobCoreMetadata bcm = new BlobCoreMetadataBuilder(CORE_NAME, 666L).build();
 
         verifyToJsonAndBack(bcm);
     }
 
     @Test
     public void jsonCoreMetadataFile() throws Exception {
-        BlobCoreMetadataBuilder bb = new BlobCoreMetadataBuilder(CORE_NAME);
+        BlobCoreMetadataBuilder bb = new BlobCoreMetadataBuilder(CORE_NAME, 777L);
 
-        BlobCoreMetadata bcm = bb.addFile(new BlobCoreMetadata.BlobFile("solrFilename", "blobFilename", 123000L)).build();
+        BlobCoreMetadata bcm = bb.addFile(new BlobCoreMetadata.BlobFile("solrFilename", "blobFilename", 123000L, 100L)).build();
 
         verifyToJsonAndBack(bcm);
     }
 
     @Test
     public void jsonCoreMetadataMultiFiles() throws Exception {
-        BlobCoreMetadataBuilder bb = new BlobCoreMetadataBuilder(CORE_NAME);
+        BlobCoreMetadataBuilder bb = new BlobCoreMetadataBuilder(CORE_NAME, 123L);
         Set<BlobCoreMetadata.BlobFile> files = new HashSet<>(Arrays.asList(
-                new BlobCoreMetadata.BlobFile("solrFilename11", "blobFilename11", 1234L),
-                new BlobCoreMetadata.BlobFile("solrFilename21", "blobFilename21", 2345L),
-                new BlobCoreMetadata.BlobFile("solrFilename31", "blobFilename31", 3456L),
-                new BlobCoreMetadata.BlobFile("solrFilename41", "blobFilename41", 4567L)
+                new BlobCoreMetadata.BlobFile("solrFilename11", "blobFilename11", 1234L, 100L),
+                new BlobCoreMetadata.BlobFile("solrFilename21", "blobFilename21", 2345L, 200L),
+                new BlobCoreMetadata.BlobFile("solrFilename31", "blobFilename31", 3456L, 200L),
+                new BlobCoreMetadata.BlobFile("solrFilename41", "blobFilename41", 4567L, 400L)
         ));
         for (BlobCoreMetadata.BlobFile f : files) {
             bb.addFile(f);
@@ -68,6 +68,7 @@ public class ToFromJsonTest extends Assert {
         verifyToJsonAndBack(bcm);
 
         assertEquals("blob core metadata should have core name specified to builder", CORE_NAME, bcm.getSharedBlobName());
+        assertEquals("blob core metadata should have generation specified to builder",123L, bcm.getGeneration());
 
         // Files are not necessarily in the same order
         assertEquals("blob core metadata should have file set specified to builder", files, new HashSet<>(Arrays.asList(bcm.getBlobFiles())));