You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/09/10 02:59:37 UTC

[cassandra] branch trunk updated: Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f3bf077  Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure
f3bf077 is described below

commit f3bf0775a5a8bce228289c22b96d0c922cf2cb0d
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Wed Sep 9 16:36:57 2020 -0700

    Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure
    
    patch by Zhao Yang; reviewed by Blake Eggleston, Caleb Rackliffe, David Capwell, Benjamin Lerer for CASSANDRA-15861
---
 CHANGES.txt                                        |   1 +
 .../db/compaction/CompactionStrategyManager.java   |   3 +-
 .../cassandra/db/compaction/LeveledManifest.java   |   6 +-
 .../db/compaction/SingleSSTableLCSTask.java        |   3 +-
 .../apache/cassandra/db/compaction/Verifier.java   |   3 +-
 .../streaming/CassandraCompressedStreamReader.java |   6 +-
 .../streaming/CassandraCompressedStreamWriter.java |  17 +-
 .../CassandraEntireSSTableStreamReader.java        |  10 +-
 .../CassandraEntireSSTableStreamWriter.java        |  16 +-
 .../db/streaming/CassandraOutgoingFile.java        | 122 ++---
 .../db/streaming/CassandraStreamHeader.java        |  78 +--
 .../db/streaming/CassandraStreamWriter.java        |  11 +-
 .../cassandra/db/streaming/ComponentContext.java   | 105 ++++
 .../cassandra/db/streaming/ComponentManifest.java  |  39 +-
 .../db/streaming/CompressedInputStream.java        |   6 +-
 .../cassandra/db/streaming/CompressionInfo.java    | 141 +++++-
 .../apache/cassandra/io/sstable/Descriptor.java    |  11 +
 .../cassandra/io/sstable/IndexSummaryManager.java  |   4 +-
 .../org/apache/cassandra/io/sstable/SSTable.java   |   2 +-
 .../cassandra/io/sstable/format/SSTableReader.java | 536 ++++++---------------
 .../io/sstable/format/SSTableReaderBuilder.java    | 475 ++++++++++++++++++
 .../cassandra/io/sstable/format/big/BigFormat.java |   9 +-
 .../io/sstable/format/big/BigTableReader.java      |   7 +-
 .../io/sstable/format/big/BigTableWriter.java      |   9 +-
 .../io/sstable/metadata/IMetadataSerializer.java   |  17 +-
 .../io/sstable/metadata/MetadataSerializer.java    |  24 +-
 .../apache/cassandra/net/BufferPoolAllocator.java  |   2 +-
 .../apache/cassandra/streaming/OutgoingStream.java |   7 +-
 .../apache/cassandra/streaming/StreamSession.java  |   2 +-
 .../cassandra/streaming/StreamTransferTask.java    |   2 +-
 .../io/sstable/format/ForwardingSSTableReader.java |  70 +--
 .../microbench/ZeroCopyStreamingBenchmark.java     |  33 +-
 test/unit/org/apache/cassandra/Util.java           |  23 +
 .../apache/cassandra/db/ColumnFamilyStoreTest.java |   2 +-
 .../unit/org/apache/cassandra/db/KeyspaceTest.java |   2 +-
 .../cassandra/db/lifecycle/LogTransactionTest.java |   2 +-
 .../CassandraEntireSSTableStreamWriterTest.java    |  43 +-
 .../db/streaming/CassandraStreamHeaderTest.java    | 121 +++++
 ...TableStreamConcurrentComponentMutationTest.java | 332 +++++++++++++
 .../cassandra/io/sstable/SSTableReaderTest.java    |   4 +-
 ...ntireSSTableStreamingCorrectFilesCountTest.java |  14 +-
 .../compression/CompressedInputStreamTest.java     |   2 +-
 42 files changed, 1610 insertions(+), 712 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index afa03ac..3af602c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084)
  * When compaction gets interrupted, the exception should include the compactionId (CASSANDRA-15954)
  * Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909)
+ * Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure (CASSANDRA-15861)
 
 4.0-beta2
  * Add addition incremental repair visibility to nodetool repair_admin (CASSANDRA-14939)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 3a05e50..2e1fd0e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -1196,8 +1196,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         {
             for (SSTableReader sstable: sstables)
             {
-                sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient);
-                sstable.reloadSSTableMetadata();
+                sstable.mutateRepairedAndReload(repairedAt, pendingRepair, isTransient);
                 verifyMetadata(sstable, repairedAt, pendingRepair, isTransient);
                 changed.add(sstable);
             }
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 2c32361..37c9435 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -164,8 +164,7 @@ public class LeveledManifest
             try
             {
                 logger.debug("Could not add sstable {} in level {} - dropping to 0", reader, reader.getSSTableLevel());
-                reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
-                reader.reloadSSTableMetadata();
+                reader.mutateLevelAndReload(0);
             }
             catch (IOException e)
             {
@@ -281,8 +280,7 @@ public class LeveledManifest
         remove(sstable);
         try
         {
-            sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
-            sstable.reloadSSTableMetadata();
+            sstable.mutateLevelAndReload(0);
             add(sstable);
         }
         catch (IOException e)
diff --git a/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
index 02a1c49..2e1dffc 100644
--- a/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
@@ -77,8 +77,7 @@ public class SingleSSTableLCSTask extends AbstractCompactionTask
             try
             {
                 logger.info("Changing level on {} from {} to {}", sstable, metadataBefore.sstableLevel, level);
-                sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, level);
-                sstable.reloadSSTableMetadata();
+                sstable.mutateLevelAndReload(level);
             }
             catch (Throwable t)
             {
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 2500a24..577f136 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -466,8 +466,7 @@ public class Verifier implements Closeable
         {
             try
             {
-                sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getPendingRepair(), sstable.isTransient());
-                sstable.reloadSSTableMetadata();
+                sstable.mutateRepairedAndReload(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getPendingRepair(), sstable.isTransient());
                 cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
             }
             catch(IOException ioe)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
index 37b1a01..2491fe1 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -123,10 +123,6 @@ public class CassandraCompressedStreamReader extends CassandraStreamReader
     @Override
     protected long totalSize()
     {
-        long size = 0;
-        // calculate total length of transferring chunks
-        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-            size += chunk.length + 4; // 4 bytes for CRC
-        return size;
+        return compressionInfo.getTotalSize();
     }
 }
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
index d92314b..8d0b67f 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.streaming;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -48,11 +47,13 @@ public class CassandraCompressedStreamWriter extends CassandraStreamWriter
     private static final Logger logger = LoggerFactory.getLogger(CassandraCompressedStreamWriter.class);
 
     private final CompressionInfo compressionInfo;
+    private final long totalSize;
 
-    public CassandraCompressedStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, StreamSession session)
+    public CassandraCompressedStreamWriter(SSTableReader sstable, CassandraStreamHeader header, StreamSession session)
     {
-        super(sstable, sections, session);
-        this.compressionInfo = compressionInfo;
+        super(sstable, header, session);
+        this.compressionInfo = header.compressionInfo;
+        this.totalSize = header.size();
     }
 
     @Override
@@ -67,7 +68,7 @@ public class CassandraCompressedStreamWriter extends CassandraStreamWriter
             long progress = 0L;
 
             // we want to send continuous chunks together to minimise reads from disk and network writes
-            List<Section> sections = fuseAdjacentChunks(compressionInfo.chunks);
+            List<Section> sections = fuseAdjacentChunks(compressionInfo.chunks());
 
             int sectionIdx = 0;
 
@@ -106,11 +107,7 @@ public class CassandraCompressedStreamWriter extends CassandraStreamWriter
     @Override
     protected long totalSize()
     {
-        long size = 0;
-        // calculate total length of transferring chunks
-        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-            size += chunk.length + 4; // 4 bytes for CRC
-        return size;
+        return totalSize;
     }
 
     // chunks are assumed to be sorted by offset
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index eac37d1..0bfe993 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -21,8 +21,8 @@ package org.apache.cassandra.db.streaming;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.function.UnaryOperator;
 
-import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.slf4j.Logger;
@@ -135,9 +135,11 @@ public class CassandraEntireSSTableStreamReader implements IStreamReader
                              prettyPrintMemory(totalSize));
             }
 
-            Function<StatsMetadata, StatsMetadata> transform = stats -> stats.mutateLevel(header.sstableLevel)
-                                                                             .mutateRepairedMetadata(messageHeader.repairedAt, messageHeader.pendingRepair, false);
-            writer.descriptor.getMetadataSerializer().mutate(writer.descriptor, transform);
+            UnaryOperator<StatsMetadata> transform = stats -> stats.mutateLevel(header.sstableLevel)
+                                                                   .mutateRepairedMetadata(messageHeader.repairedAt, messageHeader.pendingRepair, false);
+            String description = String.format("level %s and repairedAt time %s and pendingRepair %s",
+                                               header.sstableLevel, messageHeader.repairedAt, messageHeader.pendingRepair);
+            writer.descriptor.getMetadataSerializer().mutate(writer.descriptor, description, transform);
             return writer;
         }
         catch (Throwable e)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
index 401b20e..ef82eb2 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.db.streaming;
 
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 
 import org.slf4j.Logger;
@@ -43,15 +42,17 @@ public class CassandraEntireSSTableStreamWriter
     private static final Logger logger = LoggerFactory.getLogger(CassandraEntireSSTableStreamWriter.class);
 
     private final SSTableReader sstable;
+    private final ComponentContext context;
     private final ComponentManifest manifest;
     private final StreamSession session;
     private final StreamRateLimiter limiter;
 
-    public CassandraEntireSSTableStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest)
+    public CassandraEntireSSTableStreamWriter(SSTableReader sstable, StreamSession session, ComponentContext context)
     {
         this.session = session;
         this.sstable = sstable;
-        this.manifest = manifest;
+        this.context = context;
+        this.manifest = context.manifest();
         this.limiter = StreamManager.getRateLimiter(session.peer);
     }
 
@@ -76,11 +77,8 @@ public class CassandraEntireSSTableStreamWriter
 
         for (Component component : manifest.components())
         {
-            @SuppressWarnings("resource") // this is closed after the file is transferred by AsyncChannelOutputPlus
-            FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
-
             // Total Length to transmit for this file
-            long length = in.size();
+            long length = manifest.sizeOf(component);
 
             // tracks write progress
             logger.debug("[Stream #{}] Streaming {}.{} gen {} component {} size {}", session.planId(),
@@ -90,7 +88,9 @@ public class CassandraEntireSSTableStreamWriter
                          component,
                          prettyPrintMemory(length));
 
-            long bytesWritten = out.writeFileToChannel(in, limiter);
+            @SuppressWarnings("resource") // this is closed after the file is transferred by AsyncChannelOutputPlus
+            FileChannel channel = context.channel(sstable.descriptor, component, length);
+            long bytesWritten = out.writeFileToChannel(channel, limiter);
             progress += bytesWritten;
 
             session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesWritten, length);
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 0917fba..0904720 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -18,21 +18,17 @@
 
 package org.apache.cassandra.db.streaming;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.AsyncStreamingOutputPlus;
@@ -47,19 +43,13 @@ import org.apache.cassandra.utils.concurrent.Ref;
  */
 public class CassandraOutgoingFile implements OutgoingStream
 {
-    public static final List<Component> STREAM_COMPONENTS = ImmutableList.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
-                                                                             Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
-                                                                             Component.DIGEST, Component.CRC);
-
     private final Ref<SSTableReader> ref;
     private final long estimatedKeys;
     private final List<SSTableReader.PartitionPositionBounds> sections;
     private final String filename;
-    private final CassandraStreamHeader header;
-    private final boolean keepSSTableLevel;
-    private final ComponentManifest manifest;
-
     private final boolean shouldStreamEntireSSTable;
+    private final StreamOperation operation;
+    private final CassandraStreamHeader header;
 
     public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref,
                                  List<SSTableReader.PartitionPositionBounds> sections, List<Range<Token>> normalizedRanges,
@@ -67,45 +57,48 @@ public class CassandraOutgoingFile implements OutgoingStream
     {
         Preconditions.checkNotNull(ref.get());
         Range.assertNormalized(normalizedRanges);
+        this.operation = operation;
         this.ref = ref;
         this.estimatedKeys = estimatedKeys;
         this.sections = sections;
-        this.filename = ref.get().getFilename();
-        this.manifest = getComponentManifest(ref.get());
-        this.shouldStreamEntireSSTable = computeShouldStreamEntireSSTables();
 
         SSTableReader sstable = ref.get();
-        keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD;
-        this.header =
-            CassandraStreamHeader.builder()
-                                 .withSSTableFormat(sstable.descriptor.formatType)
-                                 .withSSTableVersion(sstable.descriptor.version)
-                                 .withSSTableLevel(keepSSTableLevel ? sstable.getSSTableLevel() : 0)
-                                 .withEstimatedKeys(estimatedKeys)
-                                 .withSections(sections)
-                                 .withCompressionMetadata(sstable.compression ? sstable.getCompressionMetadata() : null)
-                                 .withSerializationHeader(sstable.header.toComponent())
-                                 .isEntireSSTable(shouldStreamEntireSSTable)
-                                 .withComponentManifest(manifest)
-                                 .withFirstKey(sstable.first)
-                                 .withTableId(sstable.metadata().id)
-                                 .build();
-    }
-
-    @VisibleForTesting
-    public static ComponentManifest getComponentManifest(SSTableReader sstable)
-    {
-        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(STREAM_COMPONENTS.size());
-        for (Component component : STREAM_COMPONENTS)
-        {
-            File file = new File(sstable.descriptor.filenameFor(component));
-            if (file.exists())
-                components.put(component, file.length());
-        }
 
-        return new ComponentManifest(components);
+        this.filename = sstable.getFilename();
+        this.shouldStreamEntireSSTable = computeShouldStreamEntireSSTables();
+        ComponentManifest manifest = ComponentManifest.create(sstable.descriptor);
+        this.header = makeHeader(sstable, operation, sections, estimatedKeys, shouldStreamEntireSSTable, manifest);
+    }
+
+    private static CassandraStreamHeader makeHeader(SSTableReader sstable,
+                                                    StreamOperation operation,
+                                                    List<SSTableReader.PartitionPositionBounds> sections,
+                                                    long estimatedKeys,
+                                                    boolean shouldStreamEntireSSTable,
+                                                    ComponentManifest manifest)
+    {
+        boolean keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD;
+
+        CompressionInfo compressionInfo = sstable.compression
+                ? CompressionInfo.newLazyInstance(sstable.getCompressionMetadata(), sections)
+                : null;
+
+        return CassandraStreamHeader.builder()
+                                    .withSSTableFormat(sstable.descriptor.formatType)
+                                    .withSSTableVersion(sstable.descriptor.version)
+                                    .withSSTableLevel(keepSSTableLevel ? sstable.getSSTableLevel() : 0)
+                                    .withEstimatedKeys(estimatedKeys)
+                                    .withSections(sections)
+                                    .withCompressionInfo(compressionInfo)
+                                    .withSerializationHeader(sstable.header.toComponent())
+                                    .isEntireSSTable(shouldStreamEntireSSTable)
+                                    .withComponentManifest(manifest)
+                                    .withFirstKey(sstable.first)
+                                    .withTableId(sstable.metadata().id)
+                                    .build();
     }
 
