You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this rendering.
Posted to commits@cassandra.apache.org by dj...@apache.org on 2020/04/18 18:05:45 UTC

[cassandra] branch trunk updated: Optimize Zero Copy Streaming file containment check by using file sections

This is an automated email from the ASF dual-hosted git repository.

djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b05fe7a  Optimize Zero Copy Streaming file containment check by using file sections
b05fe7a is described below

commit b05fe7ab010218f1fb23b3192e2aea719a9611de
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Mon Mar 23 22:05:53 2020 +0800

    Optimize Zero Copy Streaming file containment check by using file sections
    
    Patch By Zhao Yang; Reviewed by T Jake Luciani and Dinesh Joshi for CASSANDRA-15657
---
 .../db/streaming/CassandraOutgoingFile.java        | 56 ++++------------------
 .../db/streaming/CassandraOutgoingFileTest.java    | 15 +++---
 2 files changed, 19 insertions(+), 52 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index e8f5485..237c0af 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -30,14 +30,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.AsyncStreamingOutputPlus;
@@ -47,8 +42,6 @@ import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.concurrent.Ref;
 
-import static org.apache.cassandra.db.compaction.Verifier.RangeOwnHelper;
-
 /**
  * used to transfer the part(or whole) of a SSTable data file
  */
@@ -65,9 +58,8 @@ public class CassandraOutgoingFile implements OutgoingStream
     private final CassandraStreamHeader header;
     private final boolean keepSSTableLevel;
     private final ComponentManifest manifest;
-    private Boolean isFullyContained;
 
-    private final List<Range<Token>> normalizedRanges;
+    private final boolean shouldStreamEntireSStable;
 
     public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref,
                                  List<SSTableReader.PartitionPositionBounds> sections, List<Range<Token>> normalizedRanges,
@@ -78,9 +70,9 @@ public class CassandraOutgoingFile implements OutgoingStream
         this.ref = ref;
         this.estimatedKeys = estimatedKeys;
         this.sections = sections;
-        this.normalizedRanges = ImmutableList.copyOf(normalizedRanges);
         this.filename = ref.get().getFilename();
         this.manifest = getComponentManifest(ref.get());
+        this.shouldStreamEntireSStable = shouldStreamEntireSSTable();
 
         SSTableReader sstable = ref.get();
         keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD;
@@ -93,7 +85,7 @@ public class CassandraOutgoingFile implements OutgoingStream
                                  .withSections(sections)
                                  .withCompressionMetadata(sstable.compression ? sstable.getCompressionMetadata() : null)
                                  .withSerializationHeader(sstable.header.toComponent())
-                                 .isEntireSSTable(shouldStreamEntireSSTable())
+                                 .isEntireSSTable(shouldStreamEntireSStable)
                                  .withComponentManifest(manifest)
                                  .withFirstKey(sstable.first)
                                  .withTableId(sstable.metadata().id)
@@ -163,7 +155,7 @@ public class CassandraOutgoingFile implements OutgoingStream
         CassandraStreamHeader.serializer.serialize(header, out, version);
         out.flush();
 
-        if (shouldStreamEntireSSTable() && out instanceof AsyncStreamingOutputPlus)
+        if (shouldStreamEntireSStable && out instanceof AsyncStreamingOutputPlus)
         {
             CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, manifest);
             writer.write((AsyncStreamingOutputPlus) out);
@@ -185,46 +177,18 @@ public class CassandraOutgoingFile implements OutgoingStream
         if (!DatabaseDescriptor.streamEntireSSTables() || ref.get().getSSTableMetadata().hasLegacyCounterShards)
             return false;
 
-        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(getTableId());
-
-        if (cfs == null)
-            return false;
-
-        AbstractCompactionStrategy compactionStrategy = cfs.getCompactionStrategyManager()
-                                                           .getCompactionStrategyFor(ref.get());
-
-        if (compactionStrategy instanceof LeveledCompactionStrategy)
-            return contained(normalizedRanges, ref.get());
-
-        return false;
+        return contained(sections, ref.get());
     }
 
     @VisibleForTesting
-    public boolean contained(List<Range<Token>> normalizedRanges, SSTableReader sstable)
+    public boolean contained(List<SSTableReader.PartitionPositionBounds> sections, SSTableReader sstable)
     {
-        if (isFullyContained != null)
-            return isFullyContained;
-
-        isFullyContained = computeContainment(normalizedRanges, sstable);
-        return isFullyContained;
-    }
-
-    private boolean computeContainment(List<Range<Token>> normalizedRanges, SSTableReader sstable)
-    {
-        if (normalizedRanges == null)
+        if (sections == null || sections.isEmpty())
             return false;
 
-        RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges);
-        try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata()))
-        {
-            while (iter.hasNext())
-            {
-                DecoratedKey key = iter.next();
-                if (!rangeOwnHelper.check(key))
-                    return false;
-            }
-        }
-        return true;
+        // if transfer sections contain entire sstable
+        long transferLength = sections.stream().mapToLong(p -> p.upperPosition - p.lowerPosition).sum();
+        return transferLength == sstable.uncompressedLength();
     }
 
     @Override
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
index 5e44346..9d663b5 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
@@ -89,11 +89,12 @@ public class CassandraOutgoingFileTest
     {
         List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), sstable.last.getToken()));
 
+        List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(requestedRanges);
         CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
-                                                              sstable.getPositionsForRanges(requestedRanges),
+                                                              sections,
                                                               requestedRanges, sstable.estimatedKeys());
 
-        assertTrue(cof.contained(requestedRanges, sstable));
+        assertTrue(cof.contained(sections, sstable));
     }
 
     @Test
@@ -101,11 +102,12 @@ public class CassandraOutgoingFileTest
     {
         List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(2)));
 
+        List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(requestedRanges);
         CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
-                                                              sstable.getPositionsForRanges(requestedRanges),
+                                                              sections,
                                                               requestedRanges, sstable.estimatedKeys());
 
-        assertFalse(cof.contained(requestedRanges, sstable));
+        assertFalse(cof.contained(sections, sstable));
     }
 
     @Test
@@ -116,11 +118,12 @@ public class CassandraOutgoingFileTest
                                                          new Range<>(getTokenAtIndex(5), sstable.last.getToken()));
         requestedRanges = Range.normalize(requestedRanges);
 
+        List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(requestedRanges);
         CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
-                                                              sstable.getPositionsForRanges(requestedRanges),
+                                                              sections,
                                                               requestedRanges, sstable.estimatedKeys());
 
-        assertTrue(cof.contained(requestedRanges, sstable));
+        assertTrue(cof.contained(sections, sstable));
     }
 
     private DecoratedKey getKeyAtIndex(int i)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org