You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dj...@apache.org on 2020/04/18 18:05:45 UTC
[cassandra] branch trunk updated: Optimize Zero Copy Streaming file
containment check by using file sections
This is an automated email from the ASF dual-hosted git repository.
djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b05fe7a Optimize Zero Copy Streaming file containment check by using file sections
b05fe7a is described below
commit b05fe7ab010218f1fb23b3192e2aea719a9611de
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Mon Mar 23 22:05:53 2020 +0800
Optimize Zero Copy Streaming file containment check by using file sections
Patch By Zhao Yang; Reviewed by T Jake Luciani and Dinesh Joshi for CASSANDRA-15657
---
.../db/streaming/CassandraOutgoingFile.java | 56 ++++------------------
.../db/streaming/CassandraOutgoingFileTest.java | 15 +++---
2 files changed, 19 insertions(+), 52 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index e8f5485..237c0af 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -30,14 +30,9 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.KeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.AsyncStreamingOutputPlus;
@@ -47,8 +42,6 @@ import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.concurrent.Ref;
-import static org.apache.cassandra.db.compaction.Verifier.RangeOwnHelper;
-
/**
* used to transfer the part(or whole) of a SSTable data file
*/
@@ -65,9 +58,8 @@ public class CassandraOutgoingFile implements OutgoingStream
private final CassandraStreamHeader header;
private final boolean keepSSTableLevel;
private final ComponentManifest manifest;
- private Boolean isFullyContained;
- private final List<Range<Token>> normalizedRanges;
+ private final boolean shouldStreamEntireSStable;
public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref,
List<SSTableReader.PartitionPositionBounds> sections, List<Range<Token>> normalizedRanges,
@@ -78,9 +70,9 @@ public class CassandraOutgoingFile implements OutgoingStream
this.ref = ref;
this.estimatedKeys = estimatedKeys;
this.sections = sections;
- this.normalizedRanges = ImmutableList.copyOf(normalizedRanges);
this.filename = ref.get().getFilename();
this.manifest = getComponentManifest(ref.get());
+ this.shouldStreamEntireSStable = shouldStreamEntireSSTable();
SSTableReader sstable = ref.get();
keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD;
@@ -93,7 +85,7 @@ public class CassandraOutgoingFile implements OutgoingStream
.withSections(sections)
.withCompressionMetadata(sstable.compression ? sstable.getCompressionMetadata() : null)
.withSerializationHeader(sstable.header.toComponent())
- .isEntireSSTable(shouldStreamEntireSSTable())
+ .isEntireSSTable(shouldStreamEntireSStable)
.withComponentManifest(manifest)
.withFirstKey(sstable.first)
.withTableId(sstable.metadata().id)
@@ -163,7 +155,7 @@ public class CassandraOutgoingFile implements OutgoingStream
CassandraStreamHeader.serializer.serialize(header, out, version);
out.flush();
- if (shouldStreamEntireSSTable() && out instanceof AsyncStreamingOutputPlus)
+ if (shouldStreamEntireSStable && out instanceof AsyncStreamingOutputPlus)
{
CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, manifest);
writer.write((AsyncStreamingOutputPlus) out);
@@ -185,46 +177,18 @@ public class CassandraOutgoingFile implements OutgoingStream
if (!DatabaseDescriptor.streamEntireSSTables() || ref.get().getSSTableMetadata().hasLegacyCounterShards)
return false;
- ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(getTableId());
-
- if (cfs == null)
- return false;
-
- AbstractCompactionStrategy compactionStrategy = cfs.getCompactionStrategyManager()
- .getCompactionStrategyFor(ref.get());
-
- if (compactionStrategy instanceof LeveledCompactionStrategy)
- return contained(normalizedRanges, ref.get());
-
- return false;
+ return contained(sections, ref.get());
}
@VisibleForTesting
- public boolean contained(List<Range<Token>> normalizedRanges, SSTableReader sstable)
+ public boolean contained(List<SSTableReader.PartitionPositionBounds> sections, SSTableReader sstable)
{
- if (isFullyContained != null)
- return isFullyContained;
-
- isFullyContained = computeContainment(normalizedRanges, sstable);
- return isFullyContained;
- }
-
- private boolean computeContainment(List<Range<Token>> normalizedRanges, SSTableReader sstable)
- {
- if (normalizedRanges == null)
+ if (sections == null || sections.isEmpty())
return false;
- RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges);
- try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata()))
- {
- while (iter.hasNext())
- {
- DecoratedKey key = iter.next();
- if (!rangeOwnHelper.check(key))
- return false;
- }
- }
- return true;
+ // if transfer sections contain entire sstable
+ long transferLength = sections.stream().mapToLong(p -> p.upperPosition - p.lowerPosition).sum();
+ return transferLength == sstable.uncompressedLength();
}
@Override
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
index 5e44346..9d663b5 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
@@ -89,11 +89,12 @@ public class CassandraOutgoingFileTest
{
List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), sstable.last.getToken()));
+ List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(requestedRanges);
CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
- sstable.getPositionsForRanges(requestedRanges),
+ sections,
requestedRanges, sstable.estimatedKeys());
- assertTrue(cof.contained(requestedRanges, sstable));
+ assertTrue(cof.contained(sections, sstable));
}
@Test
@@ -101,11 +102,12 @@ public class CassandraOutgoingFileTest
{
List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(2)));
+ List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(requestedRanges);
CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
- sstable.getPositionsForRanges(requestedRanges),
+ sections,
requestedRanges, sstable.estimatedKeys());
- assertFalse(cof.contained(requestedRanges, sstable));
+ assertFalse(cof.contained(sections, sstable));
}
@Test
@@ -116,11 +118,12 @@ public class CassandraOutgoingFileTest
new Range<>(getTokenAtIndex(5), sstable.last.getToken()));
requestedRanges = Range.normalize(requestedRanges);
+ List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(requestedRanges);
CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
- sstable.getPositionsForRanges(requestedRanges),
+ sections,
requestedRanges, sstable.estimatedKeys());
- assertTrue(cof.contained(requestedRanges, sstable));
+ assertTrue(cof.contained(sections, sstable));
}
private DecoratedKey getKeyAtIndex(int i)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org