+    @VisibleForTesting
     public static CassandraOutgoingFile fromStream(OutgoingStream stream)
     {
         Preconditions.checkArgument(stream instanceof CassandraOutgoingFile);
@@ -125,7 +118,7 @@ public class CassandraOutgoingFile implements OutgoingStream
     }
 
     @Override
-    public long getSize()
+    public long getEstimatedSize()
     {
         return header.size();
     }
@@ -139,7 +132,7 @@ public class CassandraOutgoingFile implements OutgoingStream
     @Override
     public int getNumFiles()
     {
-        return shouldStreamEntireSSTable ? getManifestSize() : 1;
+        return shouldStreamEntireSSTable ? header.componentManifest.components().size() : 1;
     }
 
     @Override
@@ -154,29 +147,40 @@ public class CassandraOutgoingFile implements OutgoingStream
         return ref.get().getPendingRepair();
     }
 
-    public int getManifestSize()
-    {
-        return manifest.components().size();
-    }
-
     @Override
     public void write(StreamSession session, DataOutputStreamPlus out, int version) throws IOException
     {
+        // FileStreamTask uses AsyncStreamingOutputPlus for streaming.
+        assert out instanceof AsyncStreamingOutputPlus : "Unexpected DataOutputStreamPlus " + out.getClass();
+
         SSTableReader sstable = ref.get();
-        CassandraStreamHeader.serializer.serialize(header, out, version);
-        out.flush();
 
-        if (shouldStreamEntireSSTable && out instanceof AsyncStreamingOutputPlus)
+        if (shouldStreamEntireSSTable)
         {
-            CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, manifest);
-            writer.write((AsyncStreamingOutputPlus) out);
+            // Acquire lock to avoid concurrent sstable component mutation because of stats update or index summary
+            // redistribution, otherwise file sizes recorded in component manifest will be different from actual
+            // file sizes. (Note: Windows doesn't support atomic replace and index summary redistribution deletes
+            // existing file first)
+            // Recreate the latest manifest and hard links for mutable components in case they are modified.
+            try (ComponentContext context = sstable.runWithLock(ignored -> ComponentContext.create(sstable.descriptor)))
+            {
+                CassandraStreamHeader current = makeHeader(sstable, operation, sections, estimatedKeys, true, context.manifest());
+                CassandraStreamHeader.serializer.serialize(current, out, version);
+                out.flush();
+
+                CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, context);
+                writer.write((AsyncStreamingOutputPlus) out);
+            }
         }
         else
         {
-            CassandraStreamWriter writer = (header.compressionInfo == null) ?
-                     new CassandraStreamWriter(sstable, header.sections, session) :
-                     new CassandraCompressedStreamWriter(sstable, header.sections,
-                                                         header.compressionInfo, session);
+            // legacy streaming is not affected by stats metadata mutation and index summary redistribution
+            CassandraStreamHeader.serializer.serialize(header, out, version);
+            out.flush();
+
+            CassandraStreamWriter writer = header.isCompressed() ?
+                                           new CassandraCompressedStreamWriter(sstable, header, session) :
+                                           new CassandraStreamWriter(sstable, header, session);
             writer.write(out);
         }
     }
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
index 2af56de..c9e10cf 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
@@ -52,13 +51,7 @@ public class CassandraStreamHeader
     public final SSTableFormat.Type format;
     public final long estimatedKeys;
     public final List<SSTableReader.PartitionPositionBounds> sections;
-    /**
-     * Compression info for SSTable to send. Can be null if SSTable is not compressed.
-     * On sender, this field is always null to avoid holding large number of Chunks.
-     * Use compressionMetadata instead.
-     */
-    private final CompressionMetadata compressionMetadata;
-    public volatile CompressionInfo compressionInfo;
+    public final CompressionInfo compressionInfo;
     public final int sstableLevel;
     public final SerializationHeader.Component serializationHeader;
 
@@ -70,7 +63,7 @@ public class CassandraStreamHeader
     public final ComponentManifest componentManifest;
 
     /* cached size value */
-    private transient final long size;
+    private final long size;
 
     private CassandraStreamHeader(Builder builder)
     {
@@ -78,7 +71,6 @@ public class CassandraStreamHeader
         format = builder.format;
         estimatedKeys = builder.estimatedKeys;
         sections = builder.sections;
-        compressionMetadata = builder.compressionMetadata;
         compressionInfo = builder.compressionInfo;
         sstableLevel = builder.sstableLevel;
         serializationHeader = builder.serializationHeader;
@@ -107,48 +99,22 @@ public class CassandraStreamHeader
         return size;
     }
 
-    private long calculateSize()
+    @VisibleForTesting
+    public long calculateSize()
     {
         if (isEntireSSTable)
             return componentManifest.totalSize();
 
-        long transferSize = 0;
         if (compressionInfo != null)
-        {
-            // calculate total length of transferring chunks
-            for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-                transferSize += chunk.length + 4; // 4 bytes for CRC
-        }
-        else
-        {
-            for (SSTableReader.PartitionPositionBounds section : sections)
-                transferSize += section.upperPosition - section.lowerPosition;
-        }
-        return transferSize;
-    }
+            return compressionInfo.getTotalSize();
 
-    public synchronized void calculateCompressionInfo()
-    {
-        if (compressionMetadata != null && compressionInfo == null)
-            compressionInfo = CompressionInfo.fromCompressionMetadata(compressionMetadata, sections);
+        long transferSize = 0;
+        for (SSTableReader.PartitionPositionBounds section : sections)
+            transferSize += section.upperPosition - section.lowerPosition;
+        return transferSize;
     }
 
     @Override
-    public String toString()
-    {
-        return "CassandraStreamHeader{" +
-               "version=" + version +
-               ", format=" + format +
-               ", estimatedKeys=" + estimatedKeys +
-               ", sections=" + sections +
-               ", sstableLevel=" + sstableLevel +
-               ", header=" + serializationHeader +
-               ", isEntireSSTable=" + isEntireSSTable +
-               ", firstKey=" + firstKey +
-               ", tableId=" + tableId +
-               '}';
-    }
-
     public boolean equals(Object o)
     {
         if (this == o) return true;
@@ -167,12 +133,29 @@ public class CassandraStreamHeader
                Objects.equals(tableId, that.tableId);
     }
 
+    @Override
     public int hashCode()
     {
         return Objects.hash(version, format, estimatedKeys, sections, compressionInfo, sstableLevel, serializationHeader, componentManifest,
                             isEntireSSTable, firstKey, tableId);
     }
 
+    @Override
+    public String toString()
+    {
+        return "CassandraStreamHeader{" +
+               "version=" + version +
+               ", format=" + format +
+               ", estimatedKeys=" + estimatedKeys +
+               ", sections=" + sections +
+               ", sstableLevel=" + sstableLevel +
+               ", header=" + serializationHeader +
+               ", isEntireSSTable=" + isEntireSSTable +
+               ", firstKey=" + firstKey +
+               ", tableId=" + tableId +
+               '}';
+    }
+
     public static final IVersionedSerializer<CassandraStreamHeader> serializer = new CassandraStreamHeaderSerializer();
 
     public static class CassandraStreamHeaderSerializer implements IVersionedSerializer<CassandraStreamHeader>
@@ -189,7 +172,6 @@ public class CassandraStreamHeader
                 out.writeLong(section.lowerPosition);
                 out.writeLong(section.upperPosition);
             }
-            header.calculateCompressionInfo();
             CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
             out.writeInt(header.sstableLevel);
 
@@ -275,7 +257,6 @@ public class CassandraStreamHeader
                 size += TypeSizes.sizeof(section.upperPosition);
             }
 
-            header.calculateCompressionInfo();
             size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
             size += TypeSizes.sizeof(header.sstableLevel);
 
@@ -299,7 +280,6 @@ public class CassandraStreamHeader
         private SSTableFormat.Type format;
         private long estimatedKeys;
         private List<SSTableReader.PartitionPositionBounds> sections;
-        private CompressionMetadata compressionMetadata;
         private CompressionInfo compressionInfo;
         private int sstableLevel;
         private SerializationHeader.Component serializationHeader;
@@ -338,12 +318,6 @@ public class CassandraStreamHeader
             return this;
         }
 
-        public Builder withCompressionMetadata(CompressionMetadata compressionMetadata)
-        {
-            this.compressionMetadata = compressionMetadata;
-            return this;
-        }
-
         public Builder withCompressionInfo(CompressionInfo compressionInfo)
         {
             this.compressionInfo = compressionInfo;
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
index 8382f0a..10296fb 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
@@ -59,13 +59,15 @@ public class CassandraStreamWriter
     protected final Collection<SSTableReader.PartitionPositionBounds> sections;
     protected final StreamRateLimiter limiter;
     protected final StreamSession session;
+    private final long totalSize;
 
-    public CassandraStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, StreamSession session)
+    public CassandraStreamWriter(SSTableReader sstable, CassandraStreamHeader header, StreamSession session)
     {
         this.session = session;
         this.sstable = sstable;
-        this.sections = sections;
+        this.sections = header.sections;
         this.limiter =  StreamManager.getRateLimiter(session.peer);
+        this.totalSize = header.size();
     }
 
     /**
@@ -127,10 +129,7 @@ public class CassandraStreamWriter
 
     protected long totalSize()
     {
-        long size = 0;
-        for (SSTableReader.PartitionPositionBounds section : sections)
-            size += section.upperPosition - section.lowerPosition;
-        return size;
+        return totalSize;
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentContext.java b/src/java/org/apache/cassandra/db/streaming/ComponentContext.java
new file mode 100644
index 0000000..b9c60b9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentContext.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Mutable SSTable components and their hardlinks to avoid concurrent sstable component modification
+ * during entire-sstable-streaming.
+ */
+public class ComponentContext implements AutoCloseable
+{
+    private static final Logger logger = LoggerFactory.getLogger(ComponentContext.class);
+
+    private static final Set<Component> MUTABLE_COMPONENTS = ImmutableSet.of(Component.STATS, Component.SUMMARY);
+
+    private final Map<Component, File> hardLinks;
+    private final ComponentManifest manifest;
+
+    private ComponentContext(Map<Component, File> hardLinks, ComponentManifest manifest)
+    {
+        this.hardLinks = hardLinks;
+        this.manifest = manifest;
+    }
+
+    public static ComponentContext create(Descriptor descriptor)
+    {
+        Map<Component, File> hardLinks = new HashMap<>(1);
+
+        for (Component component : MUTABLE_COMPONENTS)
+        {
+            File file = new File(descriptor.filenameFor(component));
+            if (!file.exists())
+                continue;
+
+            File hardlink = new File(descriptor.tmpFilenameForStreaming(component));
+            FileUtils.createHardLink(file, hardlink);
+            hardLinks.put(component, hardlink);
+        }
+
+        return new ComponentContext(hardLinks, ComponentManifest.create(descriptor));
+    }
+
+    public ComponentManifest manifest()
+    {
+        return manifest;
+    }
+
+    /**
+     * @return file channel to be streamed, either original component or hardlinked component.
+     */
+    public FileChannel channel(Descriptor descriptor, Component component, long size) throws IOException
+    {
+        String toTransfer = hardLinks.containsKey(component) ? hardLinks.get(component).getPath() : descriptor.filenameFor(component);
+        @SuppressWarnings("resource") // file channel will be closed by Caller
+        FileChannel channel = new RandomAccessFile(toTransfer, "r").getChannel();
+
+        assert size == channel.size() : String.format("Entire sstable streaming expects %s file size to be %s but got %s.",
+                                                      component, size, channel.size());
+        return channel;
+    }
+
+    @Override
+    public void close()
+    {
+        Throwable accumulate = null;
+        for (File file : hardLinks.values())
+            accumulate = FileUtils.deleteWithConfirm(file, accumulate);
+
+        hardLinks.clear();
+
+        if (accumulate != null)
+            logger.warn("Failed to remove hard link files", accumulate);
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
index 90e3dbd..bb896ca 100644
--- a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
@@ -18,23 +18,29 @@
 
 package org.apache.cassandra.db.streaming;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
-
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * SSTable components and their sizes to be transferred via entire-sstable-streaming
+ */
 public final class ComponentManifest implements Iterable<Component>
 {
+    private static final List<Component> STREAM_COMPONENTS = ImmutableList.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
+                                                                             Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
+                                                                             Component.DIGEST, Component.CRC);
+
     private final LinkedHashMap<Component, Long> components;
 
     public ComponentManifest(Map<Component, Long> components)
@@ -42,6 +48,23 @@ public final class ComponentManifest implements Iterable<Component>
         this.components = new LinkedHashMap<>(components);
     }
 
+    @VisibleForTesting
+    public static ComponentManifest create(Descriptor descriptor)
+    {
+        LinkedHashMap<Component, Long> components = new LinkedHashMap<>(STREAM_COMPONENTS.size());
+
+        for (Component component : STREAM_COMPONENTS)
+        {
+            File file = new File(descriptor.filenameFor(component));
+            if (!file.exists())
+                continue;
+
+            components.put(component, file.length());
+        }
+
+        return new ComponentManifest(components);
+    }
+
     public long sizeOf(Component component)
     {
         Long size = components.get(component);
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
index b8626ff..d0b1e4c 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
@@ -75,15 +75,15 @@ public class CompressedInputStream extends RebufferingInputStream implements Aut
                                  ChecksumType checksumType,
                                  DoubleSupplier validateChecksumChance)
     {
-        super(ByteBuffer.allocateDirect(compressionInfo.parameters.chunkLength()));
+        super(ByteBuffer.allocateDirect(compressionInfo.parameters().chunkLength()));
         buffer.limit(0);
 
         this.input = input;
         this.checksumType = checksumType;
         this.validateChecksumChance = validateChecksumChance;
 
-        compressionParams = compressionInfo.parameters;
-        compressedChunks = Iterators.forArray(compressionInfo.chunks);
+        compressionParams = compressionInfo.parameters();
+        compressedChunks = Iterators.forArray(compressionInfo.chunks());
         compressedChunk = ByteBuffer.allocateDirect(compressionParams.chunkLength());
     }
 
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
index aef57e3..84563da 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
@@ -18,11 +18,14 @@
 package org.apache.cassandra.db.streaming;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.compress.CompressionMetadata.Chunk;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -31,31 +34,133 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 /**
  * Container that carries compression parameters and chunks to decompress data from stream.
  */
-public class CompressionInfo
+public abstract class CompressionInfo
 {
     public static final IVersionedSerializer<CompressionInfo> serializer = new CompressionInfoSerializer();
 
-    public final CompressionMetadata.Chunk[] chunks;
-    public final CompressionParams parameters;
+    /**
+     * Returns the compression parameters.
+     *
+     * @return the compression parameters.
+     */
+    public abstract CompressionParams parameters();
 
-    public CompressionInfo(CompressionMetadata.Chunk[] chunks, CompressionParams parameters)
+    /**
+     * Returns the offset and length of the file chunks.
+     *
+     * @return the offset and length of the file chunks.
+     */
+    public abstract CompressionMetadata.Chunk[] chunks();
+
+    /**
+     * Computes the size of the file to transfer.
+     *
+     * @return the size of the file in bytes
+     */
+    public long getTotalSize()
+    {
+        long size = 0;
+        for (CompressionMetadata.Chunk chunk : chunks())
+        {
+            size += chunk.length + 4; // 4 bytes for CRC
+        }
+        return size;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof CompressionInfo))
+            return false;
+
+        CompressionInfo that = (CompressionInfo) o;
+
+        return Objects.equals(parameters(), that.parameters())
+               && Arrays.equals(chunks(), that.chunks());
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(parameters(), Arrays.hashCode(chunks())); // hash chunks by content, matching the Arrays.equals check in equals()
+    }
+
+    /**
+     * Create a {@code CompressionInfo} instance which is fully initialized.
+     *
+     * @param chunks the file chunks
+     * @param parameters the compression parameters
+     */
+    public static CompressionInfo newInstance(CompressionMetadata.Chunk[] chunks, CompressionParams parameters)
     {
         assert chunks != null && parameters != null;
-        this.chunks = chunks;
-        this.parameters = parameters;
+
+        return new CompressionInfo()
+        {
+            @Override
+            public Chunk[] chunks()
+            {
+                return chunks;
+            }
+
+            @Override
+            public CompressionParams parameters()
+            {
+                return parameters;
+            }
+        };
     }
 
-    static CompressionInfo fromCompressionMetadata(CompressionMetadata metadata, List<SSTableReader.PartitionPositionBounds> sections)
+    /**
+     * Create a {@code CompressionInfo} that computes the file chunks only upon request.
+     *
+     * <p>The instance returned by this method only computes the file chunks when the {@code chunks},
+     * {@code equals} or {@code hashCode} methods are called for the first time. This is done to reduce the GC
+     * pressure. See CASSANDRA-10680 for more details.</p>
+     *
+     * @param metadata the compression metadata
+     * @param sections the file sections
+     * @return a {@code CompressionInfo} that computes the file chunks only upon request.
+     */
+    static CompressionInfo newLazyInstance(CompressionMetadata metadata, List<SSTableReader.PartitionPositionBounds> sections)
     {
         if (metadata == null)
         {
             return null;
         }
-        else
+
+        return new CompressionInfo()
         {
-            return new CompressionInfo(metadata.getChunksForSections(sections), metadata.parameters);
-        }
+            private volatile Chunk[] chunks;
+
+            @Override
+            public synchronized Chunk[] chunks()
+            {
+                if (chunks == null)
+                    chunks = metadata.getChunksForSections(sections);
 
+                return chunks;
+            }
+
+            @Override
+            public CompressionParams parameters()
+            {
+                return metadata.parameters;
+            }
+
+            @Override
+            public long getTotalSize()
+            {
+                // If the chunks have not been loaded yet, avoid computing them.
+                if (chunks == null)
+                    return metadata.getTotalSizeForSections(sections);
+
+                return super.getTotalSize();
+            }
+        };
     }
 
     static class CompressionInfoSerializer implements IVersionedSerializer<CompressionInfo>
@@ -68,12 +173,13 @@ public class CompressionInfo
                 return;
             }
 
-            int chunkCount = info.chunks.length;
+            Chunk[] chunks = info.chunks();
+            int chunkCount = chunks.length;
             out.writeInt(chunkCount);
             for (int i = 0; i < chunkCount; i++)
-                CompressionMetadata.Chunk.serializer.serialize(info.chunks[i], out, version);
+                CompressionMetadata.Chunk.serializer.serialize(chunks[i], out, version);
             // compression params
-            CompressionParams.serializer.serialize(info.parameters, out, version);
+            CompressionParams.serializer.serialize(info.parameters(), out, version);
         }
 
         public CompressionInfo deserialize(DataInputPlus in, int version) throws IOException
@@ -89,7 +195,7 @@ public class CompressionInfo
 
             // compression params
             CompressionParams parameters = CompressionParams.serializer.deserialize(in, version);
-            return new CompressionInfo(chunks, parameters);
+            return CompressionInfo.newInstance(chunks, parameters);
         }
 
         public long serializedSize(CompressionInfo info, int version)
@@ -98,12 +204,13 @@ public class CompressionInfo
                 return TypeSizes.sizeof(-1);
 
             // chunks
-            int chunkCount = info.chunks.length;
+            Chunk[] chunks = info.chunks();
+            int chunkCount = chunks.length;
             long size = TypeSizes.sizeof(chunkCount);
             for (int i = 0; i < chunkCount; i++)
-                size += CompressionMetadata.Chunk.serializer.serializedSize(info.chunks[i], version);
+                size += CompressionMetadata.Chunk.serializer.serializedSize(chunks[i], version);
             // compression params
-            size += CompressionParams.serializer.serializedSize(info.parameters, version);
+            size += CompressionParams.serializer.serializedSize(info.parameters(), version);
             return size;
         }
     }
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index fca0f8d..dc67451 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
 import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
 
 import static org.apache.cassandra.io.sstable.Component.separator;
 
@@ -114,6 +115,16 @@ public class Descriptor
         return filenameFor(component) + TMP_EXT;
     }
 
+    /**
+     * @return a unique temporary file name for given component during entire-sstable-streaming.
+     */
+    public String tmpFilenameForStreaming(Component component)
+    {
+        // Use UUID to handle concurrent streamings on the same sstable.
+        // TMP_EXT allows temp file to be removed by {@link ColumnFamilyStore#scrubDataDirectories}
+        return String.format("%s.%s%s", filenameFor(component), UUIDGen.getTimeUUID(), TMP_EXT);
+    }
+
     public String filenameFor(Component component)
     {
         return baseFilename() + separator + component.name();
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 880b738..a85d855 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -209,14 +209,14 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
             for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores())
             {
                 Set<SSTableReader> nonCompacting, allSSTables;
-                LifecycleTransaction txn = null;
+                LifecycleTransaction txn;
                 do
                 {
                     View view = cfStore.getTracker().getView();
                     allSSTables = ImmutableSet.copyOf(view.select(SSTableSet.CANONICAL));
                     nonCompacting = ImmutableSet.copyOf(view.getUncompacting(allSSTables));
                 }
-                while (null == (txn = cfStore.getTracker().tryModify(nonCompacting, OperationType.UNKNOWN)));
+                while (null == (txn = cfStore.getTracker().tryModify(nonCompacting, OperationType.INDEX_SUMMARY)));
 
                 allNonCompacting.put(cfStore.metadata.id, txn);
                 allCompacting.addAll(Sets.difference(allSSTables, nonCompacting));
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 353b624..0471be3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -265,7 +265,7 @@ public abstract class SSTable
     }
 
     /** @return An estimate of the number of keys contained in the given index file. */
-    protected long estimateRowsFromIndex(RandomAccessReader ifile) throws IOException
+    public static long estimateRowsFromIndex(RandomAccessReader ifile, Descriptor descriptor) throws IOException
     {
         // collect sizes for the first 10000 keys, or first 10 megabytes of data
         final int SAMPLES_CAP = 10000, BYTES_CAP = (int)Math.min(10000000, ifile.length());
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index cd929ba..3e14422 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.io.sstable.format;
 
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.io.*;
 import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
@@ -39,13 +37,11 @@ import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
 
-import org.apache.cassandra.cache.ChunkCache;
 import org.apache.cassandra.cache.InstrumentingCache;
 import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -68,7 +64,6 @@ import org.apache.cassandra.schema.CachingParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CacheService;
@@ -204,12 +199,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public final UniqueIdentifier instanceId = new UniqueIdentifier();
 
     // indexfile and datafile: might be null before a call to load()
-    protected FileHandle ifile;
-    protected FileHandle dfile;
-    protected IndexSummary indexSummary;
-    protected IFilter bf;
+    protected final FileHandle ifile;
+    protected final FileHandle dfile;
+    protected final IFilter bf;
+    public final IndexSummary indexSummary;
 
-    protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+    protected final RowIndexEntry.IndexSerializer<?> rowIndexEntrySerializer;
 
     protected InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
 
@@ -430,52 +425,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             System.exit(1);
         }
 
-        long fileLength = new File(descriptor.filenameFor(Component.DATA)).length();
-        logger.info("Opening {} ({})", descriptor, FBUtilities.prettyPrintMemory(fileLength));
-
-        final SSTableReader sstable;
         try
         {
-            sstable = internalOpen(descriptor,
-                                   components,
-                                   metadata,
-                                   System.currentTimeMillis(),
-                                   statsMetadata,
-                                   OpenReason.NORMAL,
-                                   header.toHeader(metadata.get()));
+            return new SSTableReaderBuilder.ForBatch(descriptor, metadata, components, statsMetadata, header.toHeader(metadata.get())).build();
         }
         catch (UnknownColumnException e)
         {
             throw new IllegalStateException(e);
         }
-
-        try(FileHandle.Builder ibuilder = new FileHandle.Builder(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))
-                                                     .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
-                                                     .withChunkCache(ChunkCache.instance);
-            FileHandle.Builder dbuilder = new FileHandle.Builder(sstable.descriptor.filenameFor(Component.DATA)).compressed(sstable.compression)
-                                                     .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
-                                                     .withChunkCache(ChunkCache.instance))
-        {
-            if (!sstable.loadSummary())
-            {
-                try
-                {
-                    sstable.buildSummary(false, false, Downsampling.BASE_SAMPLING_LEVEL);
-                }
-                catch(IOException e)
-                {
-                    throw new CorruptSSTableException(e, sstable.getFilename());
-                }
-            }
-            long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
-            int dataBufferSize = sstable.optimizationStrategy.bufferSize(statsMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
-            int indexBufferSize = sstable.optimizationStrategy.bufferSize(indexFileLength / sstable.indexSummary.size());
-            sstable.ifile = ibuilder.bufferSize(indexBufferSize).complete();
-            sstable.dfile = dbuilder.bufferSize(dataBufferSize).complete();
-            sstable.bf = FilterFactory.AlwaysPresent;
-            sstable.setup(false);
-            return sstable;
-        }
     }
 
     /**
@@ -531,19 +488,16 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             System.exit(1);
         }
 
-        long fileLength = new File(descriptor.filenameFor(Component.DATA)).length();
-        logger.info("Opening {} ({})", descriptor, FBUtilities.prettyPrintMemory(fileLength));
-
-        final SSTableReader sstable;
+        SSTableReader sstable;
         try
         {
-            sstable = internalOpen(descriptor,
-                                   components,
-                                   metadata,
-                                   System.currentTimeMillis(),
-                                   statsMetadata,
-                                   OpenReason.NORMAL,
-                                   header.toHeader(metadata.get()));
+            sstable = new SSTableReaderBuilder.ForRead(descriptor,
+                                                       metadata,
+                                                       validationMetadata,
+                                                       isOffline,
+                                                       components,
+                                                       statsMetadata,
+                                                       header.toHeader(metadata.get())).build();
         }
         catch (UnknownColumnException e)
         {
@@ -552,12 +506,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
         try
         {
-            // load index and filter
-            long start = System.nanoTime();
-            sstable.load(validationMetadata, isOffline);
-            logger.trace("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
-            sstable.setup(!isOffline); // Don't track hotness if we're offline.
             if (validate)
                 sstable.validate();
 
@@ -626,42 +574,21 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
      */
     public static SSTableReader internalOpen(Descriptor desc,
-                                      Set<Component> components,
-                                      TableMetadataRef metadata,
-                                      FileHandle ifile,
-                                      FileHandle dfile,
-                                      IndexSummary isummary,
-                                      IFilter bf,
-                                      long maxDataAge,
-                                      StatsMetadata sstableMetadata,
-                                      OpenReason openReason,
-                                      SerializationHeader header)
-    {
-        assert desc != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
-
-        SSTableReader reader = internalOpen(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
-
-        reader.bf = bf;
-        reader.ifile = ifile;
-        reader.dfile = dfile;
-        reader.indexSummary = isummary;
-        reader.setup(true);
-
-        return reader;
-    }
-
-
-    private static SSTableReader internalOpen(final Descriptor descriptor,
-                                              Set<Component> components,
-                                              TableMetadataRef metadata,
-                                              Long maxDataAge,
-                                              StatsMetadata sstableMetadata,
-                                              OpenReason openReason,
-                                              SerializationHeader header)
+                                             Set<Component> components,
+                                             TableMetadataRef metadata,
+                                             FileHandle ifile,
+                                             FileHandle dfile,
+                                             IndexSummary summary,
+                                             IFilter bf,
+                                             long maxDataAge,
+                                             StatsMetadata sstableMetadata,
+                                             OpenReason openReason,
+                                             SerializationHeader header)
     {
-        Factory readerFactory = descriptor.getFormat().getReaderFactory();
+        assert desc != null && ifile != null && dfile != null && summary != null && bf != null && sstableMetadata != null;
 
-        return readerFactory.open(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
+        return new SSTableReaderBuilder.ForWriter(desc, metadata, maxDataAge, components, sstableMetadata, openReason, header)
+                .bf(bf).ifile(ifile).dfile(dfile).summary(summary).build();
     }
 
     /**
@@ -695,17 +622,40 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         }
     }
 
+    protected SSTableReader(SSTableReaderBuilder builder) // unpacks the builder and delegates to the field-by-field constructor
+    {
+        this(builder.descriptor,
+             builder.components,
+             builder.metadataRef,
+             builder.maxDataAge,
+             builder.statsMetadata,
+             builder.openReason,
+             builder.header,
+             builder.summary, // the last four arguments follow the target constructor's order: summary, dfile, ifile, bf
+             builder.dfile,
+             builder.ifile,
+             builder.bf);
+    }
+
     protected SSTableReader(final Descriptor desc,
                             Set<Component> components,
                             TableMetadataRef metadata,
                             long maxDataAge,
                             StatsMetadata sstableMetadata,
                             OpenReason openReason,
-                            SerializationHeader header)
+                            SerializationHeader header,
+                            IndexSummary summary,
+                            FileHandle dfile,
+                            FileHandle ifile,
+                            IFilter bf)
     {
         super(desc, components, metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
         this.sstableMetadata = sstableMetadata;
         this.header = header;
+        this.indexSummary = summary;
+        this.dfile = dfile;
+        this.ifile = ifile;
+        this.bf = bf;
         this.maxDataAge = maxDataAge;
         this.openReason = openReason;
         this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata.get(), desc.version, header);
@@ -760,240 +710,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     }
 
     /**
-     * See {@link #load(boolean, boolean)}
-     * @param validation Metadata for SSTable being loaded
-     * @param isOffline Whether the SSTable is being loaded by an offline tool (sstabledump, scrub, etc)
-     * @throws IOException
-     */
-    private void load(ValidationMetadata validation, boolean isOffline) throws IOException
-    {
-        if (metadata().params.bloomFilterFpChance == 1.0)
-        {
-            // bf is disabled.
-            load(false, !isOffline);
-            bf = FilterFactory.AlwaysPresent;
-        }
-        else if (!components.contains(Component.PRIMARY_INDEX)) // What happens if filter component and primary index is missing?
-        {
-            // avoid any reading of the missing primary index component.
-            // this should only happen during StandaloneScrubber
-            load(false, !isOffline);
-        }
-        else if (!components.contains(Component.FILTER) || validation == null)
-        {
-            // bf is enabled, but filter component is missing.
-            load(!isOffline, !isOffline);
-            if (isOffline)
-                bf = FilterFactory.AlwaysPresent;
-        }
-        else
-        {
-            // bf is enabled and fp chance matches the currently configured value.
-            load(false, !isOffline);
-            loadBloomFilter(descriptor.version.hasOldBfFormat());
-        }
-    }
-
-    /**
-     * Load bloom filter from Filter.db file.
-     *
-     * @throws IOException
-     * @param oldBfFormat
-     */
-    private void loadBloomFilter(boolean oldBfFormat) throws IOException
-    {
-        try (DataInputStream stream = new DataInputStream(new BufferedInputStream(Files.newInputStream(Paths.get(descriptor.filenameFor(Component.FILTER))))))
-        {
-            bf = BloomFilterSerializer.deserialize(stream, oldBfFormat);
-        }
-    }
-
-    /**
-     * Loads ifile, dfile and indexSummary, and optionally recreates and persists the bloom filter.
-     * @param recreateBloomFilter Recreate the bloomfilter.
-     * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
-     *                             avoid persisting it to disk by setting this to false
-     */
-    private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
-    {
-        try(FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
-                                                     .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
-                                                     .withChunkCache(ChunkCache.instance);
-            FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
-                                                     .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
-                                                     .withChunkCache(ChunkCache.instance))
-        {
-            boolean summaryLoaded = loadSummary();
-            boolean buildSummary = !summaryLoaded || recreateBloomFilter;
-            if (buildSummary)
-                buildSummary(recreateBloomFilter, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
-
-            int dataBufferSize = optimizationStrategy.bufferSize(sstableMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
-
-            if (components.contains(Component.PRIMARY_INDEX))
-            {
-                long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
-                int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
-                ifile = ibuilder.bufferSize(indexBufferSize).complete();
-            }
-
-            dfile = dbuilder.bufferSize(dataBufferSize).complete();
-
-            if (buildSummary)
-            {
-                if (saveSummaryIfCreated)
-                    saveSummary();
-                if (recreateBloomFilter)
-                    saveBloomFilter();
-            }
-        }
-        catch (Throwable t)
-        { // Because the tidier has not been set-up yet in SSTableReader.open(), we must release the files in case of error
-            if (ifile != null)
-            {
-                ifile.close();
-                ifile = null;
-            }
-
-            if (dfile != null)
-            {
-                dfile.close();
-                dfile = null;
-            }
-
-            if (indexSummary != null)
-            {
-                indexSummary.close();
-                indexSummary = null;
-            }
-
-            throw t;
-        }
-    }
-
-    /**
-     * Build index summary(and optionally bloom filter) by reading through Index.db file.
-     *
-     * @param recreateBloomFilter true if recreate bloom filter
-     * @param summaryLoaded true if index summary is already loaded and not need to build again
-     * @throws IOException
-     */
-    private void buildSummary(boolean recreateBloomFilter, boolean summaryLoaded, int samplingLevel) throws IOException
-    {
-        if (!components.contains(Component.PRIMARY_INDEX))
-            return;
-
-        if (logger.isDebugEnabled())
-            logger.debug("Attempting to build summary for {}", descriptor);
-
-        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
-        try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))))
-        {
-            long indexSize = primaryIndex.length();
-            long histogramCount = sstableMetadata.estimatedPartitionSize.count();
-            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedPartitionSize.isOverflowed()
-                    ? histogramCount
-                    : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
-
-            if (recreateBloomFilter)
-                bf = FilterFactory.getFilter(estimatedKeys, metadata().params.bloomFilterFpChance);
-
-            try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata().params.minIndexInterval, samplingLevel))
-            {
-                long indexPosition;
-
-                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
-                {
-                    ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
-                    RowIndexEntry.Serializer.skip(primaryIndex, descriptor.version);
-                    DecoratedKey decoratedKey = decorateKey(key);
-                    if (first == null)
-                        first = decoratedKey;
-                    last = decoratedKey;
-
-                    if (recreateBloomFilter)
-                        bf.add(decoratedKey);
-
-                    // if summary was already read from disk we don't want to re-populate it using primary index
-                    if (!summaryLoaded)
-                    {
-                        summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
-                    }
-                }
-
-                if (!summaryLoaded)
-                    indexSummary = summaryBuilder.build(getPartitioner());
-            }
-        }
-
-        first = getMinimalKey(first);
-        last = getMinimalKey(last);
-    }
-
-    /**
-     * Load index summary from Summary.db file if it exists.
-     *
-     * if loaded index summary has different index interval from current value stored in schema,
-     * then Summary.db file will be deleted and this returns false to rebuild summary.
-     *
-     * @return true if index summary is loaded successfully from Summary.db file.
-     */
-    @SuppressWarnings("resource")
-    public boolean loadSummary()
-    {
-        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
-        if (!summariesFile.exists())
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("SSTable Summary File {} does not exist", summariesFile.getAbsolutePath());
-            return false;
-        }
-
-        DataInputStream iStream = null;
-        try
-        {
-            TableMetadata metadata = metadata();
-            iStream = new DataInputStream(Files.newInputStream(summariesFile.toPath()));
-            indexSummary = IndexSummary.serializer.deserialize(
-                    iStream, getPartitioner(),
-                    metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
-            first = decorateKey(ByteBufferUtil.readWithLength(iStream));
-            last = decorateKey(ByteBufferUtil.readWithLength(iStream));
-        }
-        catch (IOException e)
-        {
-            if (indexSummary != null)
-                indexSummary.close();
-            logger.trace("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
-            // corrupted; delete it and fall back to creating a new summary
-            FileUtils.closeQuietly(iStream);
-            // delete it and fall back to creating a new summary
-            FileUtils.deleteWithConfirm(summariesFile);
-            return false;
-        }
-        finally
-        {
-            FileUtils.closeQuietly(iStream);
-        }
-
-        return true;
-    }
-
-    /**
-     * Save index summary to Summary.db file.
-     */
-
-    public void saveSummary()
-    {
-        saveSummary(this.descriptor, this.first, this.last, indexSummary);
-    }
-
-    private void saveSummary(IndexSummary newSummary)
-    {
-        saveSummary(this.descriptor, this.first, this.last, newSummary);
-    }
-
-    /**
      * Save index summary to Summary.db file.
      */
     public static void saveSummary(Descriptor descriptor, DecoratedKey first, DecoratedKey last, IndexSummary summary)
@@ -1002,7 +718,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         if (summariesFile.exists())
             FileUtils.deleteWithConfirm(summariesFile);
 
-        try (DataOutputStreamPlus oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile));)
+        try (DataOutputStreamPlus oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile)))
         {
             IndexSummary.serializer.serialize(summary, oStream);
             ByteBufferUtil.writeWithLength(first.getKey(), oStream);
@@ -1018,11 +734,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         }
     }
 
-    public void saveBloomFilter()
-    {
-        saveBloomFilter(this.descriptor, bf);
-    }
-
     public static void saveBloomFilter(Descriptor descriptor, IFilter filter)
     {
         File filterFile = new File(descriptor.filenameFor(Component.FILTER));
@@ -1042,6 +753,19 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     }
 
+    /**
+     * Applies {@code task} to this reader's descriptor while holding the {@code tidy.global} monitor, to avoid racing with index summary redistribution (see CASSANDRA-15861).
+     *
+     * @param task function invoked with {@code descriptor} under the lock; its result is returned and any IOException it throws propagates to the caller
+     */
+    public <R> R runWithLock(CheckedFunction<Descriptor, R, IOException> task) throws IOException
+    {
+        synchronized (tidy.global) // the same monitor is taken by mutateLevelAndReload and mutateRepairedAndReload
+        {
+            return task.apply(descriptor);
+        }
+    }
+
     public void setReplaced()
     {
         synchronized (tidy.global)
@@ -1131,12 +855,41 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                                                  sstableMetadata,
                                                  reason,
                                                  header);
+
         replacement.first = newFirst;
         replacement.last = last;
         replacement.isSuspect.set(isSuspect.get());
         return replacement;
     }
 
+    /**
+     * Opens a copy of this reader that uses {@code newBloomFilter} instead of the current bloom filter (annotated as visible for testing).
+     * The copy receives shared copies of the index and data file handles, the same index summary, and this reader's first/last keys and suspect flag.
+     * @param newBloomFilter bloom filter for the replacement reader
+     *
+     * @return the cloned reader. That reader is set as a replacement by the method.
+     */
+    @VisibleForTesting
+    public SSTableReader cloneAndReplace(IFilter newBloomFilter)
+    {
+        SSTableReader replacement = internalOpen(descriptor,
+                                                 components,
+                                                 metadata,
+                                                 ifile.sharedCopy(),
+                                                 dfile.sharedCopy(),
+                                                 indexSummary, // NOTE(review): passed as-is while ifile/dfile use sharedCopy() - confirm who owns the summary afterwards
+                                                 newBloomFilter,
+                                                 maxDataAge,
+                                                 sstableMetadata,
+                                                 openReason,
+                                                 header);
+
+        replacement.first = first;
+        replacement.last = last;
+        replacement.isSuspect.set(isSuspect.get());
+        return replacement;
+    }
+
     public SSTableReader cloneWithRestoredStart(DecoratedKey restoredStart)
     {
         synchronized (tidy.global)
@@ -1202,39 +955,38 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     @SuppressWarnings("resource")
     public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
     {
-        synchronized (tidy.global)
-        {
-            assert openReason != OpenReason.EARLY;
+        assert openReason != OpenReason.EARLY;
 
-            int minIndexInterval = metadata().params.minIndexInterval;
-            int maxIndexInterval = metadata().params.maxIndexInterval;
-            double effectiveInterval = indexSummary.getEffectiveIndexInterval();
+        int minIndexInterval = metadata().params.minIndexInterval;
+        int maxIndexInterval = metadata().params.maxIndexInterval;
+        double effectiveInterval = indexSummary.getEffectiveIndexInterval();
 
-            IndexSummary newSummary;
+        IndexSummary newSummary;
 
-            // We have to rebuild the summary from the on-disk primary index in three cases:
-            // 1. The sampling level went up, so we need to read more entries off disk
-            // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
-            //    at full sampling (and consequently at any other sampling level)
-            // 3. The max_index_interval was lowered, forcing us to raise the sampling level
-            if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
-            {
-                newSummary = buildSummaryAtLevel(samplingLevel);
-            }
-            else if (samplingLevel < indexSummary.getSamplingLevel())
-            {
-                // we can use the existing index summary to make a smaller one
-                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, getPartitioner());
-            }
-            else
-            {
-                throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
-                        "no adjustments to min/max_index_interval");
-            }
-
-            // Always save the resampled index
-            saveSummary(newSummary);
+        // We have to rebuild the summary from the on-disk primary index in three cases:
+        // 1. The sampling level went up, so we need to read more entries off disk
+        // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
+        //    at full sampling (and consequently at any other sampling level)
+        // 3. The max_index_interval was lowered, forcing us to raise the sampling level
+        if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
+        {
+            newSummary = buildSummaryAtLevel(samplingLevel);
+        }
+        else if (samplingLevel < indexSummary.getSamplingLevel())
+        {
+            // we can use the existing index summary to make a smaller one
+            newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, getPartitioner());
+        }
+        else
+        {
+            throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
+                    "no adjustments to min/max_index_interval");
+        }
 
+        // Always save the resampled index with lock to avoid racing with entire-sstable streaming
+        synchronized (tidy.global)
+        {
+            saveSummary(descriptor, first, last, newSummary);
             return cloneAndReplace(first, OpenReason.METADATA_CHANGE, newSummary);
         }
     }
@@ -1292,7 +1044,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public void releaseSummary()
     {
         tidy.releaseSummary();
-        indexSummary = null;
     }
 
     private void validate()
@@ -1365,14 +1116,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return getCompressionMetadata().offHeapSize();
     }
 
-    /**
-     * For testing purposes only.
-     */
-    public void forceFilterFailures()
-    {
-        bf = FilterFactory.AlwaysPresent;
-    }
-
     public IFilter getBloomFilter()
     {
         return bf;
@@ -2068,6 +1811,30 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     }
 
     /**
+     * Writes {@code newLevel} as the sstable level through the descriptor's metadata serializer and then reloads the sstable metadata, holding the {@code tidy.global} monitor throughout to avoid racing with entire-sstable-streaming
+     */
+    public void mutateLevelAndReload(int newLevel) throws IOException
+    {
+        synchronized (tidy.global)
+        {
+            descriptor.getMetadataSerializer().mutateLevel(descriptor, newLevel);
+            reloadSSTableMetadata(); // done under the same lock as the mutation
+        }
+    }
+
+    /**
+     * Writes the given repair metadata (repairedAt, pending repair id, transient flag) through the descriptor's metadata serializer and then reloads the sstable metadata, holding the {@code tidy.global} monitor throughout to avoid racing with entire-sstable-streaming
+     */
+    public void mutateRepairedAndReload(long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException
+    {
+        synchronized (tidy.global)
+        {
+            descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, newRepairedAt, newPendingRepair, isTransient);
+            reloadSSTableMetadata(); // done under the same lock as the mutation
+        }
+    }
+
+    /**
      * Reloads the sstable metadata from disk.
      *
      * Called after level is changed on sstable, for example if the sstable is dropped to L0
@@ -2320,7 +2087,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      */
     static final class GlobalTidy implements Tidy
     {
-        static WeakReference<ScheduledFuture<?>> NULL = new WeakReference<>(null);
+        static final WeakReference<ScheduledFuture<?>> NULL = new WeakReference<>(null);
         // keyed by descriptor, mapping to the shared GlobalTidy for that descriptor
         static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>();
 
@@ -2424,14 +2191,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     public static abstract class Factory
     {
-        public abstract SSTableReader open(final Descriptor descriptor,
-                                           Set<Component> components,
-                                           TableMetadataRef metadata,
-                                           Long maxDataAge,
-                                           StatsMetadata sstableMetadata,
-                                           OpenReason openReason,
-                                           SerializationHeader header);
-
+        public abstract SSTableReader open(SSTableReaderBuilder builder);
     }
 
     public static class PartitionPositionBounds
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java
new file mode 100644
index 0000000..8fe1def
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format;
+
+import org.apache.cassandra.cache.ChunkCache;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.DiskOptimizationStrategy;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public abstract class SSTableReaderBuilder
+{
+    private static final Logger logger = LoggerFactory.getLogger(SSTableReaderBuilder.class);
+
+    protected final SSTableReader.Factory readerFactory;
+    protected final Descriptor descriptor;
+    protected final TableMetadataRef metadataRef;
+    protected final TableMetadata metadata;
+    protected final long maxDataAge;
+    protected final Set<Component> components;
+    protected final StatsMetadata statsMetadata;
+    protected final SSTableReader.OpenReason openReason;
+    protected final SerializationHeader header;
+
+    protected IndexSummary summary;
+    protected DecoratedKey first;
+    protected DecoratedKey last;
+    protected IFilter bf;
+    protected FileHandle ifile;
+    protected FileHandle dfile;
+
+    public SSTableReaderBuilder(Descriptor descriptor, // stores the fixed inputs; summary, first/last, bf, ifile and dfile are filled in later
+                                TableMetadataRef metadataRef,
+                                long maxDataAge,
+                                Set<Component> components,
+                                StatsMetadata statsMetadata,
+                                SSTableReader.OpenReason openReason,
+                                SerializationHeader header)
+    {
+        this.descriptor = descriptor;
+        this.metadataRef = metadataRef;
+        this.metadata = metadataRef.get(); // resolved once here; the builder's own methods read this field rather than the ref
+        this.maxDataAge = maxDataAge;
+        this.components = components;
+        this.statsMetadata = statsMetadata;
+        this.openReason = openReason;
+        this.header = header;
+        this.readerFactory = descriptor.getFormat().getReaderFactory(); // reader factory of the descriptor's sstable format
+    }
+
+    public abstract SSTableReader build();
+
+    public SSTableReaderBuilder dfile(FileHandle dfile) // sets the dfile handle; returns this builder so calls can be chained
+    {
+        this.dfile = dfile;
+        return this;
+    }
+
+    public SSTableReaderBuilder ifile(FileHandle ifile) // sets the ifile handle; returns this builder so calls can be chained
+    {
+        this.ifile = ifile;
+        return this;
+    }
+
+    public SSTableReaderBuilder bf(IFilter bf) // sets the bloom filter; returns this builder so calls can be chained
+    {
+        this.bf = bf;
+        return this;
+    }
+
+    public SSTableReaderBuilder summary(IndexSummary summary) // sets the index summary; returns this builder so calls can be chained
+    {
+        this.summary = summary;
+        return this;
+    }
+
+    /**
+     * Loads the index summary, first key and last key from the Summary.db file into this builder, if that file exists.
+     *
+     * If reading fails with an IOException (per the original note, this covers a stored index interval that differs
+     * from the current schema value), the Summary.db file is deleted so that the summary has to be rebuilt.
+     */
+    void loadSummary()
+    {
+        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+        if (!summariesFile.exists())
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("SSTable Summary File {} does not exist", summariesFile.getAbsolutePath());
+            return; // summary, first and last are left as they were
+        }
+
+        DataInputStream iStream = null;
+        try
+        {
+            iStream = new DataInputStream(Files.newInputStream(summariesFile.toPath()));
+            summary = IndexSummary.serializer.deserialize(iStream,
+                                                          metadata.partitioner,
+                                                          metadata.params.minIndexInterval,
+                                                          metadata.params.maxIndexInterval);
+            first = metadata.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); // first and last keys are stored right after the summary
+            last = metadata.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+        }
+        catch (IOException e)
+        {
+            if (summary != null)
+                summary.close(); // NOTE(review): the field is closed but not reset to null here - confirm callers do not treat a non-null summary as "loaded"
+            logger.trace("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
+            // close the stream before deleting the corrupted file (closeQuietly is called again in the finally block)
+            FileUtils.closeQuietly(iStream);
+            // delete the corrupted file; this method does not rebuild the summary itself
+            FileUtils.deleteWithConfirm(summariesFile);
+        }
+        finally
+        {
+            FileUtils.closeQuietly(iStream);
+        }
+    }
+
+    /**
+     * Reads through Index.db to build the index summary, first key and last key (unless {@code summaryLoaded}) and to recreate the bloom filter (if {@code recreateBloomFilter}); does nothing when the primary index component is absent.
+     * @param recreateBloomFilter true to create a new bloom filter and add every key read from the index to it
+     * @param summaryLoaded true if index summary, first key and last key are already loaded and do not need to be built again
+     * @param components components of the sstable; only checked for the primary index (shadows the builder field of the same name)
+     * @param statsMetadata supplies the estimated partition size histogram used as the key-count estimate (shadows the builder field of the same name)
+     */
+    void buildSummaryAndBloomFilter(boolean recreateBloomFilter,
+                                    boolean summaryLoaded,
+                                    Set<Component> components,
+                                    StatsMetadata statsMetadata) throws IOException
+    {
+        if (!components.contains(Component.PRIMARY_INDEX))
+            return;
+
+        if (logger.isDebugEnabled())
+            logger.debug("Attempting to build summary for {}", descriptor);
+
+
+        // we read the positions through a RandomAccessReader (BRAF) so we don't have to worry about an entry spanning a mmap boundary.
+        try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))))
+        {
+            long indexSize = primaryIndex.length();
+            long histogramCount = statsMetadata.estimatedPartitionSize.count();
+            long estimatedKeys = histogramCount > 0 && !statsMetadata.estimatedPartitionSize.isOverflowed()
+                                 ? histogramCount
+                                 : SSTable.estimateRowsFromIndex(primaryIndex, descriptor); // statistics are optional, so fall back to an estimate taken from the index
+
+            if (recreateBloomFilter)
+                bf = FilterFactory.getFilter(estimatedKeys, metadata.params.bloomFilterFpChance);
+
+            try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL)) // a null resource is allowed by try-with-resources
+            {
+                long indexPosition;
+
+                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) // one iteration per index entry, until the end of the file
+                {
+                    ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+                    RowIndexEntry.Serializer.skip(primaryIndex, descriptor.version);
+                    DecoratedKey decoratedKey = metadata.partitioner.decorateKey(key);
+
+                    if (!summaryLoaded)
+                    {
+                        if (first == null)
+                            first = decoratedKey;
+                        last = decoratedKey;
+                    }
+
+                    if (recreateBloomFilter)
+                        bf.add(decoratedKey);
+
+                    // if the summary was already read from disk we don't want to re-populate it from the primary index
+                    if (!summaryLoaded)
+                    {
+                        summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
+                    }
+                }
+
+                if (!summaryLoaded)
+                    summary = summaryBuilder.build(metadata.partitioner);
+            }
+        }
+
+        if (!summaryLoaded)
+        {
+            first = SSTable.getMinimalKey(first);
+            last = SSTable.getMinimalKey(last);
+        }
+    }
+
+    /**
+     * Deserializes the bloom filter from this sstable's Filter.db file, passing the descriptor version's old-format flag to the serializer.
+     * The result is returned to the caller; this method does not assign the builder's {@code bf} field.
+     * @throws IOException if opening or reading the filter file fails
+     */
+    IFilter loadBloomFilter() throws IOException
+    {
+        try (DataInputStream stream = new DataInputStream(new BufferedInputStream(Files.newInputStream(Paths.get(descriptor.filenameFor(Component.FILTER))))))
+        {
+            return BloomFilterSerializer.deserialize(stream, descriptor.version.hasOldBfFormat());
+        }
+    }
+
+    public static class ForWriter extends SSTableReaderBuilder
+    {
+        public ForWriter(Descriptor descriptor,
+                         TableMetadataRef metadataRef,
+                         long maxDataAge,
+                         Set<Component> components,
+                         StatsMetadata statsMetadata,
+                         SSTableReader.OpenReason openReason,
+                         SerializationHeader header)
+        {
+            super(descriptor, metadataRef, maxDataAge, components, statsMetadata, openReason, header);
+        }
+
+        @Override
+        public SSTableReader build()
+        {
+            SSTableReader reader = readerFactory.open(this);
+
+            reader.setup(true);
+            return reader;
+        }
+    }
+
+    public static class ForBatch extends SSTableReaderBuilder
+    {
+        public ForBatch(Descriptor descriptor,
+                        TableMetadataRef metadataRef,
+                        Set<Component> components,
+                        StatsMetadata statsMetadata,
+                        SerializationHeader header)
+        {
+            super(descriptor, metadataRef, System.currentTimeMillis(), components, statsMetadata, SSTableReader.OpenReason.NORMAL, header);
+        }
+
+        @Override
+        public SSTableReader build()
+        {
+            String dataFilePath = descriptor.filenameFor(Component.DATA);
+            long fileLength = new File(dataFilePath).length();
+            logger.info("Opening {} ({})", descriptor, FBUtilities.prettyPrintMemory(fileLength));
+
+            initSummary(dataFilePath, components, statsMetadata);
+
+            boolean compression = components.contains(Component.COMPRESSION_INFO);
+            try (FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
+                    .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+                    .withChunkCache(ChunkCache.instance);
+                    FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
+                                                                                                                .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
+                                                                                                                .withChunkCache(ChunkCache.instance))
+            {
+                long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+                DiskOptimizationStrategy optimizationStrategy = DatabaseDescriptor.getDiskOptimizationStrategy();
+                int dataBufferSize = optimizationStrategy.bufferSize(statsMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+                int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / summary.size());
+                ifile = ibuilder.bufferSize(indexBufferSize).complete();
+                dfile = dbuilder.bufferSize(dataBufferSize).complete();
+                bf = FilterFactory.AlwaysPresent;
+
+                SSTableReader sstable = readerFactory.open(this);
+
+                sstable.first = first;
+                sstable.last = last;
+
+                sstable.setup(false);
+                return sstable;
+            }
+        }
+
+        void initSummary(String dataFilePath, Set<Component> components, StatsMetadata statsMetadata)
+        {
+            loadSummary();
+            if (summary == null)
+            {
+                try
+                {
+                    buildSummaryAndBloomFilter(false, false, components, statsMetadata);
+                }
+                catch (IOException e)
+                {
+                    throw new CorruptSSTableException(e, dataFilePath);
+                }
+            }
+        }
+    }
+
+    public static class ForRead extends SSTableReaderBuilder
+    {
+        private final ValidationMetadata validationMetadata;
+        private final boolean isOffline;
+
+        public ForRead(Descriptor descriptor,
+                       TableMetadataRef metadataRef,
+                       ValidationMetadata validationMetadata,
+                       boolean isOffline,
+                       Set<Component> components,
+                       StatsMetadata statsMetadata,
+                       SerializationHeader header)
+        {
+            super(descriptor, metadataRef, System.currentTimeMillis(), components, statsMetadata, SSTableReader.OpenReason.NORMAL, header);
+            this.validationMetadata = validationMetadata;
+            this.isOffline = isOffline;
+        }
+
+        @Override
+        public SSTableReader build()
+        {
+            String dataFilePath = descriptor.filenameFor(Component.DATA);
+            long fileLength = new File(dataFilePath).length();
+            logger.info("Opening {} ({})", descriptor, FBUtilities.prettyPrintMemory(fileLength));
+
+            try
+            {
+                // load index and filter
+                long start = System.nanoTime();
+                load(validationMetadata, isOffline, components, DatabaseDescriptor.getDiskOptimizationStrategy(), statsMetadata);
+                logger.trace("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+            }
+            catch (IOException t)
+            {
+                throw new CorruptSSTableException(t, dataFilePath);
+            }
+
+            SSTableReader sstable = readerFactory.open(this);
+
+            sstable.first = first;
+            sstable.last = last;
+
+            sstable.setup(!isOffline); // Don't track hotness if we're offline.
+            return sstable;
+        }
+
+        /**
+         * @param validation Metadata for SSTable being loaded
+         * @param isOffline Whether the SSTable is being loaded by an offline tool (sstabledump, scrub, etc)
+         */
+        private void load(ValidationMetadata validation,
+                          boolean isOffline,
+                          Set<Component> components,
+                          DiskOptimizationStrategy optimizationStrategy,
+                          StatsMetadata statsMetadata) throws IOException
+        {
+            if (metadata.params.bloomFilterFpChance == 1.0)
+            {
+                // bf is disabled.
+                load(false, !isOffline, optimizationStrategy, statsMetadata, components);
+                bf = FilterFactory.AlwaysPresent;
+            }
+            else if (!components.contains(Component.PRIMARY_INDEX)) // What happens if filter component and primary index is missing?
+            {
+                // avoid any reading of the missing primary index component.
+                // this should only happen during StandaloneScrubber
+                load(false, !isOffline, optimizationStrategy, statsMetadata, components);
+            }
+            else if (!components.contains(Component.FILTER) || validation == null)
+            {
+                // bf is enabled, but the filter component or the validation metadata is missing.
+                load(!isOffline, !isOffline, optimizationStrategy, statsMetadata, components);
+                if (isOffline)
+                    bf = FilterFactory.AlwaysPresent;
+            }
+            else
+            {
+                // bf is enabled and fp chance matches the currently configured value.
+                load(false, !isOffline, optimizationStrategy, statsMetadata, components);
+                bf = loadBloomFilter();
+            }
+        }
+
+        /**
+         * Loads ifile, dfile and indexSummary, and optionally recreates and persists the bloom filter.
+         * @param recreateBloomFilter Recreate the bloomfilter.
+         * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
+         *                             avoid persisting it to disk by setting this to false
+         */
+        void load(boolean recreateBloomFilter,
+                  boolean saveSummaryIfCreated,
+                  DiskOptimizationStrategy optimizationStrategy,
+                  StatsMetadata statsMetadata,
+                  Set<Component> components) throws IOException
+        {
+            try(FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
+                    .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+                    .withChunkCache(ChunkCache.instance);
+                    FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(components.contains(Component.COMPRESSION_INFO))
+                                                                                                                .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
+                                                                                                                .withChunkCache(ChunkCache.instance))
+            {
+                loadSummary();
+                boolean buildSummary = summary == null || recreateBloomFilter;
+                if (buildSummary)
+                    buildSummaryAndBloomFilter(recreateBloomFilter, summary != null, components, statsMetadata);
+
+                int dataBufferSize = optimizationStrategy.bufferSize(statsMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+
+                if (components.contains(Component.PRIMARY_INDEX))
+                {
+                    long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+                    int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / summary.size());
+                    ifile = ibuilder.bufferSize(indexBufferSize).complete();
+                }
+
+                dfile = dbuilder.bufferSize(dataBufferSize).complete();
+
+                if (buildSummary)
+                {
+                    if (saveSummaryIfCreated)
+                        SSTableReader.saveSummary(descriptor, first, last, summary);
+                    if (recreateBloomFilter)
+                        SSTableReader.saveBloomFilter(descriptor, bf);
+                }
+            }
+            catch (Throwable t)
+            { // Because the tidier has not been set-up yet in SSTableReader.open(), we must release the files in case of error
+                if (ifile != null)
+                {
+                    ifile.close();
+                }
+
+                if (dfile != null)
+                {
+                    dfile.close();
+                }
+
+                if (summary != null)
+                {
+                    summary.close();
+                }
+
+                throw t;
+            }
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 448808c..6f3ba39 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -18,22 +18,17 @@
 package org.apache.cassandra.io.sstable.format.big;
 
 import java.util.Collection;
-import java.util.Set;
 import java.util.UUID;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
-import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.net.MessagingService;
 
 /**
@@ -103,9 +98,9 @@ public class BigFormat implements SSTableFormat
     static class ReaderFactory extends SSTableReader.Factory
     {
         @Override
-        public SSTableReader open(Descriptor descriptor, Set<Component> components, TableMetadataRef metadata, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
+        public SSTableReader open(SSTableReaderBuilder builder)
         {
-            return new BigTableReader(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
+            return new BigTableReader(builder);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 03d7562..a333df0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.io.sstable.format.SSTableReaderBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,9 +40,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SelectionReason;
 import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SkippingReason;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -53,9 +52,9 @@ public class BigTableReader extends SSTableReader
 {
     private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
 
-    BigTableReader(Descriptor desc, Set<Component> components, TableMetadataRef metadata, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
+    BigTableReader(SSTableReaderBuilder builder)
     {
-        super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
+        super(builder);
     }
 
     public UnfilteredRowIterator iterator(DecoratedKey key,
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 04ab08a..eb98662 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -333,8 +333,13 @@ public class BigTableWriter extends SSTableWriter
         invalidateCacheAtBoundary(dfile);
         SSTableReader sstable = SSTableReader.internalOpen(descriptor,
                                                            components, metadata,
-                                                           ifile, dfile, indexSummary,
-                                                           iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY, header);
+                                                           ifile, dfile,
+                                                           indexSummary,
+                                                           iwriter.bf.sharedCopy(), 
+                                                           maxDataAge, 
+                                                           stats, 
+                                                           SSTableReader.OpenReason.EARLY, 
+                                                           header);
 
         // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
         sstable.first = getMinimalKey(first);
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index db9f161..fc1ce42 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -21,9 +21,10 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Map;
 import java.util.UUID;
-import java.util.function.Function;
+import java.util.function.UnaryOperator;
 
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
@@ -65,15 +66,22 @@ public interface IMetadataSerializer
     /**
      * Mutate SSTable Metadata
      *
+     * NOTE: mutating stats metadata of a live sstable will race with entire-sstable-streaming, please use
+     * {@link SSTableReader#mutateLevelAndReload} instead on live sstable.
+     *
      * @param descriptor SSTable descriptor
+     * @param description human-readable description of the attributes being changed
      * @param transform function to mutate sstable metadata
      * @throws IOException
      */
-    public void mutate(Descriptor descriptor, Function<StatsMetadata, StatsMetadata> transform) throws IOException;
+    public void mutate(Descriptor descriptor, String description, UnaryOperator<StatsMetadata> transform) throws IOException;
 
     /**
      * Mutate SSTable level
      *
+     * NOTE: mutating stats metadata of a live sstable will race with entire-sstable-streaming, please use
+     * {@link SSTableReader#mutateLevelAndReload} instead on live sstable.
+     *
      * @param descriptor SSTable descriptor
      * @param newLevel new SSTable level
      * @throws IOException
@@ -81,7 +89,10 @@ public interface IMetadataSerializer
     void mutateLevel(Descriptor descriptor, int newLevel) throws IOException;
 
     /**
-     * Mutate the repairedAt time, pendingRepair ID, and transient status
+     * Mutate the repairedAt time, pendingRepair ID, and transient status.
+     *
+     * NOTE: mutating stats metadata of a live sstable will race with entire-sstable-streaming, please use
+     * {@link SSTableReader#mutateRepairedAndReload} instead on live sstable.
      */
     public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException;
 
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index d886338..042103e 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.io.sstable.metadata;
 
 import java.io.*;
 import java.util.*;
-import java.util.function.Function;
+import java.util.function.UnaryOperator;
 import java.util.zip.CRC32;
 
 import com.google.common.base.Throwables;
@@ -114,7 +114,7 @@ public class MetadataSerializer implements IMetadataSerializer
             out.writeInt((int) crc.getValue());
     }
 
-    public Map<MetadataType, MetadataComponent> deserialize( Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
+    public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
     {
         Map<MetadataType, MetadataComponent> components;
         logger.trace("Load metadata for {}", descriptor);
@@ -224,15 +224,15 @@ public class MetadataSerializer implements IMetadataSerializer
     }
 
     @Override
-    public void mutate(Descriptor descriptor, Function<StatsMetadata, StatsMetadata> transform) throws IOException
+    public void mutate(Descriptor descriptor, String description, UnaryOperator<StatsMetadata> transform) throws IOException
     {
-        Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class));
-        StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
+        if (logger.isTraceEnabled() )
+            logger.trace("Mutating {} to {}", descriptor.filenameFor(Component.STATS), description);
 
-        currentComponents.put(MetadataType.STATS, transform.apply(stats));
-        rewriteSSTableMetadata(descriptor, currentComponents);
+        mutate(descriptor, transform);
     }
 
+    @Override
     public void mutateLevel(Descriptor descriptor, int newLevel) throws IOException
     {
         if (logger.isTraceEnabled())
@@ -241,6 +241,7 @@ public class MetadataSerializer implements IMetadataSerializer
         mutate(descriptor, stats -> stats.mutateLevel(newLevel));
     }
 
+    @Override
     public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException
     {
         if (logger.isTraceEnabled())
@@ -250,6 +251,15 @@ public class MetadataSerializer implements IMetadataSerializer
         mutate(descriptor, stats -> stats.mutateRepairedMetadata(newRepairedAt, newPendingRepair, isTransient));
     }
 
+    private void mutate(Descriptor descriptor, UnaryOperator<StatsMetadata> transform) throws IOException
+    {
+        Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class));
+        StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
+
+        currentComponents.put(MetadataType.STATS, transform.apply(stats));
+        rewriteSSTableMetadata(descriptor, currentComponents);
+    }
+
     public void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException
     {
         String filePath = descriptor.tmpFilenameFor(Component.STATS);
diff --git a/src/java/org/apache/cassandra/net/BufferPoolAllocator.java b/src/java/org/apache/cassandra/net/BufferPoolAllocator.java
index 8782c03..224f690 100644
--- a/src/java/org/apache/cassandra/net/BufferPoolAllocator.java
+++ b/src/java/org/apache/cassandra/net/BufferPoolAllocator.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.utils.memory.BufferPool;
  * A trivial wrapper around BufferPool for integrating with Netty, but retaining ownership of pooling behaviour
  * that is integrated into Cassandra's other pooling.
  */
-abstract class BufferPoolAllocator extends AbstractByteBufAllocator
+public abstract class BufferPoolAllocator extends AbstractByteBufAllocator
 {
     BufferPoolAllocator()
     {
diff --git a/src/java/org/apache/cassandra/streaming/OutgoingStream.java b/src/java/org/apache/cassandra/streaming/OutgoingStream.java
index 4a58cae..546462d 100644
--- a/src/java/org/apache/cassandra/streaming/OutgoingStream.java
+++ b/src/java/org/apache/cassandra/streaming/OutgoingStream.java
@@ -47,7 +47,12 @@ public interface OutgoingStream
     UUID getPendingRepair();
 
     String getName();
-    long getSize();
+
+    /**
+     * @return estimated size of the files to be streamed. This should only be used for metrics, because concurrent
+     * stats metadata updates and index redistribution can change file sizes.
+     */
+    long getEstimatedSize();
     TableId getTableId();
     int getNumFiles();
 }
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 0281952..ff3ff5a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -763,7 +763,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      */
     public void streamSent(OutgoingStreamMessage message)
     {
-        long headerSize = message.stream.getSize();
+        long headerSize = message.stream.getEstimatedSize();
         StreamingMetrics.totalOutgoingBytes.inc(headerSize);
         metrics.outgoingBytes.inc(headerSize);
         // schedule timeout for receiving ACK
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 0f7a834..d12ecfd 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -70,7 +70,7 @@ public class StreamTransferTask extends StreamTask
         OutgoingStreamMessage message = new OutgoingStreamMessage(tableId, session, stream, sequenceNumber.getAndIncrement());
         message = StreamHook.instance.reportOutgoingStream(session, stream, message);
         streams.put(message.header.sequenceNumber, message);
-        totalSize += message.stream.getSize();
+        totalSize += message.stream.getEstimatedSize();
         totalFiles += message.stream.getNumFiles();
     }
 
diff --git a/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java b/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java
index 7e21253..9a76661 100644
--- a/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java
+++ b/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java
@@ -19,8 +19,6 @@
 package org.apache.cassandra.io.sstable.format;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Iterator;
@@ -62,31 +60,13 @@ import org.apache.cassandra.utils.concurrent.Ref;
 
 public abstract class ForwardingSSTableReader extends SSTableReader
 {
-    // This method is only accessiable via extension and not for calling directly;
-    // to work around this, rely on reflection if the method gets called
-    private static final Method ESTIMATE_ROWS_FROM_INDEX;
-
-    static
-    {
-        try
-        {
-            Method m = SSTable.class.getDeclaredMethod("estimateRowsFromIndex", RandomAccessReader.class);
-            m.setAccessible(true);
-            ESTIMATE_ROWS_FROM_INDEX = m;
-        }
-        catch (NoSuchMethodException e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
     private final SSTableReader delegate;
 
     public ForwardingSSTableReader(SSTableReader delegate)
     {
         super(delegate.descriptor, SSTable.componentsFor(delegate.descriptor),
               TableMetadataRef.forOfflineTools(delegate.metadata()), delegate.maxDataAge, delegate.getSSTableMetadata(),
-              delegate.openReason, delegate.header);
+              delegate.openReason, delegate.header, delegate.indexSummary, delegate.dfile, delegate.ifile, delegate.bf);
         this.delegate = delegate;
         this.first = delegate.first;
         this.last = delegate.last;
@@ -117,24 +97,6 @@ public abstract class ForwardingSSTableReader extends SSTableReader
     }
 
     @Override
-    public boolean loadSummary()
-    {
-        return delegate.loadSummary();
-    }
-
-    @Override
-    public void saveSummary()
-    {
-        delegate.saveSummary();
-    }
-
-    @Override
-    public void saveBloomFilter()
-    {
-        delegate.saveBloomFilter();
-    }
-
-    @Override
     public void setReplaced()
     {
         delegate.setReplaced();
@@ -225,12 +187,6 @@ public abstract class ForwardingSSTableReader extends SSTableReader
     }
 
     @Override
-    public void forceFilterFailures()
-    {
-        delegate.forceFilterFailures();
-    }
-
-    @Override
     public IFilter getBloomFilter()
     {
         return delegate.getBloomFilter();
@@ -777,30 +733,6 @@ public abstract class ForwardingSSTableReader extends SSTableReader
     }
 
     @Override
-    protected long estimateRowsFromIndex(RandomAccessReader ifile) throws IOException
-    {
-        try
-        {
-            return (Long) ESTIMATE_ROWS_FROM_INDEX.invoke(delegate, ifile);
-        }
-        catch (IllegalAccessException e)
-        {
-            throw new AssertionError(e);
-        }
-        catch (InvocationTargetException e)
-        {
-            Throwable cause = e.getCause();
-            if (cause instanceof IOException)
-                throw (IOException) cause;
-            if (cause instanceof Error)
-                throw (Error) cause;
-            if (cause instanceof RuntimeException)
-                throw (RuntimeException) cause;
-            throw new RuntimeException(cause);
-        }
-    }
-
-    @Override
     public long bytesOnDisk()
     {
         return delegate.bytesOnDisk();
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index 8ecf6cb..9e84e0a 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -42,10 +42,10 @@ import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader;
 import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamWriter;
-import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
 import org.apache.cassandra.db.streaming.CassandraStreamHeader;
 import org.apache.cassandra.db.streaming.CassandraStreamReader;
 import org.apache.cassandra.db.streaming.CassandraStreamWriter;
+import org.apache.cassandra.db.streaming.ComponentContext;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
@@ -103,6 +103,7 @@ public class ZeroCopyStreamingBenchmark
         private static ColumnFamilyStore store;
         private StreamSession session;
         private CassandraEntireSSTableStreamWriter blockStreamWriter;
+        private ComponentContext context;
         private ByteBuf serializedBlockStream;
         private InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
         private CassandraEntireSSTableStreamReader blockStreamReader;
@@ -119,7 +120,8 @@ public class ZeroCopyStreamingBenchmark
 
             sstable = store.getLiveSSTables().iterator().next();
             session = setupStreamingSessionForTest();
-            blockStreamWriter = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
+            context = ComponentContext.create(sstable.descriptor);
+            blockStreamWriter = new CassandraEntireSSTableStreamWriter(sstable, session, context);
 
             CapturingNettyChannel blockStreamCaptureChannel = new CapturingNettyChannel(STREAM_SIZE);
             AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(blockStreamCaptureChannel);
@@ -137,7 +139,7 @@ public class ZeroCopyStreamingBenchmark
                                      .withEstimatedKeys(sstable.estimatedKeys())
                                      .withSections(Collections.emptyList())
                                      .withSerializationHeader(sstable.header.toComponent())
-                                     .withComponentManifest(CassandraOutgoingFile.getComponentManifest(sstable))
+                                     .withComponentManifest(context.manifest())
                                      .isEntireSSTable(true)
                                      .withFirstKey(sstable.first)
                                      .withTableId(sstable.metadata().id)
@@ -149,23 +151,23 @@ public class ZeroCopyStreamingBenchmark
                                                                                                null), entireSSTableStreamHeader, session);
 
             List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(sstable.first.minValue().getToken(), sstable.last.getToken()));
-            partialStreamWriter = new CassandraStreamWriter(sstable, sstable.getPositionsForRanges(requestedRanges), session);
+            CassandraStreamHeader partialSSTableStreamHeader =
+            CassandraStreamHeader.builder()
+                                 .withSSTableFormat(sstable.descriptor.formatType)
+                                 .withSSTableVersion(sstable.descriptor.version)
+                                 .withSSTableLevel(0)
+                                 .withEstimatedKeys(sstable.estimatedKeys())
+                                 .withSections(sstable.getPositionsForRanges(requestedRanges))
+                                 .withSerializationHeader(sstable.header.toComponent())
+                                 .withTableId(sstable.metadata().id)
+                                 .build();
+
+            partialStreamWriter = new CassandraStreamWriter(sstable, partialSSTableStreamHeader, session);
 
             CapturingNettyChannel partialStreamChannel = new CapturingNettyChannel(STREAM_SIZE);
             partialStreamWriter.write(new AsyncStreamingOutputPlus(partialStreamChannel));
             serializedPartialStream = partialStreamChannel.getSerializedStream();
 
-            CassandraStreamHeader partialSSTableStreamHeader =
-                CassandraStreamHeader.builder()
-                                     .withSSTableFormat(sstable.descriptor.formatType)
-                                     .withSSTableVersion(sstable.descriptor.version)
-                                     .withSSTableLevel(0)
-                                     .withEstimatedKeys(sstable.estimatedKeys())
-                                     .withSections(sstable.getPositionsForRanges(requestedRanges))
-                                     .withSerializationHeader(sstable.header.toComponent())
-                                     .withTableId(sstable.metadata().id)
-                                     .build();
-
             partialStreamReader = new CassandraStreamReader(new StreamMessageHeader(sstable.metadata().id,
                                                                                     peer, session.planId(), false,
                                                                                     0, 0, 0,
@@ -207,6 +209,7 @@ public class ZeroCopyStreamingBenchmark
         @TearDown
         public void tearDown() throws IOException
         {
+            context.close();
             SchemaLoader.cleanupAndLeaveDirs();
             CommitLog.instance.stopUnsafe(true);
         }
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 7880276..4d6e2e8 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 import afu.org.checkerframework.checker.oigj.qual.O;
 import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
 import org.apache.cassandra.db.compaction.CompactionTasks;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaCollection;
@@ -79,6 +80,7 @@ import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -773,4 +775,25 @@ public class Util
         }
         assertEquals(expectedSSTableCount, fileCount);
     }
+
+    /**
+     * Disable the bloom filter on all live sstables of the given table
+     */
+    public static void disableBloomFilter(ColumnFamilyStore cfs)
+    {
+        Collection<SSTableReader> sstables = cfs.getLiveSSTables();
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+        {
+            for (SSTableReader sstable : sstables)
+            {
+                sstable = sstable.cloneAndReplace(FilterFactory.AlwaysPresent);
+                txn.update(sstable, true);
+                txn.checkpoint();
+            }
+            txn.finish();
+        }
+
+        for (SSTableReader reader : cfs.getLiveSSTables())
+            assertEquals(FilterFactory.AlwaysPresent, reader.getBloomFilter());
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 7913679..54c2b35 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -129,7 +129,7 @@ public class ColumnFamilyStoreTest
 
         List<SSTableReader> ssTables = keyspace.getAllSSTables(SSTableSet.LIVE);
         assertEquals(1, ssTables.size());
-        ssTables.get(0).forceFilterFailures();
+        Util.disableBloomFilter(cfs);
         Util.assertEmpty(Util.cmd(cfs, "key2").build());
     }
 
diff --git a/test/unit/org/apache/cassandra/db/KeyspaceTest.java b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
index 3e088fb..e7d7814 100644
--- a/test/unit/org/apache/cassandra/db/KeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
@@ -142,7 +142,7 @@ public class KeyspaceTest extends CQLTester
 
         Collection<SSTableReader> sstables = cfs.getLiveSSTables();
         assertEquals(1, sstables.size());
-        sstables.iterator().next().forceFilterFailures();
+        Util.disableBloomFilter(cfs);
 
         for (String key : new String[]{"0", "2"})
             Util.assertEmpty(Util.cmd(cfs, key).build());
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 2703e44..a41365b 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -1241,8 +1241,8 @@ public class LogTransactionTest extends AbstractTransactionalTest
         SSTableReader reader = SSTableReader.internalOpen(descriptor,
                                                           components,
                                                           cfs.metadata,
-                                                          dFile,
                                                           iFile,
+                                                          dFile,
                                                           MockSchema.indexSummary.sharedCopy(),
                                                           new AlwaysPresentFilter(),
                                                           1L,
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index 00a48d1..58d26c1 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.Queue;
 import java.util.UUID;
 
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -63,7 +64,6 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class CassandraEntireSSTableStreamWriterTest
 {
@@ -73,6 +73,7 @@ public class CassandraEntireSSTableStreamWriterTest
     public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
 
     private static SSTableReader sstable;
+    private static Descriptor descriptor;
     private static ColumnFamilyStore store;
 
     @BeforeClass
@@ -105,6 +106,7 @@ public class CassandraEntireSSTableStreamWriterTest
         CompactionManager.instance.performMaximal(store, false);
 
         sstable = store.getLiveSSTables().iterator().next();
+        descriptor = sstable.descriptor;
     }
 
     @Test
@@ -112,15 +114,18 @@ public class CassandraEntireSSTableStreamWriterTest
     {
         StreamSession session = setupStreamingSessionForTest();
 
-        CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
-
         EmbeddedChannel channel = new EmbeddedChannel();
-        AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel);
-        writer.write(out);
+        try (AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel);
+             ComponentContext context = ComponentContext.create(descriptor))
+        {
+            CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, context);
 
-        Queue msgs = channel.outboundMessages();
+            writer.write(out);
 
-        assertTrue(msgs.peek() instanceof DefaultFileRegion);
+            Queue msgs = channel.outboundMessages();
+
+            assertTrue(msgs.peek() instanceof DefaultFileRegion);
+        }
     }
 
     @Test
@@ -129,18 +134,19 @@ public class CassandraEntireSSTableStreamWriterTest
         StreamSession session = setupStreamingSessionForTest();
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
 
-        CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
 
         // This is needed as Netty releases the ByteBuffers as soon as the channel is flushed
         ByteBuf serializedFile = Unpooled.buffer(8192);
         EmbeddedChannel channel = createMockNettyChannel(serializedFile);
-        AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel);
-
-        writer.write(out);
+        try (AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel);
+             ComponentContext context = ComponentContext.create(descriptor))
+        {
+            CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, context);
+            writer.write(out);
 
-        session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, 5104));
+            session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, 5104));
 
-        CassandraStreamHeader header =
+            CassandraStreamHeader header =
             CassandraStreamHeader.builder()
                                  .withSSTableFormat(sstable.descriptor.formatType)
                                  .withSSTableVersion(sstable.descriptor.version)
@@ -148,18 +154,19 @@ public class CassandraEntireSSTableStreamWriterTest
                                  .withEstimatedKeys(sstable.estimatedKeys())
                                  .withSections(Collections.emptyList())
                                  .withSerializationHeader(sstable.header.toComponent())
-                                 .withComponentManifest(CassandraOutgoingFile.getComponentManifest(sstable))
+                                 .withComponentManifest(context.manifest())
                                  .isEntireSSTable(true)
                                  .withFirstKey(sstable.first)
                                  .withTableId(sstable.metadata().id)
                                  .build();
 
-        CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), false, 0, 0, 0, null), header, session);
+            CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), false, 0, 0, 0, null), header, session);
 
-        SSTableMultiWriter sstableWriter = reader.read(new DataInputBuffer(serializedFile.nioBuffer(), false));
-        Collection<SSTableReader> newSstables = sstableWriter.finished();
+            SSTableMultiWriter sstableWriter = reader.read(new DataInputBuffer(serializedFile.nioBuffer(), false));
+            Collection<SSTableReader> newSstables = sstableWriter.finished();
 
-        assertEquals(1, newSstables.size());
+            assertEquals(1, newSstables.size());
+        }
     }
 
     private EmbeddedChannel createMockNettyChannel(ByteBuf serializedFile) throws Exception
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
index e48abf6..999a44e 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
@@ -20,23 +20,144 @@ package org.apache.cassandra.db.streaming;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.streaming.CassandraStreamHeader.CassandraStreamHeaderSerializer;
 import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.serializers.SerializationUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 public class CassandraStreamHeaderTest
 {
+    public static final String KEYSPACE = "CassandraStreamHeaderTest";
+    public static final String CF_COMPRESSED = "compressed";
+
+    private static SSTableReader sstable;
+    private static ColumnFamilyStore store;
+
+    @BeforeClass
+    public static void defineSchemaAndPrepareSSTable()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_COMPRESSED).compression(CompressionParams.DEFAULT));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        store = keyspace.getColumnFamilyStore(CF_COMPRESSED);
+
+        // insert data and compact to a single sstable
+        CompactionManager.instance.disableAutoCompaction();
+        for (int j = 0; j < 10; j++)
+        {
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+            .clustering("0")
+            .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+            .build()
+            .applyUnsafe();
+        }
+        store.forceBlockingFlush();
+        CompactionManager.instance.performMaximal(store, false);
+
+        sstable = store.getLiveSSTables().iterator().next();
+    }
+
+    @Test
+    public void transferedSizeWithCompressionTest()
+    {
+        // compression info is lazily initialized to reduce GC, compute size based on compressionMetadata
+        CassandraStreamHeader header = header(false, true);
+        long transferedSize = header.size();
+        assertEquals(transferedSize, header.calculateSize());
+
+        // computing file chunks before sending over network, and verify size is the same
+        header.compressionInfo.chunks();
+        assertEquals(transferedSize, header.calculateSize());
+
+        SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer);
+    }
+
+    @Test
+    public void transferedSizeWithZeroCopyStreamingTest()
+    {
+        // verify that the on-disk length of all components is used for ZCS
+        CassandraStreamHeader header = header(true, true);
+        long transferedSize = header.size();
+        assertEquals(ComponentManifest.create(sstable.descriptor).totalSize(), transferedSize);
+        assertEquals(transferedSize, header.calculateSize());
+
+        // verify that computing file chunks doesn't change transferred size for ZCS
+        header.compressionInfo.chunks();
+        assertEquals(transferedSize, header.calculateSize());
+
+        SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer);
+    }
+
+    @Test
+    public void transferedSizeWithoutCompressionTest()
+    {
+        // verify section size is used as transferred size
+        CassandraStreamHeader header = header(false, false);
+        long transferedSize = header.size();
+        assertNull(header.compressionInfo);
+        assertEquals(sstable.uncompressedLength(), transferedSize);
+        assertEquals(transferedSize, header.calculateSize());
+
+        SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer);
+    }
+
+    private CassandraStreamHeader header(boolean entireSSTable, boolean compressed)
+    {
+        List<Range<Token>> requestedRanges = Collections.singletonList(new Range<>(store.getPartitioner().getMinimumToken(), sstable.last.getToken()));
+        requestedRanges = Range.normalize(requestedRanges);
+
+        List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(requestedRanges);
+        CompressionInfo compressionInfo = compressed ? CompressionInfo.newLazyInstance(sstable.getCompressionMetadata(), sections)
+                                                     : null;
+
+        TableMetadata metadata = store.metadata();
+        SerializationHeader.Component serializationHeader = SerializationHeader.makeWithoutStats(metadata).toComponent();
+        ComponentManifest componentManifest = entireSSTable ? ComponentManifest.create(sstable.descriptor) : null;
+        DecoratedKey firstKey = entireSSTable ? sstable.first : null;
+        return CassandraStreamHeader.builder()
+                                    .withSSTableFormat(SSTableFormat.Type.BIG)
+                                    .withSSTableVersion(BigFormat.latestVersion)
+                                    .withSSTableLevel(0)
+                                    .withEstimatedKeys(10)
+                                    .withCompressionInfo(compressionInfo)
+                                    .withSections(sections)
+                                    .isEntireSSTable(entireSSTable)
+                                    .withComponentManifest(componentManifest)
+                                    .withFirstKey(firstKey)
+                                    .withSerializationHeader(serializationHeader)
+                                    .withTableId(metadata.id)
+                                    .build();
+    }
+
     @Test
     public void serializerTest()
     {
diff --git a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
new file mode 100644
index 0000000..3cc8943
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexSummaryManager;
+import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
+import org.apache.cassandra.io.sstable.SSTableUtils;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.net.AsyncStreamingOutputPlus;
+import org.apache.cassandra.net.BufferPoolAllocator;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.SharedDefaultFileRegion;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Throwables;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(BMUnitRunner.class)
+public class EntireSSTableStreamConcurrentComponentMutationTest
+{
+    public static final String KEYSPACE = "CassandraEntireSSTableStreamLockTest";
+    public static final String CF_STANDARD = "Standard1";
+
+    private static final Callable<?> NO_OP = () -> null;
+
+    private static SSTableReader sstable;
+    private static Descriptor descriptor;
+    private static ColumnFamilyStore store;
+    private static RangesAtEndpoint rangesAtEndpoint;
+
+    private static ExecutorService service;
+
+    private static CountDownLatch latch = new CountDownLatch(1);
+
+    @BeforeClass
+    public static void defineSchemaAndPrepareSSTable()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        store = keyspace.getColumnFamilyStore("Standard1");
+
+        // insert data and compact to a single sstable
+        CompactionManager.instance.disableAutoCompaction();
+        for (int j = 0; j < 10; j++)
+        {
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+            .clustering("0")
+            .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+            .build()
+            .applyUnsafe();
+        }
+        store.forceBlockingFlush();
+        CompactionManager.instance.performMaximal(store, false);
+
+        Token start = ByteOrderedPartitioner.instance.getTokenFactory().fromString(Long.toHexString(0));
+        Token end = ByteOrderedPartitioner.instance.getTokenFactory().fromString(Long.toHexString(100));
+        rangesAtEndpoint = RangesAtEndpoint.toDummyList(Collections.singleton(new Range<>(start, end)));
+
+        service = Executors.newFixedThreadPool(2);
+    }
+
+    @AfterClass
+    public static void cleanup()
+    {
+        service.shutdown();
+    }
+
+    @Before
+    public void init()
+    {
+        sstable = store.getLiveSSTables().iterator().next();
+        descriptor = sstable.descriptor;
+    }
+
+    @After
+    public void reset() throws IOException
+    {
+        latch = new CountDownLatch(1);
+        // reset repair info to prevent tests from interfering with each other
+        descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, 0, ActiveRepairService.NO_PENDING_REPAIR, false);
+    }
+
+    @Test
+    public void testStream() throws Exception
+    {
+        testStreamWithConcurrentComponentMutation(NO_OP, NO_OP);
+    }
+
+    /**
+     * Entire-sstable-streaming receiver will throw a checksum validation failure because a concurrent stats metadata
+     * update causes the actual transferred file size to be different from the one in {@link ComponentManifest}
+     */
+    @Test
+    public void testStreamWithStatsMutation() throws Exception
+    {
+        testStreamWithConcurrentComponentMutation(() -> {
+
+            Descriptor desc = sstable.descriptor;
+            desc.getMetadataSerializer().mutate(desc, "testing", stats -> stats.mutateRepairedMetadata(0, UUID.randomUUID(), false));
+
+            return null;
+        }, NO_OP);
+    }
+
+    @Test
+    @BMRule(name = "Delay saving index summary, manifest may link partially written file if there is no lock",
+            targetClass = "SSTableReader",
+            targetMethod = "saveSummary(Descriptor, DecoratedKey, DecoratedKey, IndexSummary)",
+            targetLocation = "AFTER INVOKE serialize",
+            condition = "$descriptor.cfname.contains(\"Standard1\")",
+            action = "org.apache.cassandra.db.streaming.EntireSSTableStreamConcurrentComponentMutationTest.countDown();Thread.sleep(5000);")
+    public void testStreamWithIndexSummaryRedistributionDelaySavingSummary() throws Exception
+    {
+        testStreamWithConcurrentComponentMutation(() -> {
+            // wait until new index summary is partially written
+            latch.await(1, TimeUnit.MINUTES);
+            return null;
+        }, this::indexSummaryRedistribution);
+    }
+
+    // used by byteman
+    private static void countDown()
+    {
+        latch.countDown();
+    }
+
+    private void testStreamWithConcurrentComponentMutation(Callable<?> runBeforeStreaming, Callable<?> runConcurrentWithStreaming) throws Exception
+    {
+        ByteBuf serializedFile = Unpooled.buffer(8192);
+        InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+        StreamSession session = setupStreamingSessionForTest();
+        Collection<OutgoingStream> outgoingStreams = store.getStreamManager().createOutgoingStreams(session, rangesAtEndpoint, NO_PENDING_REPAIR, PreviewKind.NONE);
+        CassandraOutgoingFile outgoingFile = (CassandraOutgoingFile) Iterables.getOnlyElement(outgoingStreams);
+
+        Future<?> streaming = executeAsync(() -> {
+            runBeforeStreaming.call();
+
+            try (AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(createMockNettyChannel(serializedFile)))
+            {
+                outgoingFile.write(session, out, MessagingService.current_version);
+                assertTrue(sstable.descriptor.getTemporaryFiles().isEmpty());
+            }
+            return null;
+        });
+
+        Future<?> concurrentMutations = executeAsync(runConcurrentWithStreaming);
+
+        streaming.get(3, TimeUnit.MINUTES);
+        concurrentMutations.get(3, TimeUnit.MINUTES);
+
+        session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, 5104));
+        StreamMessageHeader messageHeader = new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), false, 0, 0, 0, null);
+
+        try (DataInputBuffer in = new DataInputBuffer(serializedFile.nioBuffer(), false))
+        {
+            CassandraStreamHeader header = CassandraStreamHeader.serializer.deserialize(in, MessagingService.current_version);
+            CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(messageHeader, header, session);
+            SSTableReader streamedSSTable = Iterables.getOnlyElement(reader.read(in).finished());
+
+            SSTableUtils.assertContentEquals(sstable, streamedSSTable);
+        }
+    }
+
+    private boolean indexSummaryRedistribution() throws IOException
+    {
+        long nonRedistributingOffHeapSize = 0;
+        long memoryPoolBytes = 1024 * 1024;
+
+        // rewrite index summary file with new min/max index interval
+        TableMetadata origin = store.metadata();
+        MigrationManager.announceTableUpdate(origin.unbuild().minIndexInterval(1).maxIndexInterval(2).build(), true);
+
+        try (LifecycleTransaction txn = store.getTracker().tryModify(sstable, OperationType.INDEX_SUMMARY))
+        {
+            IndexSummaryManager.redistributeSummaries(new IndexSummaryRedistribution(ImmutableMap.of(store.metadata().id, txn),
+                                                                                     nonRedistributingOffHeapSize,
+                                                                                     memoryPoolBytes));
+        }
+
+        // reset min/max index interval
+        MigrationManager.announceTableUpdate(origin, true);
+        return true;
+    }
+
+    private Future<?> executeAsync(Callable<?> task)
+    {
+        return service.submit(() -> {
+            try
+            {
+                task.call();
+            }
+            catch (Exception e)
+            {
+                throw Throwables.unchecked(e);
+            }
+        });
+    }
+
+    private EmbeddedChannel createMockNettyChannel(ByteBuf serializedFile)
+    {
+        WritableByteChannel wbc = new WritableByteChannel()
+        {
+            private boolean isOpen = true;
+            public int write(ByteBuffer src)
+            {
+                int size = src.limit();
+                serializedFile.writeBytes(src);
+                return size;
+            }
+
+            public boolean isOpen()
+            {
+                return isOpen;
+            }
+
+            public void close()
+            {
+                isOpen = false;
+            }
+        };
+
+        return new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
+                @Override
+                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
+                {
+                    if (msg instanceof BufferPoolAllocator.Wrapped)
+                    {
+
+                        ByteBuffer buf = ((BufferPoolAllocator.Wrapped) msg).adopt();
+                        wbc.write(buf);
+                    }
+                    else
+                    {
+                        ((SharedDefaultFileRegion) msg).transferTo(wbc, 0);
+                    }
+                    super.write(ctx, msg, promise);
+                }
+            });
+    }
+
+    private StreamSession setupStreamingSessionForTest()
+    {
+        StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, false, null, PreviewKind.NONE);
+        StreamResultFuture future = StreamResultFuture.createInitiator(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.emptyList(), streamCoordinator);
+
+        InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+        streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED));
+
+        StreamSession session = streamCoordinator.getOrCreateNextSession(peer);
+        session.init(future);
+        return session;
+    }
+}
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 36221ed..bfb3e03 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -18,12 +18,10 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.attribute.FileAttribute;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -480,7 +478,7 @@ public class SSTableReaderTest
             SSTableReader sstable = indexCfs.getLiveSSTables().iterator().next();
             assert sstable.first.getToken() instanceof LocalToken;
 
-            sstable.saveSummary();
+            SSTableReader.saveSummary(sstable.descriptor, sstable.first, sstable.last, sstable.indexSummary);
             SSTableReader reopened = SSTableReader.open(sstable.descriptor);
             assert reopened.first.getToken() instanceof LocalToken;
             reopened.selfRef().release();
diff --git a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
index 262a200..850e05c 100644
--- a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
+++ b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
@@ -41,9 +41,11 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
+import org.apache.cassandra.db.streaming.ComponentManifest;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -60,6 +62,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class EntireSSTableStreamingCorrectFilesCountTest
@@ -120,18 +123,23 @@ public class EntireSSTableStreamingCorrectFilesCountTest
                                                                                                     PreviewKind.NONE);
 
         session.addTransferStreams(outgoingStreams);
-        DataOutputStreamPlus out = constructDataOutputStream();
+        AsyncStreamingOutputPlus out = constructDataOutputStream();
 
         for (OutgoingStream outgoingStream : outgoingStreams)
+        {
             outgoingStream.write(session, out, MessagingService.VERSION_40);
+            // verify hardlinks are removed after streaming
+            Descriptor descriptor = ((CassandraOutgoingFile) outgoingStream).getRef().get().descriptor;
+            assertTrue(descriptor.getTemporaryFiles().isEmpty());
+        }
 
         int totalNumberOfFiles = session.transfers.get(store.metadata.id).getTotalNumberOfFiles();
 
-        assertEquals(CassandraOutgoingFile.getComponentManifest(sstable).components().size(), totalNumberOfFiles);
+        assertEquals(ComponentManifest.create(sstable.descriptor).components().size(), totalNumberOfFiles);
         assertEquals(streamEventHandler.fileNames.size(), totalNumberOfFiles);
     }
 
-    private DataOutputStreamPlus constructDataOutputStream()
+    private AsyncStreamingOutputPlus constructDataOutputStream()
     {
         // This is needed as Netty releases the ByteBuffers as soon as the channel is flushed
         ByteBuf serializedFile = Unpooled.buffer(8192);
diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index be443b5..11f6b55 100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -172,7 +172,7 @@ public class CompressedInputStreamTest
         }
 
         // read buffer using CompressedInputStream
-        CompressionInfo info = new CompressionInfo(chunks, param);
+        CompressionInfo info = CompressionInfo.newInstance(chunks, param);
 
         if (testException)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org