Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/09/10 02:59:37 UTC
[cassandra] branch trunk updated: Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new f3bf077 Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure
f3bf077 is described below
commit f3bf0775a5a8bce228289c22b96d0c922cf2cb0d
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Wed Sep 9 16:36:57 2020 -0700
Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure
patch by Zhao Yang; reviewed by Blake Eggleston, Caleb Rackliffe, David Capwell, Benjamin Lerer for CASSANDRA-15861
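For readers unfamiliar with the failure mode, here is a minimal standalone sketch (plain JDK, not Cassandra code) of the race this patch closes: the sender records a component's size up front and streams exactly that many bytes, so a concurrent mutation of Statistics.db or Summary.db between the two steps makes the receiver's byte count, and therefore its checksum, disagree.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class ComponentSizeRace
    {
        public static void main(String[] args) throws IOException
        {
            Path stats = Files.createTempFile("Statistics", ".db");
            Files.write(stats, new byte[128]);

            long manifestSize = Files.size(stats);   // size recorded in the stream manifest

            Files.write(stats, new byte[160]);       // concurrent mutation, e.g. a level change

            long actualSize = Files.size(stats);     // size on disk when the bytes are sent
            System.out.printf("manifest=%d actual=%d mismatch=%b%n",
                              manifestSize, actualSize, manifestSize != actualSize);
            Files.deleteIfExists(stats);
        }
    }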
---
CHANGES.txt | 1 +
.../db/compaction/CompactionStrategyManager.java | 3 +-
.../cassandra/db/compaction/LeveledManifest.java | 6 +-
.../db/compaction/SingleSSTableLCSTask.java | 3 +-
.../apache/cassandra/db/compaction/Verifier.java | 3 +-
.../streaming/CassandraCompressedStreamReader.java | 6 +-
.../streaming/CassandraCompressedStreamWriter.java | 17 +-
.../CassandraEntireSSTableStreamReader.java | 10 +-
.../CassandraEntireSSTableStreamWriter.java | 16 +-
.../db/streaming/CassandraOutgoingFile.java | 122 ++---
.../db/streaming/CassandraStreamHeader.java | 78 +--
.../db/streaming/CassandraStreamWriter.java | 11 +-
.../cassandra/db/streaming/ComponentContext.java | 105 ++++
.../cassandra/db/streaming/ComponentManifest.java | 39 +-
.../db/streaming/CompressedInputStream.java | 6 +-
.../cassandra/db/streaming/CompressionInfo.java | 141 +++++-
.../apache/cassandra/io/sstable/Descriptor.java | 11 +
.../cassandra/io/sstable/IndexSummaryManager.java | 4 +-
.../org/apache/cassandra/io/sstable/SSTable.java | 2 +-
.../cassandra/io/sstable/format/SSTableReader.java | 536 ++++++---------------
.../io/sstable/format/SSTableReaderBuilder.java | 475 ++++++++++++++++++
.../cassandra/io/sstable/format/big/BigFormat.java | 9 +-
.../io/sstable/format/big/BigTableReader.java | 7 +-
.../io/sstable/format/big/BigTableWriter.java | 9 +-
.../io/sstable/metadata/IMetadataSerializer.java | 17 +-
.../io/sstable/metadata/MetadataSerializer.java | 24 +-
.../apache/cassandra/net/BufferPoolAllocator.java | 2 +-
.../apache/cassandra/streaming/OutgoingStream.java | 7 +-
.../apache/cassandra/streaming/StreamSession.java | 2 +-
.../cassandra/streaming/StreamTransferTask.java | 2 +-
.../io/sstable/format/ForwardingSSTableReader.java | 70 +--
.../microbench/ZeroCopyStreamingBenchmark.java | 33 +-
test/unit/org/apache/cassandra/Util.java | 23 +
.../apache/cassandra/db/ColumnFamilyStoreTest.java | 2 +-
.../unit/org/apache/cassandra/db/KeyspaceTest.java | 2 +-
.../cassandra/db/lifecycle/LogTransactionTest.java | 2 +-
.../CassandraEntireSSTableStreamWriterTest.java | 43 +-
.../db/streaming/CassandraStreamHeaderTest.java | 121 +++++
...TableStreamConcurrentComponentMutationTest.java | 332 +++++++++++++
.../cassandra/io/sstable/SSTableReaderTest.java | 4 +-
...ntireSSTableStreamingCorrectFilesCountTest.java | 14 +-
.../compression/CompressedInputStreamTest.java | 2 +-
42 files changed, 1610 insertions(+), 712 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index afa03ac..3af602c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
* Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084)
* When compaction gets interrupted, the exception should include the compactionId (CASSANDRA-15954)
* Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909)
+ * Mutating sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure (CASSANDRA-15861)
4.0-beta2
* Add addition incremental repair visibility to nodetool repair_admin (CASSANDRA-14939)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 3a05e50..2e1fd0e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -1196,8 +1196,7 @@ public class CompactionStrategyManager implements INotificationConsumer
{
for (SSTableReader sstable: sstables)
{
- sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient);
- sstable.reloadSSTableMetadata();
+ sstable.mutateRepairedAndReload(repairedAt, pendingRepair, isTransient);
verifyMetadata(sstable, repairedAt, pendingRepair, isTransient);
changed.add(sstable);
}
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 2c32361..37c9435 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -164,8 +164,7 @@ public class LeveledManifest
try
{
logger.debug("Could not add sstable {} in level {} - dropping to 0", reader, reader.getSSTableLevel());
- reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
- reader.reloadSSTableMetadata();
+ reader.mutateLevelAndReload(0);
}
catch (IOException e)
{
@@ -281,8 +280,7 @@ public class LeveledManifest
remove(sstable);
try
{
- sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
- sstable.reloadSSTableMetadata();
+ sstable.mutateLevelAndReload(0);
add(sstable);
}
catch (IOException e)
diff --git a/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
index 02a1c49..2e1dffc 100644
--- a/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
@@ -77,8 +77,7 @@ public class SingleSSTableLCSTask extends AbstractCompactionTask
try
{
logger.info("Changing level on {} from {} to {}", sstable, metadataBefore.sstableLevel, level);
- sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, level);
- sstable.reloadSSTableMetadata();
+ sstable.mutateLevelAndReload(level);
}
catch (Throwable t)
{
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 2500a24..577f136 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -466,8 +466,7 @@ public class Verifier implements Closeable
{
try
{
- sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getPendingRepair(), sstable.isTransient());
- sstable.reloadSSTableMetadata();
+ sstable.mutateRepairedAndReload(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getPendingRepair(), sstable.isTransient());
cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
}
catch(IOException ioe)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
index 37b1a01..2491fe1 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -123,10 +123,6 @@ public class CassandraCompressedStreamReader extends CassandraStreamReader
@Override
protected long totalSize()
{
- long size = 0;
- // calculate total length of transferring chunks
- for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
- size += chunk.length + 4; // 4 bytes for CRC
- return size;
+ return compressionInfo.getTotalSize();
}
}
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
index d92314b..8d0b67f 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.streaming;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -48,11 +47,13 @@ public class CassandraCompressedStreamWriter extends CassandraStreamWriter
private static final Logger logger = LoggerFactory.getLogger(CassandraCompressedStreamWriter.class);
private final CompressionInfo compressionInfo;
+ private final long totalSize;
- public CassandraCompressedStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, StreamSession session)
+ public CassandraCompressedStreamWriter(SSTableReader sstable, CassandraStreamHeader header, StreamSession session)
{
- super(sstable, sections, session);
- this.compressionInfo = compressionInfo;
+ super(sstable, header, session);
+ this.compressionInfo = header.compressionInfo;
+ this.totalSize = header.size();
}
@Override
@@ -67,7 +68,7 @@ public class CassandraCompressedStreamWriter extends CassandraStreamWriter
long progress = 0L;
// we want to send continuous chunks together to minimise reads from disk and network writes
- List<Section> sections = fuseAdjacentChunks(compressionInfo.chunks);
+ List<Section> sections = fuseAdjacentChunks(compressionInfo.chunks());
int sectionIdx = 0;
@@ -106,11 +107,7 @@ public class CassandraCompressedStreamWriter extends CassandraStreamWriter
@Override
protected long totalSize()
{
- long size = 0;
- // calculate total length of transferring chunks
- for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
- size += chunk.length + 4; // 4 bytes for CRC
- return size;
+ return totalSize;
}
// chunks are assumed to be sorted by offset
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index eac37d1..0bfe993 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -21,8 +21,8 @@ package org.apache.cassandra.db.streaming;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
+import java.util.function.UnaryOperator;
-import com.google.common.base.Function;
import com.google.common.base.Throwables;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
@@ -135,9 +135,11 @@ public class CassandraEntireSSTableStreamReader implements IStreamReader
prettyPrintMemory(totalSize));
}
- Function<StatsMetadata, StatsMetadata> transform = stats -> stats.mutateLevel(header.sstableLevel)
- .mutateRepairedMetadata(messageHeader.repairedAt, messageHeader.pendingRepair, false);
- writer.descriptor.getMetadataSerializer().mutate(writer.descriptor, transform);
+ UnaryOperator<StatsMetadata> transform = stats -> stats.mutateLevel(header.sstableLevel)
+ .mutateRepairedMetadata(messageHeader.repairedAt, messageHeader.pendingRepair, false);
+ String description = String.format("level %s and repairedAt time %s and pendingRepair %s",
+ header.sstableLevel, messageHeader.repairedAt, messageHeader.pendingRepair);
+ writer.descriptor.getMetadataSerializer().mutate(writer.descriptor, description, transform);
return writer;
}
catch (Throwable e)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
index 401b20e..ef82eb2 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.db.streaming;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import org.slf4j.Logger;
@@ -43,15 +42,17 @@ public class CassandraEntireSSTableStreamWriter
private static final Logger logger = LoggerFactory.getLogger(CassandraEntireSSTableStreamWriter.class);
private final SSTableReader sstable;
+ private final ComponentContext context;
private final ComponentManifest manifest;
private final StreamSession session;
private final StreamRateLimiter limiter;
- public CassandraEntireSSTableStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest)
+ public CassandraEntireSSTableStreamWriter(SSTableReader sstable, StreamSession session, ComponentContext context)
{
this.session = session;
this.sstable = sstable;
- this.manifest = manifest;
+ this.context = context;
+ this.manifest = context.manifest();
this.limiter = StreamManager.getRateLimiter(session.peer);
}
@@ -76,11 +77,8 @@ public class CassandraEntireSSTableStreamWriter
for (Component component : manifest.components())
{
- @SuppressWarnings("resource") // this is closed after the file is transferred by AsyncChannelOutputPlus
- FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
-
// Total Length to transmit for this file
- long length = in.size();
+ long length = manifest.sizeOf(component);
// tracks write progress
logger.debug("[Stream #{}] Streaming {}.{} gen {} component {} size {}", session.planId(),
@@ -90,7 +88,9 @@ public class CassandraEntireSSTableStreamWriter
component,
prettyPrintMemory(length));
- long bytesWritten = out.writeFileToChannel(in, limiter);
+ @SuppressWarnings("resource") // this is closed after the file is transferred by AsyncChannelOutputPlus
+ FileChannel channel = context.channel(sstable.descriptor, component, length);
+ long bytesWritten = out.writeFileToChannel(channel, limiter);
progress += bytesWritten;
session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesWritten, length);
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 0917fba..0904720 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -18,21 +18,17 @@
package org.apache.cassandra.db.streaming;
-import java.io.File;
import java.io.IOException;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.AsyncStreamingOutputPlus;
@@ -47,19 +43,13 @@ import org.apache.cassandra.utils.concurrent.Ref;
*/
public class CassandraOutgoingFile implements OutgoingStream
{
- public static final List<Component> STREAM_COMPONENTS = ImmutableList.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
- Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
- Component.DIGEST, Component.CRC);
-
private final Ref<SSTableReader> ref;
private final long estimatedKeys;
private final List<SSTableReader.PartitionPositionBounds> sections;
private final String filename;
- private final CassandraStreamHeader header;
- private final boolean keepSSTableLevel;
- private final ComponentManifest manifest;
-
private final boolean shouldStreamEntireSSTable;
+ private final StreamOperation operation;
+ private final CassandraStreamHeader header;
public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref,
List<SSTableReader.PartitionPositionBounds> sections, List<Range<Token>> normalizedRanges,
@@ -67,45 +57,48 @@ public class CassandraOutgoingFile implements OutgoingStream
{
Preconditions.checkNotNull(ref.get());
Range.assertNormalized(normalizedRanges);
+ this.operation = operation;
this.ref = ref;
this.estimatedKeys = estimatedKeys;
this.sections = sections;
- this.filename = ref.get().getFilename();
- this.manifest = getComponentManifest(ref.get());
- this.shouldStreamEntireSSTable = computeShouldStreamEntireSSTables();
SSTableReader sstable = ref.get();
- keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD;
- this.header =
- CassandraStreamHeader.builder()
- .withSSTableFormat(sstable.descriptor.formatType)
- .withSSTableVersion(sstable.descriptor.version)
- .withSSTableLevel(keepSSTableLevel ? sstable.getSSTableLevel() : 0)
- .withEstimatedKeys(estimatedKeys)
- .withSections(sections)
- .withCompressionMetadata(sstable.compression ? sstable.getCompressionMetadata() : null)
- .withSerializationHeader(sstable.header.toComponent())
- .isEntireSSTable(shouldStreamEntireSSTable)
- .withComponentManifest(manifest)
- .withFirstKey(sstable.first)
- .withTableId(sstable.metadata().id)
- .build();
- }
-
- @VisibleForTesting
- public static ComponentManifest getComponentManifest(SSTableReader sstable)
- {
- LinkedHashMap<Component, Long> components = new LinkedHashMap<>(STREAM_COMPONENTS.size());
- for (Component component : STREAM_COMPONENTS)
- {
- File file = new File(sstable.descriptor.filenameFor(component));
- if (file.exists())
- components.put(component, file.length());
- }
- return new ComponentManifest(components);
+ this.filename = sstable.getFilename();
+ this.shouldStreamEntireSSTable = computeShouldStreamEntireSSTables();
+ ComponentManifest manifest = ComponentManifest.create(sstable.descriptor);
+ this.header = makeHeader(sstable, operation, sections, estimatedKeys, shouldStreamEntireSSTable, manifest);
+ }
+
+ private static CassandraStreamHeader makeHeader(SSTableReader sstable,
+ StreamOperation operation,
+ List<SSTableReader.PartitionPositionBounds> sections,
+ long estimatedKeys,
+ boolean shouldStreamEntireSSTable,
+ ComponentManifest manifest)
+ {
+ boolean keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD;
+
+ CompressionInfo compressionInfo = sstable.compression
+ ? CompressionInfo.newLazyInstance(sstable.getCompressionMetadata(), sections)
+ : null;
+
+ return CassandraStreamHeader.builder()
+ .withSSTableFormat(sstable.descriptor.formatType)
+ .withSSTableVersion(sstable.descriptor.version)
+ .withSSTableLevel(keepSSTableLevel ? sstable.getSSTableLevel() : 0)
+ .withEstimatedKeys(estimatedKeys)
+ .withSections(sections)
+ .withCompressionInfo(compressionInfo)
+ .withSerializationHeader(sstable.header.toComponent())
+ .isEntireSSTable(shouldStreamEntireSSTable)
+ .withComponentManifest(manifest)
+ .withFirstKey(sstable.first)
+ .withTableId(sstable.metadata().id)
+ .build();
}
+ @VisibleForTesting
public static CassandraOutgoingFile fromStream(OutgoingStream stream)
{
Preconditions.checkArgument(stream instanceof CassandraOutgoingFile);
@@ -125,7 +118,7 @@ public class CassandraOutgoingFile implements OutgoingStream
}
@Override
- public long getSize()
+ public long getEstimatedSize()
{
return header.size();
}
@@ -139,7 +132,7 @@ public class CassandraOutgoingFile implements OutgoingStream
@Override
public int getNumFiles()
{
- return shouldStreamEntireSSTable ? getManifestSize() : 1;
+ return shouldStreamEntireSSTable ? header.componentManifest.components().size() : 1;
}
@Override
@@ -154,29 +147,40 @@ public class CassandraOutgoingFile implements OutgoingStream
return ref.get().getPendingRepair();
}
- public int getManifestSize()
- {
- return manifest.components().size();
- }
-
@Override
public void write(StreamSession session, DataOutputStreamPlus out, int version) throws IOException
{
+ // FileStreamTask uses AsyncStreamingOutputPlus for streaming.
+ assert out instanceof AsyncStreamingOutputPlus : "Unexpected DataOutputStreamPlus " + out.getClass();
+
SSTableReader sstable = ref.get();
- CassandraStreamHeader.serializer.serialize(header, out, version);
- out.flush();
- if (shouldStreamEntireSSTable && out instanceof AsyncStreamingOutputPlus)
+ if (shouldStreamEntireSSTable)
{
- CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, manifest);
- writer.write((AsyncStreamingOutputPlus) out);
+ // Acquire the lock to avoid concurrent sstable component mutation from stats updates or index summary
+ // redistribution; otherwise the file sizes recorded in the component manifest will differ from the actual
+ // file sizes. (Note: Windows doesn't support atomic replace, and index summary redistribution deletes the
+ // existing file first.)
+ // Recreate the latest manifest and hard links for mutable components in case they are modified.
+ try (ComponentContext context = sstable.runWithLock(ignored -> ComponentContext.create(sstable.descriptor)))
+ {
+ CassandraStreamHeader current = makeHeader(sstable, operation, sections, estimatedKeys, true, context.manifest());
+ CassandraStreamHeader.serializer.serialize(current, out, version);
+ out.flush();
+
+ CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, context);
+ writer.write((AsyncStreamingOutputPlus) out);
+ }
}
else
{
- CassandraStreamWriter writer = (header.compressionInfo == null) ?
- new CassandraStreamWriter(sstable, header.sections, session) :
- new CassandraCompressedStreamWriter(sstable, header.sections,
- header.compressionInfo, session);
+ // Legacy streaming is not affected by stats metadata mutation or index summary redistribution.
+ CassandraStreamHeader.serializer.serialize(header, out, version);
+ out.flush();
+
+ CassandraStreamWriter writer = header.isCompressed() ?
+ new CassandraCompressedStreamWriter(sstable, header, session) :
+ new CassandraStreamWriter(sstable, header, session);
writer.write(out);
}
}
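The write() path above follows a snapshot-then-stream pattern. A hedged, self-contained sketch of the idea using only the JDK (the real code uses sstable.runWithLock and ComponentContext rather than these stand-ins):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    final class SnapshotThenStream
    {
        private static final Object lock = new Object(); // stand-in for sstable.runWithLock(...)

        static void stream(Path mutableComponent) throws IOException
        {
            Path link = mutableComponent.resolveSibling(mutableComponent.getFileName() + ".tmp");
            synchronized (lock)
            {
                Files.createLink(link, mutableComponent); // hardlink pins the bytes at lock time
            }
            try
            {
                long size = Files.size(link); // the recorded size now matches what will be sent
                // ... transfer exactly `size` bytes from `link` ...
            }
            finally
            {
                Files.deleteIfExists(link);   // mirrors ComponentContext.close()
            }
        }
    }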
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
index 2af56de..c9e10cf 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.Version;
@@ -52,13 +51,7 @@ public class CassandraStreamHeader
public final SSTableFormat.Type format;
public final long estimatedKeys;
public final List<SSTableReader.PartitionPositionBounds> sections;
- /**
- * Compression info for SSTable to send. Can be null if SSTable is not compressed.
- * On sender, this field is always null to avoid holding large number of Chunks.
- * Use compressionMetadata instead.
- */
- private final CompressionMetadata compressionMetadata;
- public volatile CompressionInfo compressionInfo;
+ public final CompressionInfo compressionInfo;
public final int sstableLevel;
public final SerializationHeader.Component serializationHeader;
@@ -70,7 +63,7 @@ public class CassandraStreamHeader
public final ComponentManifest componentManifest;
/* cached size value */
- private transient final long size;
+ private final long size;
private CassandraStreamHeader(Builder builder)
{
@@ -78,7 +71,6 @@ public class CassandraStreamHeader
format = builder.format;
estimatedKeys = builder.estimatedKeys;
sections = builder.sections;
- compressionMetadata = builder.compressionMetadata;
compressionInfo = builder.compressionInfo;
sstableLevel = builder.sstableLevel;
serializationHeader = builder.serializationHeader;
@@ -107,48 +99,22 @@ public class CassandraStreamHeader
return size;
}
- private long calculateSize()
+ @VisibleForTesting
+ public long calculateSize()
{
if (isEntireSSTable)
return componentManifest.totalSize();
- long transferSize = 0;
if (compressionInfo != null)
- {
- // calculate total length of transferring chunks
- for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
- transferSize += chunk.length + 4; // 4 bytes for CRC
- }
- else
- {
- for (SSTableReader.PartitionPositionBounds section : sections)
- transferSize += section.upperPosition - section.lowerPosition;
- }
- return transferSize;
- }
+ return compressionInfo.getTotalSize();
- public synchronized void calculateCompressionInfo()
- {
- if (compressionMetadata != null && compressionInfo == null)
- compressionInfo = CompressionInfo.fromCompressionMetadata(compressionMetadata, sections);
+ long transferSize = 0;
+ for (SSTableReader.PartitionPositionBounds section : sections)
+ transferSize += section.upperPosition - section.lowerPosition;
+ return transferSize;
}
@Override
- public String toString()
- {
- return "CassandraStreamHeader{" +
- "version=" + version +
- ", format=" + format +
- ", estimatedKeys=" + estimatedKeys +
- ", sections=" + sections +
- ", sstableLevel=" + sstableLevel +
- ", header=" + serializationHeader +
- ", isEntireSSTable=" + isEntireSSTable +
- ", firstKey=" + firstKey +
- ", tableId=" + tableId +
- '}';
- }
-
public boolean equals(Object o)
{
if (this == o) return true;
@@ -167,12 +133,29 @@ public class CassandraStreamHeader
Objects.equals(tableId, that.tableId);
}
+ @Override
public int hashCode()
{
return Objects.hash(version, format, estimatedKeys, sections, compressionInfo, sstableLevel, serializationHeader, componentManifest,
isEntireSSTable, firstKey, tableId);
}
+ @Override
+ public String toString()
+ {
+ return "CassandraStreamHeader{" +
+ "version=" + version +
+ ", format=" + format +
+ ", estimatedKeys=" + estimatedKeys +
+ ", sections=" + sections +
+ ", sstableLevel=" + sstableLevel +
+ ", header=" + serializationHeader +
+ ", isEntireSSTable=" + isEntireSSTable +
+ ", firstKey=" + firstKey +
+ ", tableId=" + tableId +
+ '}';
+ }
+
public static final IVersionedSerializer<CassandraStreamHeader> serializer = new CassandraStreamHeaderSerializer();
public static class CassandraStreamHeaderSerializer implements IVersionedSerializer<CassandraStreamHeader>
@@ -189,7 +172,6 @@ public class CassandraStreamHeader
out.writeLong(section.lowerPosition);
out.writeLong(section.upperPosition);
}
- header.calculateCompressionInfo();
CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
out.writeInt(header.sstableLevel);
@@ -275,7 +257,6 @@ public class CassandraStreamHeader
size += TypeSizes.sizeof(section.upperPosition);
}
- header.calculateCompressionInfo();
size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
size += TypeSizes.sizeof(header.sstableLevel);
@@ -299,7 +280,6 @@ public class CassandraStreamHeader
private SSTableFormat.Type format;
private long estimatedKeys;
private List<SSTableReader.PartitionPositionBounds> sections;
- private CompressionMetadata compressionMetadata;
private CompressionInfo compressionInfo;
private int sstableLevel;
private SerializationHeader.Component serializationHeader;
@@ -338,12 +318,6 @@ public class CassandraStreamHeader
return this;
}
- public Builder withCompressionMetadata(CompressionMetadata compressionMetadata)
- {
- this.compressionMetadata = compressionMetadata;
- return this;
- }
-
public Builder withCompressionInfo(CompressionInfo compressionInfo)
{
this.compressionInfo = compressionInfo;
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
index 8382f0a..10296fb 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
@@ -59,13 +59,15 @@ public class CassandraStreamWriter
protected final Collection<SSTableReader.PartitionPositionBounds> sections;
protected final StreamRateLimiter limiter;
protected final StreamSession session;
+ private final long totalSize;
- public CassandraStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, StreamSession session)
+ public CassandraStreamWriter(SSTableReader sstable, CassandraStreamHeader header, StreamSession session)
{
this.session = session;
this.sstable = sstable;
- this.sections = sections;
+ this.sections = header.sections;
this.limiter = StreamManager.getRateLimiter(session.peer);
+ this.totalSize = header.size();
}
/**
@@ -127,10 +129,7 @@ public class CassandraStreamWriter
protected long totalSize()
{
- long size = 0;
- for (SSTableReader.PartitionPositionBounds section : sections)
- size += section.upperPosition - section.lowerPosition;
- return size;
+ return totalSize;
}
/**
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentContext.java b/src/java/org/apache/cassandra/db/streaming/ComponentContext.java
new file mode 100644
index 0000000..b9c60b9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentContext.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.util.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Mutable SSTable components and their hardlinks, used to avoid concurrent sstable component
+ * modification during entire-sstable-streaming.
+ */
+public class ComponentContext implements AutoCloseable
+{
+ private static final Logger logger = LoggerFactory.getLogger(ComponentContext.class);
+
+ private static final Set<Component> MUTABLE_COMPONENTS = ImmutableSet.of(Component.STATS, Component.SUMMARY);
+
+ private final Map<Component, File> hardLinks;
+ private final ComponentManifest manifest;
+
+ private ComponentContext(Map<Component, File> hardLinks, ComponentManifest manifest)
+ {
+ this.hardLinks = hardLinks;
+ this.manifest = manifest;
+ }
+
+ public static ComponentContext create(Descriptor descriptor)
+ {
+ Map<Component, File> hardLinks = new HashMap<>(1);
+
+ for (Component component : MUTABLE_COMPONENTS)
+ {
+ File file = new File(descriptor.filenameFor(component));
+ if (!file.exists())
+ continue;
+
+ File hardlink = new File(descriptor.tmpFilenameForStreaming(component));
+ FileUtils.createHardLink(file, hardlink);
+ hardLinks.put(component, hardlink);
+ }
+
+ return new ComponentContext(hardLinks, ComponentManifest.create(descriptor));
+ }
+
+ public ComponentManifest manifest()
+ {
+ return manifest;
+ }
+
+ /**
+ * @return file channel to be streamed, either the original component or its hardlinked copy.
+ */
+ public FileChannel channel(Descriptor descriptor, Component component, long size) throws IOException
+ {
+ String toTransfer = hardLinks.containsKey(component) ? hardLinks.get(component).getPath() : descriptor.filenameFor(component);
+ @SuppressWarnings("resource") // file channel will be closed by the caller
+ FileChannel channel = new RandomAccessFile(toTransfer, "r").getChannel();
+
+ assert size == channel.size() : String.format("Entire sstable streaming expects %s file size to be %s but got %s.",
+ component, size, channel.size());
+ return channel;
+ }
+
+ @Override
+ public void close()
+ {
+ Throwable accumulate = null;
+ for (File file : hardLinks.values())
+ accumulate = FileUtils.deleteWithConfirm(file, accumulate);
+
+ hardLinks.clear();
+
+ if (accumulate != null)
+ logger.warn("Failed to remove hard link files", accumulate);
+ }
+}
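A hedged usage sketch of the new ComponentContext API as the diff exercises it in CassandraOutgoingFile.write(): create the context (ideally under the sstable lock), stream from the manifest it captured, and rely on close() to drop the hardlinks. The wrapping method and `descriptor` argument are assumptions for illustration; in the patch the channel is closed by AsyncChannelOutputPlus rather than try-with-resources.

    static void streamComponents(Descriptor descriptor) throws Exception
    {
        try (ComponentContext context = ComponentContext.create(descriptor))
        {
            ComponentManifest manifest = context.manifest(); // sizes captured when hardlinks were taken
            for (Component component : manifest.components())
            {
                long length = manifest.sizeOf(component);
                try (FileChannel channel = context.channel(descriptor, component, length))
                {
                    // ... transfer exactly `length` bytes from `channel` ...
                }
            }
        } // close() deletes the hardlinks
    }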
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
index 90e3dbd..bb896ca 100644
--- a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
@@ -18,23 +18,29 @@
package org.apache.cassandra.db.streaming;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
-
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * SSTable components and their sizes to be transferred via entire-sstable-streaming.
+ */
public final class ComponentManifest implements Iterable<Component>
{
+ private static final List<Component> STREAM_COMPONENTS = ImmutableList.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
+ Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
+ Component.DIGEST, Component.CRC);
+
private final LinkedHashMap<Component, Long> components;
public ComponentManifest(Map<Component, Long> components)
@@ -42,6 +48,23 @@ public final class ComponentManifest implements Iterable<Component>
this.components = new LinkedHashMap<>(components);
}
+ @VisibleForTesting
+ public static ComponentManifest create(Descriptor descriptor)
+ {
+ LinkedHashMap<Component, Long> components = new LinkedHashMap<>(STREAM_COMPONENTS.size());
+
+ for (Component component : STREAM_COMPONENTS)
+ {
+ File file = new File(descriptor.filenameFor(component));
+ if (!file.exists())
+ continue;
+
+ components.put(component, file.length());
+ }
+
+ return new ComponentManifest(components);
+ }
+
public long sizeOf(Component component)
{
Long size = components.get(component);
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
index b8626ff..d0b1e4c 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
@@ -75,15 +75,15 @@ public class CompressedInputStream extends RebufferingInputStream implements Aut
ChecksumType checksumType,
DoubleSupplier validateChecksumChance)
{
- super(ByteBuffer.allocateDirect(compressionInfo.parameters.chunkLength()));
+ super(ByteBuffer.allocateDirect(compressionInfo.parameters().chunkLength()));
buffer.limit(0);
this.input = input;
this.checksumType = checksumType;
this.validateChecksumChance = validateChecksumChance;
- compressionParams = compressionInfo.parameters;
- compressedChunks = Iterators.forArray(compressionInfo.chunks);
+ compressionParams = compressionInfo.parameters();
+ compressedChunks = Iterators.forArray(compressionInfo.chunks());
compressedChunk = ByteBuffer.allocateDirect(compressionParams.chunkLength());
}
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
index aef57e3..84563da 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
@@ -18,11 +18,14 @@
package org.apache.cassandra.db.streaming;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.compress.CompressionMetadata.Chunk;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -31,31 +34,133 @@ import org.apache.cassandra.io.util.DataOutputPlus;
/**
* Container that carries compression parameters and chunks to decompress data from stream.
*/
-public class CompressionInfo
+public abstract class CompressionInfo
{
public static final IVersionedSerializer<CompressionInfo> serializer = new CompressionInfoSerializer();
- public final CompressionMetadata.Chunk[] chunks;
- public final CompressionParams parameters;
+ /**
+ * Returns the compression parameters.
+ *
+ * @return the compression parameters.
+ */
+ public abstract CompressionParams parameters();
- public CompressionInfo(CompressionMetadata.Chunk[] chunks, CompressionParams parameters)
+ /**
+ * Returns the offset and length of the file chunks.
+ *
+ * @return the offset and length of the file chunks.
+ */
+ public abstract CompressionMetadata.Chunk[] chunks();
+
+ /**
+ * Computes the size of the file to transfer.
+ *
+ * @return the size of the file in bytes
+ */
+ public long getTotalSize()
+ {
+ long size = 0;
+ for (CompressionMetadata.Chunk chunk : chunks())
+ {
+ size += chunk.length + 4; // 4 bytes for CRC
+ }
+ return size;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof CompressionInfo))
+ return false;
+
+ CompressionInfo that = (CompressionInfo) o;
+
+ return Objects.equals(parameters(), that.parameters())
+ && Arrays.equals(chunks(), that.chunks());
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(parameters(), chunks());
+ }
+
+ /**
+ * Create a {@code CompressionInfo} instance which is fully initialized.
+ *
+ * @param chunks the file chunks
+ * @param parameters the compression parameters
+ */
+ public static CompressionInfo newInstance(CompressionMetadata.Chunk[] chunks, CompressionParams parameters)
{
assert chunks != null && parameters != null;
- this.chunks = chunks;
- this.parameters = parameters;
+
+ return new CompressionInfo()
+ {
+ @Override
+ public Chunk[] chunks()
+ {
+ return chunks;
+ }
+
+ @Override
+ public CompressionParams parameters()
+ {
+ return parameters;
+ }
+ };
}
- static CompressionInfo fromCompressionMetadata(CompressionMetadata metadata, List<SSTableReader.PartitionPositionBounds> sections)
+ /**
+ * Create a {@code CompressionInfo} that computes the file chunks only upon request.
+ *
+ * <p>The instance returned by this method will only compute the file chunks when the {@code chunks},
+ * {@code equals} or {@code hashCode} methods are called for the first time. This is done to reduce GC
+ * pressure. See CASSANDRA-10680 for more details.</p>
+ *
+ * @param metadata the compression metadata
+ * @param sections the file sections
+ * @return a {@code CompressionInfo} that computes the file chunks only upon request.
+ */
+ static CompressionInfo newLazyInstance(CompressionMetadata metadata, List<SSTableReader.PartitionPositionBounds> sections)
{
if (metadata == null)
{
return null;
}
- else
+
+ return new CompressionInfo()
{
- return new CompressionInfo(metadata.getChunksForSections(sections), metadata.parameters);
- }
+ private volatile Chunk[] chunks;
+
+ @Override
+ public synchronized Chunk[] chunks()
+ {
+ if (chunks == null)
+ chunks = metadata.getChunksForSections(sections);
+ return chunks;
+ }
+
+ @Override
+ public CompressionParams parameters()
+ {
+ return metadata.parameters;
+ }
+
+ @Override
+ public long getTotalSize()
+ {
+ // If the chunks have not been loaded yet, avoid computing them.
+ if (chunks == null)
+ return metadata.getTotalSizeForSections(sections);
+
+ return super.getTotalSize();
+ }
+ };
}
static class CompressionInfoSerializer implements IVersionedSerializer<CompressionInfo>
@@ -68,12 +173,13 @@ public class CompressionInfo
return;
}
- int chunkCount = info.chunks.length;
+ Chunk[] chunks = info.chunks();
+ int chunkCount = chunks.length;
out.writeInt(chunkCount);
for (int i = 0; i < chunkCount; i++)
- CompressionMetadata.Chunk.serializer.serialize(info.chunks[i], out, version);
+ CompressionMetadata.Chunk.serializer.serialize(chunks[i], out, version);
// compression params
- CompressionParams.serializer.serialize(info.parameters, out, version);
+ CompressionParams.serializer.serialize(info.parameters(), out, version);
}
public CompressionInfo deserialize(DataInputPlus in, int version) throws IOException
@@ -89,7 +195,7 @@ public class CompressionInfo
// compression params
CompressionParams parameters = CompressionParams.serializer.deserialize(in, version);
- return new CompressionInfo(chunks, parameters);
+ return CompressionInfo.newInstance(chunks, parameters);
}
public long serializedSize(CompressionInfo info, int version)
@@ -98,12 +204,13 @@ public class CompressionInfo
return TypeSizes.sizeof(-1);
// chunks
- int chunkCount = info.chunks.length;
+ Chunk[] chunks = info.chunks();
+ int chunkCount = chunks.length;
long size = TypeSizes.sizeof(chunkCount);
for (int i = 0; i < chunkCount; i++)
- size += CompressionMetadata.Chunk.serializer.serializedSize(info.chunks[i], version);
+ size += CompressionMetadata.Chunk.serializer.serializedSize(chunks[i], version);
// compression params
- size += CompressionParams.serializer.serializedSize(info.parameters, version);
+ size += CompressionParams.serializer.serializedSize(info.parameters(), version);
return size;
}
}
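The lazy variant above defers chunk materialisation until first use while keeping a cheap fast path for the total size. The same pattern in a self-contained form (generic stand-in types, not the Cassandra classes):

    import java.util.function.Supplier;

    final class Lazy<T>
    {
        private final Supplier<T> compute;
        private volatile T value;

        Lazy(Supplier<T> compute) { this.compute = compute; }

        synchronized T get()
        {
            if (value == null)
                value = compute.get(); // computed once, on first request
            return value;
        }

        boolean isComputed() { return value != null; } // lets callers take a cheaper path, like getTotalSize()
    }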
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index fca0f8d..dc67451 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
import static org.apache.cassandra.io.sstable.Component.separator;
@@ -114,6 +115,16 @@ public class Descriptor
return filenameFor(component) + TMP_EXT;
}
+ /**
+ * @return a unique temporary file name for the given component during entire-sstable-streaming.
+ */
+ public String tmpFilenameForStreaming(Component component)
+ {
+ // Use a UUID to handle concurrent streaming sessions on the same sstable.
+ // TMP_EXT allows the temp file to be removed by {@link ColumnFamilyStore#scrubDataDirectories}.
+ return String.format("%s.%s%s", filenameFor(component), UUIDGen.getTimeUUID(), TMP_EXT);
+ }
+
public String filenameFor(Component component)
{
return baseFilename() + separator + component.name();
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 880b738..a85d855 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -209,14 +209,14 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
for (ColumnFamilyStore cfStore: ks.getColumnFamilyStores())
{
Set<SSTableReader> nonCompacting, allSSTables;
- LifecycleTransaction txn = null;
+ LifecycleTransaction txn;
do
{
View view = cfStore.getTracker().getView();
allSSTables = ImmutableSet.copyOf(view.select(SSTableSet.CANONICAL));
nonCompacting = ImmutableSet.copyOf(view.getUncompacting(allSSTables));
}
- while (null == (txn = cfStore.getTracker().tryModify(nonCompacting, OperationType.UNKNOWN)));
+ while (null == (txn = cfStore.getTracker().tryModify(nonCompacting, OperationType.INDEX_SUMMARY)));
allNonCompacting.put(cfStore.metadata.id, txn);
allCompacting.addAll(Sets.difference(allSSTables, nonCompacting));
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index 353b624..0471be3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -265,7 +265,7 @@ public abstract class SSTable
}
/** @return An estimate of the number of keys contained in the given index file. */
- protected long estimateRowsFromIndex(RandomAccessReader ifile) throws IOException
+ public static long estimateRowsFromIndex(RandomAccessReader ifile, Descriptor descriptor) throws IOException
{
// collect sizes for the first 10000 keys, or first 10 megabytes of data
final int SAMPLES_CAP = 10000, BYTES_CAP = (int)Math.min(10000000, ifile.length());
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index cd929ba..3e14422 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.io.sstable.format;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import java.io.*;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
@@ -39,13 +37,11 @@ import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
-import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.cache.InstrumentingCache;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
@@ -68,7 +64,6 @@ import org.apache.cassandra.schema.CachingParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.CacheService;
@@ -204,12 +199,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public final UniqueIdentifier instanceId = new UniqueIdentifier();
// indexfile and datafile: might be null before a call to load()
- protected FileHandle ifile;
- protected FileHandle dfile;
- protected IndexSummary indexSummary;
- protected IFilter bf;
+ protected final FileHandle ifile;
+ protected final FileHandle dfile;
+ protected final IFilter bf;
+ public final IndexSummary indexSummary;
- protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+ protected final RowIndexEntry.IndexSerializer<?> rowIndexEntrySerializer;
protected InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
@@ -430,52 +425,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
System.exit(1);
}
- long fileLength = new File(descriptor.filenameFor(Component.DATA)).length();
- logger.info("Opening {} ({})", descriptor, FBUtilities.prettyPrintMemory(fileLength));
-
- final SSTableReader sstable;
try
{
- sstable = internalOpen(descriptor,
- components,
- metadata,
- System.currentTimeMillis(),
- statsMetadata,
- OpenReason.NORMAL,
- header.toHeader(metadata.get()));
+ return new SSTableReaderBuilder.ForBatch(descriptor, metadata, components, statsMetadata, header.toHeader(metadata.get())).build();
}
catch (UnknownColumnException e)
{
throw new IllegalStateException(e);
}
-
- try(FileHandle.Builder ibuilder = new FileHandle.Builder(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))
- .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
- .withChunkCache(ChunkCache.instance);
- FileHandle.Builder dbuilder = new FileHandle.Builder(sstable.descriptor.filenameFor(Component.DATA)).compressed(sstable.compression)
- .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
- .withChunkCache(ChunkCache.instance))
- {
- if (!sstable.loadSummary())
- {
- try
- {
- sstable.buildSummary(false, false, Downsampling.BASE_SAMPLING_LEVEL);
- }
- catch(IOException e)
- {
- throw new CorruptSSTableException(e, sstable.getFilename());
- }
- }
- long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
- int dataBufferSize = sstable.optimizationStrategy.bufferSize(statsMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
- int indexBufferSize = sstable.optimizationStrategy.bufferSize(indexFileLength / sstable.indexSummary.size());
- sstable.ifile = ibuilder.bufferSize(indexBufferSize).complete();
- sstable.dfile = dbuilder.bufferSize(dataBufferSize).complete();
- sstable.bf = FilterFactory.AlwaysPresent;
- sstable.setup(false);
- return sstable;
- }
}
/**
@@ -531,19 +488,16 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
System.exit(1);
}
- long fileLength = new File(descriptor.filenameFor(Component.DATA)).length();
- logger.info("Opening {} ({})", descriptor, FBUtilities.prettyPrintMemory(fileLength));
-
- final SSTableReader sstable;
+ SSTableReader sstable;
try
{
- sstable = internalOpen(descriptor,
- components,
- metadata,
- System.currentTimeMillis(),
- statsMetadata,
- OpenReason.NORMAL,
- header.toHeader(metadata.get()));
+ sstable = new SSTableReaderBuilder.ForRead(descriptor,
+ metadata,
+ validationMetadata,
+ isOffline,
+ components,
+ statsMetadata,
+ header.toHeader(metadata.get())).build();
}
catch (UnknownColumnException e)
{
@@ -552,12 +506,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
try
{
- // load index and filter
- long start = System.nanoTime();
- sstable.load(validationMetadata, isOffline);
- logger.trace("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
- sstable.setup(!isOffline); // Don't track hotness if we're offline.
if (validate)
sstable.validate();
@@ -626,42 +574,21 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
* Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
*/
public static SSTableReader internalOpen(Descriptor desc,
- Set<Component> components,
- TableMetadataRef metadata,
- FileHandle ifile,
- FileHandle dfile,
- IndexSummary isummary,
- IFilter bf,
- long maxDataAge,
- StatsMetadata sstableMetadata,
- OpenReason openReason,
- SerializationHeader header)
- {
- assert desc != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
-
- SSTableReader reader = internalOpen(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
-
- reader.bf = bf;
- reader.ifile = ifile;
- reader.dfile = dfile;
- reader.indexSummary = isummary;
- reader.setup(true);
-
- return reader;
- }
-
-
- private static SSTableReader internalOpen(final Descriptor descriptor,
- Set<Component> components,
- TableMetadataRef metadata,
- Long maxDataAge,
- StatsMetadata sstableMetadata,
- OpenReason openReason,
- SerializationHeader header)
+ Set<Component> components,
+ TableMetadataRef metadata,
+ FileHandle ifile,
+ FileHandle dfile,
+ IndexSummary summary,
+ IFilter bf,
+ long maxDataAge,
+ StatsMetadata sstableMetadata,
+ OpenReason openReason,
+ SerializationHeader header)
{
- Factory readerFactory = descriptor.getFormat().getReaderFactory();
+ assert desc != null && ifile != null && dfile != null && summary != null && bf != null && sstableMetadata != null;
- return readerFactory.open(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
+ return new SSTableReaderBuilder.ForWriter(desc, metadata, maxDataAge, components, sstableMetadata, openReason, header)
+ .bf(bf).ifile(ifile).dfile(dfile).summary(summary).build();
}
/**
@@ -695,17 +622,40 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
}
+ protected SSTableReader(SSTableReaderBuilder builder)
+ {
+ this(builder.descriptor,
+ builder.components,
+ builder.metadataRef,
+ builder.maxDataAge,
+ builder.statsMetadata,
+ builder.openReason,
+ builder.header,
+ builder.summary,
+ builder.dfile,
+ builder.ifile,
+ builder.bf);
+ }
+
protected SSTableReader(final Descriptor desc,
Set<Component> components,
TableMetadataRef metadata,
long maxDataAge,
StatsMetadata sstableMetadata,
OpenReason openReason,
- SerializationHeader header)
+ SerializationHeader header,
+ IndexSummary summary,
+ FileHandle dfile,
+ FileHandle ifile,
+ IFilter bf)
{
super(desc, components, metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
this.sstableMetadata = sstableMetadata;
this.header = header;
+ this.indexSummary = summary;
+ this.dfile = dfile;
+ this.ifile = ifile;
+ this.bf = bf;
this.maxDataAge = maxDataAge;
this.openReason = openReason;
this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata.get(), desc.version, header);
@@ -760,240 +710,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
/**
- * See {@link #load(boolean, boolean)}
- * @param validation Metadata for SSTable being loaded
- * @param isOffline Whether the SSTable is being loaded by an offline tool (sstabledump, scrub, etc)
- * @throws IOException
- */
- private void load(ValidationMetadata validation, boolean isOffline) throws IOException
- {
- if (metadata().params.bloomFilterFpChance == 1.0)
- {
- // bf is disabled.
- load(false, !isOffline);
- bf = FilterFactory.AlwaysPresent;
- }
- else if (!components.contains(Component.PRIMARY_INDEX)) // What happens if filter component and primary index is missing?
- {
- // avoid any reading of the missing primary index component.
- // this should only happen during StandaloneScrubber
- load(false, !isOffline);
- }
- else if (!components.contains(Component.FILTER) || validation == null)
- {
- // bf is enabled, but filter component is missing.
- load(!isOffline, !isOffline);
- if (isOffline)
- bf = FilterFactory.AlwaysPresent;
- }
- else
- {
- // bf is enabled and fp chance matches the currently configured value.
- load(false, !isOffline);
- loadBloomFilter(descriptor.version.hasOldBfFormat());
- }
- }
-
- /**
- * Load bloom filter from Filter.db file.
- *
- * @throws IOException
- * @param oldBfFormat
- */
- private void loadBloomFilter(boolean oldBfFormat) throws IOException
- {
- try (DataInputStream stream = new DataInputStream(new BufferedInputStream(Files.newInputStream(Paths.get(descriptor.filenameFor(Component.FILTER))))))
- {
- bf = BloomFilterSerializer.deserialize(stream, oldBfFormat);
- }
- }
-
- /**
- * Loads ifile, dfile and indexSummary, and optionally recreates and persists the bloom filter.
- * @param recreateBloomFilter Recreate the bloomfilter.
- * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
- * avoid persisting it to disk by setting this to false
- */
- private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
- {
- try(FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
- .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
- .withChunkCache(ChunkCache.instance);
- FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
- .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
- .withChunkCache(ChunkCache.instance))
- {
- boolean summaryLoaded = loadSummary();
- boolean buildSummary = !summaryLoaded || recreateBloomFilter;
- if (buildSummary)
- buildSummary(recreateBloomFilter, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
-
- int dataBufferSize = optimizationStrategy.bufferSize(sstableMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
-
- if (components.contains(Component.PRIMARY_INDEX))
- {
- long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
- int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / indexSummary.size());
- ifile = ibuilder.bufferSize(indexBufferSize).complete();
- }
-
- dfile = dbuilder.bufferSize(dataBufferSize).complete();
-
- if (buildSummary)
- {
- if (saveSummaryIfCreated)
- saveSummary();
- if (recreateBloomFilter)
- saveBloomFilter();
- }
- }
- catch (Throwable t)
- { // Because the tidier has not been set-up yet in SSTableReader.open(), we must release the files in case of error
- if (ifile != null)
- {
- ifile.close();
- ifile = null;
- }
-
- if (dfile != null)
- {
- dfile.close();
- dfile = null;
- }
-
- if (indexSummary != null)
- {
- indexSummary.close();
- indexSummary = null;
- }
-
- throw t;
- }
- }
-
- /**
- * Build index summary(and optionally bloom filter) by reading through Index.db file.
- *
- * @param recreateBloomFilter true if recreate bloom filter
- * @param summaryLoaded true if index summary is already loaded and not need to build again
- * @throws IOException
- */
- private void buildSummary(boolean recreateBloomFilter, boolean summaryLoaded, int samplingLevel) throws IOException
- {
- if (!components.contains(Component.PRIMARY_INDEX))
- return;
-
- if (logger.isDebugEnabled())
- logger.debug("Attempting to build summary for {}", descriptor);
-
- // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
- try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))))
- {
- long indexSize = primaryIndex.length();
- long histogramCount = sstableMetadata.estimatedPartitionSize.count();
- long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedPartitionSize.isOverflowed()
- ? histogramCount
- : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
-
- if (recreateBloomFilter)
- bf = FilterFactory.getFilter(estimatedKeys, metadata().params.bloomFilterFpChance);
-
- try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata().params.minIndexInterval, samplingLevel))
- {
- long indexPosition;
-
- while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
- {
- ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
- RowIndexEntry.Serializer.skip(primaryIndex, descriptor.version);
- DecoratedKey decoratedKey = decorateKey(key);
- if (first == null)
- first = decoratedKey;
- last = decoratedKey;
-
- if (recreateBloomFilter)
- bf.add(decoratedKey);
-
- // if summary was already read from disk we don't want to re-populate it using primary index
- if (!summaryLoaded)
- {
- summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
- }
- }
-
- if (!summaryLoaded)
- indexSummary = summaryBuilder.build(getPartitioner());
- }
- }
-
- first = getMinimalKey(first);
- last = getMinimalKey(last);
- }
-
- /**
- * Load index summary from Summary.db file if it exists.
- *
- * if loaded index summary has different index interval from current value stored in schema,
- * then Summary.db file will be deleted and this returns false to rebuild summary.
- *
- * @return true if index summary is loaded successfully from Summary.db file.
- */
- @SuppressWarnings("resource")
- public boolean loadSummary()
- {
- File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
- if (!summariesFile.exists())
- {
- if (logger.isDebugEnabled())
- logger.debug("SSTable Summary File {} does not exist", summariesFile.getAbsolutePath());
- return false;
- }
-
- DataInputStream iStream = null;
- try
- {
- TableMetadata metadata = metadata();
- iStream = new DataInputStream(Files.newInputStream(summariesFile.toPath()));
- indexSummary = IndexSummary.serializer.deserialize(
- iStream, getPartitioner(),
- metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
- first = decorateKey(ByteBufferUtil.readWithLength(iStream));
- last = decorateKey(ByteBufferUtil.readWithLength(iStream));
- }
- catch (IOException e)
- {
- if (indexSummary != null)
- indexSummary.close();
- logger.trace("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
- // corrupted; delete it and fall back to creating a new summary
- FileUtils.closeQuietly(iStream);
- // delete it and fall back to creating a new summary
- FileUtils.deleteWithConfirm(summariesFile);
- return false;
- }
- finally
- {
- FileUtils.closeQuietly(iStream);
- }
-
- return true;
- }
-
- /**
- * Save index summary to Summary.db file.
- */
-
- public void saveSummary()
- {
- saveSummary(this.descriptor, this.first, this.last, indexSummary);
- }
-
- private void saveSummary(IndexSummary newSummary)
- {
- saveSummary(this.descriptor, this.first, this.last, newSummary);
- }
-
- /**
* Save index summary to Summary.db file.
*/
public static void saveSummary(Descriptor descriptor, DecoratedKey first, DecoratedKey last, IndexSummary summary)
@@ -1002,7 +718,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
if (summariesFile.exists())
FileUtils.deleteWithConfirm(summariesFile);
- try (DataOutputStreamPlus oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile));)
+ try (DataOutputStreamPlus oStream = new BufferedDataOutputStreamPlus(new FileOutputStream(summariesFile)))
{
IndexSummary.serializer.serialize(summary, oStream);
ByteBufferUtil.writeWithLength(first.getKey(), oStream);
@@ -1018,11 +734,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
}
- public void saveBloomFilter()
- {
- saveBloomFilter(this.descriptor, bf);
- }
-
public static void saveBloomFilter(Descriptor descriptor, IFilter filter)
{
File filterFile = new File(descriptor.filenameFor(Component.FILTER));
@@ -1042,6 +753,19 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
+ /**
+ * Execute the provided task while holding the sstable lock, to avoid racing with index summary redistribution; see CASSANDRA-15861.
+ *
+ * @param task the task to be guarded by the sstable lock
+ */
+ public <R> R runWithLock(CheckedFunction<Descriptor, R, IOException> task) throws IOException
+ {
+ synchronized (tidy.global)
+ {
+ return task.apply(descriptor);
+ }
+ }
+
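(Illustration, not part of the patch: a minimal sketch of how a caller such as entire-sstable
streaming might snapshot components under the sstable lock via runWithLock. It assumes the
ComponentContext.create(Descriptor) helper added elsewhere in this patch; the exact caller-side
code in CassandraOutgoingFile is not shown in this excerpt.)

    try
    {
        // hard links and component sizes are captured while the lock is held, so
        // stats mutation and index summary redistribution cannot run concurrently
        ComponentContext context = sstable.runWithLock(ComponentContext::create);
        // ... stream from the hard-linked components, then context.close() ...
    }
    catch (IOException e)
    {
        throw new UncheckedIOException(e);
    }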
public void setReplaced()
{
synchronized (tidy.global)
@@ -1131,12 +855,41 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
sstableMetadata,
reason,
header);
+
replacement.first = newFirst;
replacement.last = last;
replacement.isSuspect.set(isSuspect.get());
return replacement;
}
+ /**
+ * Clone this reader with the given bloom filter and set the clone as replacement.
+ *
+ * @param newBloomFilter for the replacement
+ *
+ * @return the cloned reader. That reader is set as a replacement by the method.
+ */
+ @VisibleForTesting
+ public SSTableReader cloneAndReplace(IFilter newBloomFilter)
+ {
+ SSTableReader replacement = internalOpen(descriptor,
+ components,
+ metadata,
+ ifile.sharedCopy(),
+ dfile.sharedCopy(),
+ indexSummary,
+ newBloomFilter,
+ maxDataAge,
+ sstableMetadata,
+ openReason,
+ header);
+
+ replacement.first = first;
+ replacement.last = last;
+ replacement.isSuspect.set(isSuspect.get());
+ return replacement;
+ }
+
public SSTableReader cloneWithRestoredStart(DecoratedKey restoredStart)
{
synchronized (tidy.global)
@@ -1202,39 +955,38 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
@SuppressWarnings("resource")
public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
{
- synchronized (tidy.global)
- {
- assert openReason != OpenReason.EARLY;
+ assert openReason != OpenReason.EARLY;
- int minIndexInterval = metadata().params.minIndexInterval;
- int maxIndexInterval = metadata().params.maxIndexInterval;
- double effectiveInterval = indexSummary.getEffectiveIndexInterval();
+ int minIndexInterval = metadata().params.minIndexInterval;
+ int maxIndexInterval = metadata().params.maxIndexInterval;
+ double effectiveInterval = indexSummary.getEffectiveIndexInterval();
- IndexSummary newSummary;
+ IndexSummary newSummary;
- // We have to rebuild the summary from the on-disk primary index in three cases:
- // 1. The sampling level went up, so we need to read more entries off disk
- // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
- // at full sampling (and consequently at any other sampling level)
- // 3. The max_index_interval was lowered, forcing us to raise the sampling level
- if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
- {
- newSummary = buildSummaryAtLevel(samplingLevel);
- }
- else if (samplingLevel < indexSummary.getSamplingLevel())
- {
- // we can use the existing index summary to make a smaller one
- newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, getPartitioner());
- }
- else
- {
- throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
- "no adjustments to min/max_index_interval");
- }
-
- // Always save the resampled index
- saveSummary(newSummary);
+ // We have to rebuild the summary from the on-disk primary index in three cases:
+ // 1. The sampling level went up, so we need to read more entries off disk
+ // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
+ // at full sampling (and consequently at any other sampling level)
+ // 3. The max_index_interval was lowered, forcing us to raise the sampling level
+ if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
+ {
+ newSummary = buildSummaryAtLevel(samplingLevel);
+ }
+ else if (samplingLevel < indexSummary.getSamplingLevel())
+ {
+ // we can use the existing index summary to make a smaller one
+ newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, getPartitioner());
+ }
+ else
+ {
+ throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
+ "no adjustments to min/max_index_interval");
+ }
+ // Always save the resampled index under the lock, to avoid racing with entire-sstable streaming
+ synchronized (tidy.global)
+ {
+ saveSummary(descriptor, first, last, newSummary);
return cloneAndReplace(first, OpenReason.METADATA_CHANGE, newSummary);
}
}
@@ -1292,7 +1044,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public void releaseSummary()
{
tidy.releaseSummary();
- indexSummary = null;
}
private void validate()
@@ -1365,14 +1116,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return getCompressionMetadata().offHeapSize();
}
- /**
- * For testing purposes only.
- */
- public void forceFilterFailures()
- {
- bf = FilterFactory.AlwaysPresent;
- }
-
public IFilter getBloomFilter()
{
return bf;
@@ -2068,6 +1811,30 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
/**
+ * Mutate the sstable level while holding the lock, to avoid racing with entire-sstable-streaming, and then reload the sstable metadata.
+ */
+ public void mutateLevelAndReload(int newLevel) throws IOException
+ {
+ synchronized (tidy.global)
+ {
+ descriptor.getMetadataSerializer().mutateLevel(descriptor, newLevel);
+ reloadSSTableMetadata();
+ }
+ }
+
+ /**
+ * Mutate the sstable repair metadata while holding the lock, to avoid racing with entire-sstable-streaming, and then reload the sstable metadata.
+ */
+ public void mutateRepairedAndReload(long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException
+ {
+ synchronized (tidy.global)
+ {
+ descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, newRepairedAt, newPendingRepair, isTransient);
+ reloadSSTableMetadata();
+ }
+ }
+
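(Illustration, not part of the patch: with these methods, a caller that previously mutated stats
metadata directly on a live sstable, e.g. when dropping an sstable to L0, can mutate and reload
under the lock in one step. The variable names below are hypothetical.)

    // hypothetical compaction-side caller; both methods propagate IOException
    sstable.mutateLevelAndReload(0);
    // or, for repair state:
    sstable.mutateRepairedAndReload(newRepairedAt, newPendingRepair, false);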
+ /**
* Reloads the sstable metadata from disk.
*
* Called after level is changed on sstable, for example if the sstable is dropped to L0
@@ -2320,7 +2087,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
*/
static final class GlobalTidy implements Tidy
{
- static WeakReference<ScheduledFuture<?>> NULL = new WeakReference<>(null);
+ static final WeakReference<ScheduledFuture<?>> NULL = new WeakReference<>(null);
// keyed by descriptor, mapping to the shared GlobalTidy for that descriptor
static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>();
@@ -2424,14 +2191,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public static abstract class Factory
{
- public abstract SSTableReader open(final Descriptor descriptor,
- Set<Component> components,
- TableMetadataRef metadata,
- Long maxDataAge,
- StatsMetadata sstableMetadata,
- OpenReason openReason,
- SerializationHeader header);
-
+ public abstract SSTableReader open(SSTableReaderBuilder builder);
}
public static class PartitionPositionBounds
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java
new file mode 100644
index 0000000..8fe1def
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReaderBuilder.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format;
+
+import org.apache.cassandra.cache.ChunkCache;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
+import org.apache.cassandra.io.util.DiskOptimizationStrategy;
+import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public abstract class SSTableReaderBuilder
+{
+ private static final Logger logger = LoggerFactory.getLogger(SSTableReaderBuilder.class);
+
+ protected final SSTableReader.Factory readerFactory;
+ protected final Descriptor descriptor;
+ protected final TableMetadataRef metadataRef;
+ protected final TableMetadata metadata;
+ protected final long maxDataAge;
+ protected final Set<Component> components;
+ protected final StatsMetadata statsMetadata;
+ protected final SSTableReader.OpenReason openReason;
+ protected final SerializationHeader header;
+
+ protected IndexSummary summary;
+ protected DecoratedKey first;
+ protected DecoratedKey last;
+ protected IFilter bf;
+ protected FileHandle ifile;
+ protected FileHandle dfile;
+
+ public SSTableReaderBuilder(Descriptor descriptor,
+ TableMetadataRef metadataRef,
+ long maxDataAge,
+ Set<Component> components,
+ StatsMetadata statsMetadata,
+ SSTableReader.OpenReason openReason,
+ SerializationHeader header)
+ {
+ this.descriptor = descriptor;
+ this.metadataRef = metadataRef;
+ this.metadata = metadataRef.get();
+ this.maxDataAge = maxDataAge;
+ this.components = components;
+ this.statsMetadata = statsMetadata;
+ this.openReason = openReason;
+ this.header = header;
+ this.readerFactory = descriptor.getFormat().getReaderFactory();
+ }
+
+ public abstract SSTableReader build();
+
+ public SSTableReaderBuilder dfile(FileHandle dfile)
+ {
+ this.dfile = dfile;
+ return this;
+ }
+
+ public SSTableReaderBuilder ifile(FileHandle ifile)
+ {
+ this.ifile = ifile;
+ return this;
+ }
+
+ public SSTableReaderBuilder bf(IFilter bf)
+ {
+ this.bf = bf;
+ return this;
+ }
+
+ public SSTableReaderBuilder summary(IndexSummary summary)
+ {
+ this.summary = summary;
+ return this;
+ }
+
+ /**
+ * Load index summary, first key and last key from Summary.db file if it exists.
+ *
+ * If the loaded index summary has a different index interval from the current value stored in the schema,
+ * the Summary.db file will be deleted and the summary will need to be rebuilt.
+ */
+ void loadSummary()
+ {
+ File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
+ if (!summariesFile.exists())
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("SSTable Summary File {} does not exist", summariesFile.getAbsolutePath());
+ return;
+ }
+
+ DataInputStream iStream = null;
+ try
+ {
+ iStream = new DataInputStream(Files.newInputStream(summariesFile.toPath()));
+ summary = IndexSummary.serializer.deserialize(iStream,
+ metadata.partitioner,
+ metadata.params.minIndexInterval,
+ metadata.params.maxIndexInterval);
+ first = metadata.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+ last = metadata.partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
+ }
+ catch (IOException e)
+ {
+ if (summary != null)
+ summary.close();
+ logger.trace("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
+ // corrupted; delete it and fall back to creating a new summary
+ FileUtils.closeQuietly(iStream);
+ FileUtils.deleteWithConfirm(summariesFile);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(iStream);
+ }
+ }
+
+ /**
+ * Build the index summary, first key and last key if {@code summaryLoaded} is false, and recreate the bloom filter
+ * if {@code recreateBloomFilter} is true, by reading through the Index.db file.
+ *
+ * @param recreateBloomFilter true to recreate the bloom filter
+ * @param summaryLoaded true if the index summary, first key and last key are already loaded and need not be built again
+ */
+ void buildSummaryAndBloomFilter(boolean recreateBloomFilter,
+ boolean summaryLoaded,
+ Set<Component> components,
+ StatsMetadata statsMetadata) throws IOException
+ {
+ if (!components.contains(Component.PRIMARY_INDEX))
+ return;
+
+ if (logger.isDebugEnabled())
+ logger.debug("Attempting to build summary for {}", descriptor);
+
+ // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+ try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))))
+ {
+ long indexSize = primaryIndex.length();
+ long histogramCount = statsMetadata.estimatedPartitionSize.count();
+ long estimatedKeys = histogramCount > 0 && !statsMetadata.estimatedPartitionSize.isOverflowed()
+ ? histogramCount
+ : SSTable.estimateRowsFromIndex(primaryIndex, descriptor); // statistics is supposed to be optional
+
+ if (recreateBloomFilter)
+ bf = FilterFactory.getFilter(estimatedKeys, metadata.params.bloomFilterFpChance);
+
+ try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL))
+ {
+ long indexPosition;
+
+ while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ {
+ ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+ RowIndexEntry.Serializer.skip(primaryIndex, descriptor.version);
+ DecoratedKey decoratedKey = metadata.partitioner.decorateKey(key);
+
+ if (!summaryLoaded)
+ {
+ if (first == null)
+ first = decoratedKey;
+ last = decoratedKey;
+ }
+
+ if (recreateBloomFilter)
+ bf.add(decoratedKey);
+
+ // if summary was already read from disk we don't want to re-populate it using primary index
+ if (!summaryLoaded)
+ {
+ summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
+ }
+ }
+
+ if (!summaryLoaded)
+ summary = summaryBuilder.build(metadata.partitioner);
+ }
+ }
+
+ if (!summaryLoaded)
+ {
+ first = SSTable.getMinimalKey(first);
+ last = SSTable.getMinimalKey(last);
+ }
+ }
+
+ /**
+ * Load bloom filter from Filter.db file.
+ *
+ * @throws IOException
+ */
+ IFilter loadBloomFilter() throws IOException
+ {
+ try (DataInputStream stream = new DataInputStream(new BufferedInputStream(Files.newInputStream(Paths.get(descriptor.filenameFor(Component.FILTER))))))
+ {
+ return BloomFilterSerializer.deserialize(stream, descriptor.version.hasOldBfFormat());
+ }
+ }
+
+ public static class ForWriter extends SSTableReaderBuilder
+ {
+ public ForWriter(Descriptor descriptor,
+ TableMetadataRef metadataRef,
+ long maxDataAge,
+ Set<Component> components,
+ StatsMetadata statsMetadata,
+ SSTableReader.OpenReason openReason,
+ SerializationHeader header)
+ {
+ super(descriptor, metadataRef, maxDataAge, components, statsMetadata, openReason, header);
+ }
+
+ @Override
+ public SSTableReader build()
+ {
+ SSTableReader reader = readerFactory.open(this);
+
+ reader.setup(true);
+ return reader;
+ }
+ }
+
+ public static class ForBatch extends SSTableReaderBuilder
+ {
+ public ForBatch(Descriptor descriptor,
+ TableMetadataRef metadataRef,
+ Set<Component> components,
+ StatsMetadata statsMetadata,
+ SerializationHeader header)
+ {
+ super(descriptor, metadataRef, System.currentTimeMillis(), components, statsMetadata, SSTableReader.OpenReason.NORMAL, header);
+ }
+
+ @Override
+ public SSTableReader build()
+ {
+ String dataFilePath = descriptor.filenameFor(Component.DATA);
+ long fileLength = new File(dataFilePath).length();
+ logger.info("Opening {} ({})", descriptor, FBUtilities.prettyPrintMemory(fileLength));
+
+ initSummary(dataFilePath, components, statsMetadata);
+
+ boolean compression = components.contains(Component.COMPRESSION_INFO);
+ try (FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
+ .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+ .withChunkCache(ChunkCache.instance);
+ FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(compression)
+ .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
+ .withChunkCache(ChunkCache.instance))
+ {
+ long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+ DiskOptimizationStrategy optimizationStrategy = DatabaseDescriptor.getDiskOptimizationStrategy();
+ int dataBufferSize = optimizationStrategy.bufferSize(statsMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+ int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / summary.size());
+ ifile = ibuilder.bufferSize(indexBufferSize).complete();
+ dfile = dbuilder.bufferSize(dataBufferSize).complete();
+ bf = FilterFactory.AlwaysPresent;
+
+ SSTableReader sstable = readerFactory.open(this);
+
+ sstable.first = first;
+ sstable.last = last;
+
+ sstable.setup(false);
+ return sstable;
+ }
+ }
+
+ void initSummary(String dataFilePath, Set<Component> components, StatsMetadata statsMetadata)
+ {
+ loadSummary();
+ if (summary == null)
+ {
+ try
+ {
+ buildSummaryAndBloomFilter(false, false, components, statsMetadata);
+ }
+ catch (IOException e)
+ {
+ throw new CorruptSSTableException(e, dataFilePath);
+ }
+ }
+ }
+ }
+
+ public static class ForRead extends SSTableReaderBuilder
+ {
+ private final ValidationMetadata validationMetadata;
+ private final boolean isOffline;
+
+ public ForRead(Descriptor descriptor,
+ TableMetadataRef metadataRef,
+ ValidationMetadata validationMetadata,
+ boolean isOffline,
+ Set<Component> components,
+ StatsMetadata statsMetadata,
+ SerializationHeader header)
+ {
+ super(descriptor, metadataRef, System.currentTimeMillis(), components, statsMetadata, SSTableReader.OpenReason.NORMAL, header);
+ this.validationMetadata = validationMetadata;
+ this.isOffline = isOffline;
+ }
+
+ @Override
+ public SSTableReader build()
+ {
+ String dataFilePath = descriptor.filenameFor(Component.DATA);
+ long fileLength = new File(dataFilePath).length();
+ logger.info("Opening {} ({})", descriptor, FBUtilities.prettyPrintMemory(fileLength));
+
+ try
+ {
+ // load index and filter
+ long start = System.nanoTime();
+ load(validationMetadata, isOffline, components, DatabaseDescriptor.getDiskOptimizationStrategy(), statsMetadata);
+ logger.trace("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ }
+ catch (IOException t)
+ {
+ throw new CorruptSSTableException(t, dataFilePath);
+ }
+
+ SSTableReader sstable = readerFactory.open(this);
+
+ sstable.first = first;
+ sstable.last = last;
+
+ sstable.setup(!isOffline); // Don't track hotness if we're offline.
+ return sstable;
+ }
+
+ /**
+ * @param validation Metadata for SSTable being loaded
+ * @param isOffline Whether the SSTable is being loaded by an offline tool (sstabledump, scrub, etc)
+ */
+ private void load(ValidationMetadata validation,
+ boolean isOffline,
+ Set<Component> components,
+ DiskOptimizationStrategy optimizationStrategy,
+ StatsMetadata statsMetadata) throws IOException
+ {
+ if (metadata.params.bloomFilterFpChance == 1.0)
+ {
+ // bf is disabled.
+ load(false, !isOffline, optimizationStrategy, statsMetadata, components);
+ bf = FilterFactory.AlwaysPresent;
+ }
+ else if (!components.contains(Component.PRIMARY_INDEX)) // What happens if filter component and primary index are missing?
+ {
+ // avoid any reading of the missing primary index component.
+ // this should only happen during StandaloneScrubber
+ load(false, !isOffline, optimizationStrategy, statsMetadata, components);
+ }
+ else if (!components.contains(Component.FILTER) || validation == null)
+ {
+ // bf is enabled, but filter component is missing.
+ load(!isOffline, !isOffline, optimizationStrategy, statsMetadata, components);
+ if (isOffline)
+ bf = FilterFactory.AlwaysPresent;
+ }
+ else
+ {
+ // bf is enabled and fp chance matches the currently configured value.
+ load(false, !isOffline, optimizationStrategy, statsMetadata, components);
+ bf = loadBloomFilter();
+ }
+ }
+
+ /**
+ * Loads ifile, dfile and indexSummary, and optionally recreates and persists the bloom filter.
+ * @param recreateBloomFilter Recreate the bloom filter.
+ * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
+ * avoid persisting it to disk by setting this to false
+ */
+ void load(boolean recreateBloomFilter,
+ boolean saveSummaryIfCreated,
+ DiskOptimizationStrategy optimizationStrategy,
+ StatsMetadata statsMetadata,
+ Set<Component> components) throws IOException
+ {
+ try (FileHandle.Builder ibuilder = new FileHandle.Builder(descriptor.filenameFor(Component.PRIMARY_INDEX))
+ .mmapped(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
+ .withChunkCache(ChunkCache.instance);
+ FileHandle.Builder dbuilder = new FileHandle.Builder(descriptor.filenameFor(Component.DATA)).compressed(components.contains(Component.COMPRESSION_INFO))
+ .mmapped(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
+ .withChunkCache(ChunkCache.instance))
+ {
+ loadSummary();
+ boolean buildSummary = summary == null || recreateBloomFilter;
+ if (buildSummary)
+ buildSummaryAndBloomFilter(recreateBloomFilter, summary != null, components, statsMetadata);
+
+ int dataBufferSize = optimizationStrategy.bufferSize(statsMetadata.estimatedPartitionSize.percentile(DatabaseDescriptor.getDiskOptimizationEstimatePercentile()));
+
+ if (components.contains(Component.PRIMARY_INDEX))
+ {
+ long indexFileLength = new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length();
+ int indexBufferSize = optimizationStrategy.bufferSize(indexFileLength / summary.size());
+ ifile = ibuilder.bufferSize(indexBufferSize).complete();
+ }
+
+ dfile = dbuilder.bufferSize(dataBufferSize).complete();
+
+ if (buildSummary)
+ {
+ if (saveSummaryIfCreated)
+ SSTableReader.saveSummary(descriptor, first, last, summary);
+ if (recreateBloomFilter)
+ SSTableReader.saveBloomFilter(descriptor, bf);
+ }
+ }
+ catch (Throwable t)
+ { // Because the tidier has not been set-up yet in SSTableReader.open(), we must release the files in case of error
+ if (ifile != null)
+ {
+ ifile.close();
+ }
+
+ if (dfile != null)
+ {
+ dfile.close();
+ }
+
+ if (summary != null)
+ {
+ summary.close();
+ }
+
+ throw t;
+ }
+ }
+ }
+}
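(Illustration, not part of the patch: a minimal sketch of opening a reader through the new
builder, using the ForRead constructor declared above. The refactored SSTableReader open paths,
not shown in this excerpt, are expected to construct these builders the same way.)

    SSTableReader reader = new SSTableReaderBuilder.ForRead(descriptor,
                                                            metadataRef,
                                                            validationMetadata,
                                                            false, // isOffline
                                                            components,
                                                            statsMetadata,
                                                            header).build();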
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 448808c..6f3ba39 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -18,22 +18,17 @@
package org.apache.cassandra.io.sstable.format.big;
import java.util.Collection;
-import java.util.Set;
import java.util.UUID;
-import com.google.common.base.Preconditions;
-
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
-import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.net.MessagingService;
/**
@@ -103,9 +98,9 @@ public class BigFormat implements SSTableFormat
static class ReaderFactory extends SSTableReader.Factory
{
@Override
- public SSTableReader open(Descriptor descriptor, Set<Component> components, TableMetadataRef metadata, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header)
+ public SSTableReader open(SSTableReaderBuilder builder)
{
- return new BigTableReader(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header);
+ return new BigTableReader(builder);
}
}
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 03d7562..a333df0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.cassandra.io.sstable.format.SSTableReaderBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,9 +40,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SelectionReason;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SkippingReason;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -53,9 +52,9 @@ public class BigTableReader extends SSTableReader
{
private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
- BigTableReader(Descriptor desc, Set<Component> components, TableMetadataRef metadata, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
+ BigTableReader(SSTableReaderBuilder builder)
{
- super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
+ super(builder);
}
public UnfilteredRowIterator iterator(DecoratedKey key,
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 04ab08a..eb98662 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -333,8 +333,13 @@ public class BigTableWriter extends SSTableWriter
invalidateCacheAtBoundary(dfile);
SSTableReader sstable = SSTableReader.internalOpen(descriptor,
components, metadata,
- ifile, dfile, indexSummary,
- iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY, header);
+ ifile, dfile,
+ indexSummary,
+ iwriter.bf.sharedCopy(),
+ maxDataAge,
+ stats,
+ SSTableReader.OpenReason.EARLY,
+ header);
// now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
sstable.first = getMinimalKey(first);
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index db9f161..fc1ce42 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -21,9 +21,10 @@ import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
import java.util.UUID;
-import java.util.function.Function;
+import java.util.function.UnaryOperator;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -65,15 +66,22 @@ public interface IMetadataSerializer
/**
* Mutate SSTable Metadata
*
+ * NOTE: mutating the stats metadata of a live sstable races with entire-sstable-streaming; on a live sstable,
+ * use {@link SSTableReader#mutateLevelAndReload} instead.
+ *
* @param descriptor SSTable descriptor
+ * @param description description of the changed attributes
* @param transform function to mutate sstable metadata
* @throws IOException
*/
- public void mutate(Descriptor descriptor, Function<StatsMetadata, StatsMetadata> transform) throws IOException;
+ public void mutate(Descriptor descriptor, String description, UnaryOperator<StatsMetadata> transform) throws IOException;
/**
* Mutate SSTable level
*
+ * NOTE: mutating the stats metadata of a live sstable races with entire-sstable-streaming; on a live sstable,
+ * use {@link SSTableReader#mutateLevelAndReload} instead.
+ *
* @param descriptor SSTable descriptor
* @param newLevel new SSTable level
* @throws IOException
@@ -81,7 +89,10 @@ public interface IMetadataSerializer
void mutateLevel(Descriptor descriptor, int newLevel) throws IOException;
/**
- * Mutate the repairedAt time, pendingRepair ID, and transient status
+ * Mutate the repairedAt time, pendingRepair ID, and transient status.
+ *
+ * NOTE: mutating the stats metadata of a live sstable races with entire-sstable-streaming; on a live sstable,
+ * use {@link SSTableReader#mutateRepairedAndReload} instead.
*/
public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException;
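(Illustration, not part of the patch: the new description argument exists only for trace logging,
so an offline tool operating on a dead sstable could call the overload like this; IOException
propagates to the caller.)

    descriptor.getMetadataSerializer()
              .mutate(descriptor, "level to 0", stats -> stats.mutateLevel(0));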
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index d886338..042103e 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.io.sstable.metadata;
import java.io.*;
import java.util.*;
-import java.util.function.Function;
+import java.util.function.UnaryOperator;
import java.util.zip.CRC32;
import com.google.common.base.Throwables;
@@ -114,7 +114,7 @@ public class MetadataSerializer implements IMetadataSerializer
out.writeInt((int) crc.getValue());
}
- public Map<MetadataType, MetadataComponent> deserialize( Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
+ public Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException
{
Map<MetadataType, MetadataComponent> components;
logger.trace("Load metadata for {}", descriptor);
@@ -224,15 +224,15 @@ public class MetadataSerializer implements IMetadataSerializer
}
@Override
- public void mutate(Descriptor descriptor, Function<StatsMetadata, StatsMetadata> transform) throws IOException
+ public void mutate(Descriptor descriptor, String description, UnaryOperator<StatsMetadata> transform) throws IOException
{
- Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class));
- StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
+ if (logger.isTraceEnabled())
+ logger.trace("Mutating {} to {}", descriptor.filenameFor(Component.STATS), description);
- currentComponents.put(MetadataType.STATS, transform.apply(stats));
- rewriteSSTableMetadata(descriptor, currentComponents);
+ mutate(descriptor, transform);
}
+ @Override
public void mutateLevel(Descriptor descriptor, int newLevel) throws IOException
{
if (logger.isTraceEnabled())
@@ -241,6 +241,7 @@ public class MetadataSerializer implements IMetadataSerializer
mutate(descriptor, stats -> stats.mutateLevel(newLevel));
}
+ @Override
public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException
{
if (logger.isTraceEnabled())
@@ -250,6 +251,15 @@ public class MetadataSerializer implements IMetadataSerializer
mutate(descriptor, stats -> stats.mutateRepairedMetadata(newRepairedAt, newPendingRepair, isTransient));
}
+ private void mutate(Descriptor descriptor, UnaryOperator<StatsMetadata> transform) throws IOException
+ {
+ Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class));
+ StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
+
+ currentComponents.put(MetadataType.STATS, transform.apply(stats));
+ rewriteSSTableMetadata(descriptor, currentComponents);
+ }
+
public void rewriteSSTableMetadata(Descriptor descriptor, Map<MetadataType, MetadataComponent> currentComponents) throws IOException
{
String filePath = descriptor.tmpFilenameFor(Component.STATS);
diff --git a/src/java/org/apache/cassandra/net/BufferPoolAllocator.java b/src/java/org/apache/cassandra/net/BufferPoolAllocator.java
index 8782c03..224f690 100644
--- a/src/java/org/apache/cassandra/net/BufferPoolAllocator.java
+++ b/src/java/org/apache/cassandra/net/BufferPoolAllocator.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.utils.memory.BufferPool;
* A trivial wrapper around BufferPool for integrating with Netty, but retaining ownership of pooling behaviour
* that is integrated into Cassandra's other pooling.
*/
-abstract class BufferPoolAllocator extends AbstractByteBufAllocator
+public abstract class BufferPoolAllocator extends AbstractByteBufAllocator
{
BufferPoolAllocator()
{
diff --git a/src/java/org/apache/cassandra/streaming/OutgoingStream.java b/src/java/org/apache/cassandra/streaming/OutgoingStream.java
index 4a58cae..546462d 100644
--- a/src/java/org/apache/cassandra/streaming/OutgoingStream.java
+++ b/src/java/org/apache/cassandra/streaming/OutgoingStream.java
@@ -47,7 +47,12 @@ public interface OutgoingStream
UUID getPendingRepair();
String getName();
- long getSize();
+
+ /**
+ * @return the estimated file size to be streamed. This should only be used for metrics, because concurrent
+ * stats metadata updates and index summary redistribution can change the file sizes.
+ */
+ long getEstimatedSize();
TableId getTableId();
int getNumFiles();
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 0281952..ff3ff5a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -763,7 +763,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
*/
public void streamSent(OutgoingStreamMessage message)
{
- long headerSize = message.stream.getSize();
+ long headerSize = message.stream.getEstimatedSize();
StreamingMetrics.totalOutgoingBytes.inc(headerSize);
metrics.outgoingBytes.inc(headerSize);
// schedule timeout for receiving ACK
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 0f7a834..d12ecfd 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -70,7 +70,7 @@ public class StreamTransferTask extends StreamTask
OutgoingStreamMessage message = new OutgoingStreamMessage(tableId, session, stream, sequenceNumber.getAndIncrement());
message = StreamHook.instance.reportOutgoingStream(session, stream, message);
streams.put(message.header.sequenceNumber, message);
- totalSize += message.stream.getSize();
+ totalSize += message.stream.getEstimatedSize();
totalFiles += message.stream.getNumFiles();
}
diff --git a/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java b/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java
index 7e21253..9a76661 100644
--- a/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java
+++ b/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java
@@ -19,8 +19,6 @@
package org.apache.cassandra.io.sstable.format;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Iterator;
@@ -62,31 +60,13 @@ import org.apache.cassandra.utils.concurrent.Ref;
public abstract class ForwardingSSTableReader extends SSTableReader
{
- // This method is only accessiable via extension and not for calling directly;
- // to work around this, rely on reflection if the method gets called
- private static final Method ESTIMATE_ROWS_FROM_INDEX;
-
- static
- {
- try
- {
- Method m = SSTable.class.getDeclaredMethod("estimateRowsFromIndex", RandomAccessReader.class);
- m.setAccessible(true);
- ESTIMATE_ROWS_FROM_INDEX = m;
- }
- catch (NoSuchMethodException e)
- {
- throw new AssertionError(e);
- }
- }
-
private final SSTableReader delegate;
public ForwardingSSTableReader(SSTableReader delegate)
{
super(delegate.descriptor, SSTable.componentsFor(delegate.descriptor),
TableMetadataRef.forOfflineTools(delegate.metadata()), delegate.maxDataAge, delegate.getSSTableMetadata(),
- delegate.openReason, delegate.header);
+ delegate.openReason, delegate.header, delegate.indexSummary, delegate.dfile, delegate.ifile, delegate.bf);
this.delegate = delegate;
this.first = delegate.first;
this.last = delegate.last;
@@ -117,24 +97,6 @@ public abstract class ForwardingSSTableReader extends SSTableReader
}
@Override
- public boolean loadSummary()
- {
- return delegate.loadSummary();
- }
-
- @Override
- public void saveSummary()
- {
- delegate.saveSummary();
- }
-
- @Override
- public void saveBloomFilter()
- {
- delegate.saveBloomFilter();
- }
-
- @Override
public void setReplaced()
{
delegate.setReplaced();
@@ -225,12 +187,6 @@ public abstract class ForwardingSSTableReader extends SSTableReader
}
@Override
- public void forceFilterFailures()
- {
- delegate.forceFilterFailures();
- }
-
- @Override
public IFilter getBloomFilter()
{
return delegate.getBloomFilter();
@@ -777,30 +733,6 @@ public abstract class ForwardingSSTableReader extends SSTableReader
}
@Override
- protected long estimateRowsFromIndex(RandomAccessReader ifile) throws IOException
- {
- try
- {
- return (Long) ESTIMATE_ROWS_FROM_INDEX.invoke(delegate, ifile);
- }
- catch (IllegalAccessException e)
- {
- throw new AssertionError(e);
- }
- catch (InvocationTargetException e)
- {
- Throwable cause = e.getCause();
- if (cause instanceof IOException)
- throw (IOException) cause;
- if (cause instanceof Error)
- throw (Error) cause;
- if (cause instanceof RuntimeException)
- throw (RuntimeException) cause;
- throw new RuntimeException(cause);
- }
- }
-
- @Override
public long bytesOnDisk()
{
return delegate.bytesOnDisk();
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index 8ecf6cb..9e84e0a 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -42,10 +42,10 @@ import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader;
import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamWriter;
-import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
import org.apache.cassandra.db.streaming.CassandraStreamHeader;
import org.apache.cassandra.db.streaming.CassandraStreamReader;
import org.apache.cassandra.db.streaming.CassandraStreamWriter;
+import org.apache.cassandra.db.streaming.ComponentContext;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
@@ -103,6 +103,7 @@ public class ZeroCopyStreamingBenchmark
private static ColumnFamilyStore store;
private StreamSession session;
private CassandraEntireSSTableStreamWriter blockStreamWriter;
+ private ComponentContext context;
private ByteBuf serializedBlockStream;
private InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
private CassandraEntireSSTableStreamReader blockStreamReader;
@@ -119,7 +120,8 @@ public class ZeroCopyStreamingBenchmark
sstable = store.getLiveSSTables().iterator().next();
session = setupStreamingSessionForTest();
- blockStreamWriter = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
+ context = ComponentContext.create(sstable.descriptor);
+ blockStreamWriter = new CassandraEntireSSTableStreamWriter(sstable, session, context);
CapturingNettyChannel blockStreamCaptureChannel = new CapturingNettyChannel(STREAM_SIZE);
AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(blockStreamCaptureChannel);
@@ -137,7 +139,7 @@ public class ZeroCopyStreamingBenchmark
.withEstimatedKeys(sstable.estimatedKeys())
.withSections(Collections.emptyList())
.withSerializationHeader(sstable.header.toComponent())
- .withComponentManifest(CassandraOutgoingFile.getComponentManifest(sstable))
+ .withComponentManifest(context.manifest())
.isEntireSSTable(true)
.withFirstKey(sstable.first)
.withTableId(sstable.metadata().id)
@@ -149,23 +151,23 @@ public class ZeroCopyStreamingBenchmark
null), entireSSTableStreamHeader, session);
List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(sstable.first.minValue().getToken(), sstable.last.getToken()));
- partialStreamWriter = new CassandraStreamWriter(sstable, sstable.getPositionsForRanges(requestedRanges), session);
+ CassandraStreamHeader partialSSTableStreamHeader =
+ CassandraStreamHeader.builder()
+ .withSSTableFormat(sstable.descriptor.formatType)
+ .withSSTableVersion(sstable.descriptor.version)
+ .withSSTableLevel(0)
+ .withEstimatedKeys(sstable.estimatedKeys())
+ .withSections(sstable.getPositionsForRanges(requestedRanges))
+ .withSerializationHeader(sstable.header.toComponent())
+ .withTableId(sstable.metadata().id)
+ .build();
+
+ partialStreamWriter = new CassandraStreamWriter(sstable, partialSSTableStreamHeader, session);
CapturingNettyChannel partialStreamChannel = new CapturingNettyChannel(STREAM_SIZE);
partialStreamWriter.write(new AsyncStreamingOutputPlus(partialStreamChannel));
serializedPartialStream = partialStreamChannel.getSerializedStream();
- CassandraStreamHeader partialSSTableStreamHeader =
- CassandraStreamHeader.builder()
- .withSSTableFormat(sstable.descriptor.formatType)
- .withSSTableVersion(sstable.descriptor.version)
- .withSSTableLevel(0)
- .withEstimatedKeys(sstable.estimatedKeys())
- .withSections(sstable.getPositionsForRanges(requestedRanges))
- .withSerializationHeader(sstable.header.toComponent())
- .withTableId(sstable.metadata().id)
- .build();
-
partialStreamReader = new CassandraStreamReader(new StreamMessageHeader(sstable.metadata().id,
peer, session.planId(), false,
0, 0, 0,
@@ -207,6 +209,7 @@ public class ZeroCopyStreamingBenchmark
@TearDown
public void tearDown() throws IOException
{
+ context.close();
SchemaLoader.cleanupAndLeaveDirs();
CommitLog.instance.stopUnsafe(true);
}
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 7880276..4d6e2e8 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
import afu.org.checkerframework.checker.oigj.qual.O;
import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
import org.apache.cassandra.db.compaction.CompactionTasks;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaCollection;
@@ -79,6 +80,7 @@ import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FilterFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -773,4 +775,25 @@ public class Util
}
assertEquals(expectedSSTableCount, fileCount);
}
+
+ /**
+ * Disable the bloom filter on all sstables of the given table
+ */
+ public static void disableBloomFilter(ColumnFamilyStore cfs)
+ {
+ Collection<SSTableReader> sstables = cfs.getLiveSSTables();
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN))
+ {
+ for (SSTableReader sstable : sstables)
+ {
+ sstable = sstable.cloneAndReplace(FilterFactory.AlwaysPresent);
+ txn.update(sstable, true);
+ txn.checkpoint();
+ }
+ txn.finish();
+ }
+
+ for (SSTableReader reader : cfs.getLiveSSTables())
+ assertEquals(FilterFactory.AlwaysPresent, reader.getBloomFilter());
+ }
}
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 7913679..54c2b35 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -129,7 +129,7 @@ public class ColumnFamilyStoreTest
List<SSTableReader> ssTables = keyspace.getAllSSTables(SSTableSet.LIVE);
assertEquals(1, ssTables.size());
- ssTables.get(0).forceFilterFailures();
+ Util.disableBloomFilter(cfs);
Util.assertEmpty(Util.cmd(cfs, "key2").build());
}
diff --git a/test/unit/org/apache/cassandra/db/KeyspaceTest.java b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
index 3e088fb..e7d7814 100644
--- a/test/unit/org/apache/cassandra/db/KeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
@@ -142,7 +142,7 @@ public class KeyspaceTest extends CQLTester
Collection<SSTableReader> sstables = cfs.getLiveSSTables();
assertEquals(1, sstables.size());
- sstables.iterator().next().forceFilterFailures();
+ Util.disableBloomFilter(cfs);
for (String key : new String[]{"0", "2"})
Util.assertEmpty(Util.cmd(cfs, key).build());
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
index 2703e44..a41365b 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LogTransactionTest.java
@@ -1241,8 +1241,8 @@ public class LogTransactionTest extends AbstractTransactionalTest
SSTableReader reader = SSTableReader.internalOpen(descriptor,
components,
cfs.metadata,
- dFile,
iFile,
+ dFile,
MockSchema.indexSummary.sharedCopy(),
new AlwaysPresentFilter(),
1L,
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index 00a48d1..58d26c1 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.Queue;
import java.util.UUID;
+import org.apache.cassandra.io.sstable.Descriptor;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -63,7 +64,6 @@ import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
public class CassandraEntireSSTableStreamWriterTest
{
@@ -73,6 +73,7 @@ public class CassandraEntireSSTableStreamWriterTest
public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
private static SSTableReader sstable;
+ private static Descriptor descriptor;
private static ColumnFamilyStore store;
@BeforeClass
@@ -105,6 +106,7 @@ public class CassandraEntireSSTableStreamWriterTest
CompactionManager.instance.performMaximal(store, false);
sstable = store.getLiveSSTables().iterator().next();
+ descriptor = sstable.descriptor;
}
@Test
@@ -112,15 +114,18 @@ public class CassandraEntireSSTableStreamWriterTest
{
StreamSession session = setupStreamingSessionForTest();
- CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
-
EmbeddedChannel channel = new EmbeddedChannel();
- AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel);
- writer.write(out);
+ try (AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel);
+ ComponentContext context = ComponentContext.create(descriptor))
+ {
+ CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, context);
- Queue msgs = channel.outboundMessages();
+ writer.write(out);
- assertTrue(msgs.peek() instanceof DefaultFileRegion);
+ Queue msgs = channel.outboundMessages();
+
+ assertTrue(msgs.peek() instanceof DefaultFileRegion);
+ }
}
@Test
@@ -129,18 +134,19 @@ public class CassandraEntireSSTableStreamWriterTest
StreamSession session = setupStreamingSessionForTest();
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
- CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
// This is needed as Netty releases the ByteBuffers as soon as the channel is flushed
ByteBuf serializedFile = Unpooled.buffer(8192);
EmbeddedChannel channel = createMockNettyChannel(serializedFile);
- AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel);
-
- writer.write(out);
+ try (AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel);
+ ComponentContext context = ComponentContext.create(descriptor))
+ {
+ CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, context);
+ writer.write(out);
- session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, 5104));
+ session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, 5104));
- CassandraStreamHeader header =
+ CassandraStreamHeader header =
CassandraStreamHeader.builder()
.withSSTableFormat(sstable.descriptor.formatType)
.withSSTableVersion(sstable.descriptor.version)
@@ -148,18 +154,19 @@ public class CassandraEntireSSTableStreamWriterTest
.withEstimatedKeys(sstable.estimatedKeys())
.withSections(Collections.emptyList())
.withSerializationHeader(sstable.header.toComponent())
- .withComponentManifest(CassandraOutgoingFile.getComponentManifest(sstable))
+ .withComponentManifest(context.manifest())
.isEntireSSTable(true)
.withFirstKey(sstable.first)
.withTableId(sstable.metadata().id)
.build();
- CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), false, 0, 0, 0, null), header, session);
+ CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), false, 0, 0, 0, null), header, session);
- SSTableMultiWriter sstableWriter = reader.read(new DataInputBuffer(serializedFile.nioBuffer(), false));
- Collection<SSTableReader> newSstables = sstableWriter.finished();
+ SSTableMultiWriter sstableWriter = reader.read(new DataInputBuffer(serializedFile.nioBuffer(), false));
+ Collection<SSTableReader> newSstables = sstableWriter.finished();
- assertEquals(1, newSstables.size());
+ assertEquals(1, newSstables.size());
+ }
}
private EmbeddedChannel createMockNettyChannel(ByteBuf serializedFile) throws Exception
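For context, both rewritten tests above share one resource-management shape: the ComponentContext introduced by this patch is opened in the same try-with-resources as the output stream, so the hardlinked component snapshots are released even when an assertion fails. A minimal sketch of that usage, reusing only names visible in this test class:

    try (AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(channel);
         ComponentContext context = ComponentContext.create(descriptor))
    {
        // the writer streams from the context's hardlinked snapshots,
        // not from the live, mutable sstable components
        new CassandraEntireSSTableStreamWriter(sstable, session, context).write(out);
    } // closing the context removes the hardlinks (temporary files)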
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
index e48abf6..999a44e 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
@@ -20,23 +20,144 @@ package org.apache.cassandra.db.streaming;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.streaming.CassandraStreamHeader.CassandraStreamHeaderSerializer;
import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.serializers.SerializationUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
public class CassandraStreamHeaderTest
{
+ public static final String KEYSPACE = "CassandraStreamHeaderTest";
+ public static final String CF_COMPRESSED = "compressed";
+
+ private static SSTableReader sstable;
+ private static ColumnFamilyStore store;
+
+ @BeforeClass
+ public static void defineSchemaAndPrepareSSTable()
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF_COMPRESSED).compression(CompressionParams.DEFAULT));
+
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ store = keyspace.getColumnFamilyStore(CF_COMPRESSED);
+
+ // insert data and compact to a single sstable
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 10; j++)
+ {
+ new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+ .clustering("0")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ sstable = store.getLiveSSTables().iterator().next();
+ }
+
+ @Test
+ public void transferredSizeWithCompressionTest()
+ {
+ // compression info is lazily initialized to reduce GC pressure; size is computed from the compressionMetadata
+ CassandraStreamHeader header = header(false, true);
+ long transferredSize = header.size();
+ assertEquals(transferredSize, header.calculateSize());
+
+ // compute file chunks before sending over the network, and verify the size is unchanged
+ header.compressionInfo.chunks();
+ assertEquals(transferredSize, header.calculateSize());
+
+ SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer);
+ }
+
+ @Test
+ public void transferredSizeWithZeroCopyStreamingTest()
+ {
+ // verify that the total on-disk length of all components is used as the transferred size for ZCS
+ CassandraStreamHeader header = header(true, true);
+ long transferredSize = header.size();
+ assertEquals(ComponentManifest.create(sstable.descriptor).totalSize(), transferredSize);
+ assertEquals(transferredSize, header.calculateSize());
+
+ // verify that computing file chunks doesn't change the transferred size for ZCS
+ header.compressionInfo.chunks();
+ assertEquals(transferredSize, header.calculateSize());
+
+ SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer);
+ }
+
+ @Test
+ public void transferredSizeWithoutCompressionTest()
+ {
+ // verify that the section size is used as the transferred size
+ CassandraStreamHeader header = header(false, false);
+ long transferredSize = header.size();
+ assertNull(header.compressionInfo);
+ assertEquals(sstable.uncompressedLength(), transferredSize);
+ assertEquals(transferredSize, header.calculateSize());
+
+ SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer);
+ }
+
+ private CassandraStreamHeader header(boolean entireSSTable, boolean compressed)
+ {
+ List<Range<Token>> requestedRanges = Collections.singletonList(new Range<>(store.getPartitioner().getMinimumToken(), sstable.last.getToken()));
+ requestedRanges = Range.normalize(requestedRanges);
+
+ List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(requestedRanges);
+ CompressionInfo compressionInfo = compressed ? CompressionInfo.newLazyInstance(sstable.getCompressionMetadata(), sections)
+ : null;
+
+ TableMetadata metadata = store.metadata();
+ SerializationHeader.Component serializationHeader = SerializationHeader.makeWithoutStats(metadata).toComponent();
+ ComponentManifest componentManifest = entireSSTable ? ComponentManifest.create(sstable.descriptor) : null;
+ DecoratedKey firstKey = entireSSTable ? sstable.first : null;
+ return CassandraStreamHeader.builder()
+ .withSSTableFormat(SSTableFormat.Type.BIG)
+ .withSSTableVersion(BigFormat.latestVersion)
+ .withSSTableLevel(0)
+ .withEstimatedKeys(10)
+ .withCompressionInfo(compressionInfo)
+ .withSections(sections)
+ .isEntireSSTable(entireSSTable)
+ .withComponentManifest(componentManifest)
+ .withFirstKey(firstKey)
+ .withSerializationHeader(serializationHeader)
+ .withTableId(metadata.id)
+ .build();
+ }
+
@Test
public void serializerTest()
{
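The three transferred-size tests above pin down a small contract: size() must be answerable from the compression metadata alone, and computing the chunks later must not change the serialized size. The lazy piece of that contract boils down to the classic memoized-holder pattern; the sketch below is illustrative only (a hypothetical Lazy class, not the patch's CompressionInfo):

    import java.util.function.Supplier;

    // Compute an expensive value at most once, on first request;
    // volatile + double-checked locking keeps it thread-safe.
    final class Lazy<T>
    {
        private final Supplier<T> compute;
        private volatile T value;

        Lazy(Supplier<T> compute) { this.compute = compute; }

        T get()
        {
            T local = value;
            if (local == null)
            {
                synchronized (this)
                {
                    local = value;
                    if (local == null)
                        value = local = compute.get();
                }
            }
            return local;
        }
    }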
diff --git a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
new file mode 100644
index 0000000..3cc8943
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexSummaryManager;
+import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
+import org.apache.cassandra.io.sstable.SSTableUtils;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.net.AsyncStreamingOutputPlus;
+import org.apache.cassandra.net.BufferPoolAllocator;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.SharedDefaultFileRegion;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Throwables;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(BMUnitRunner.class)
+public class EntireSSTableStreamConcurrentComponentMutationTest
+{
+ public static final String KEYSPACE = "CassandraEntireSSTableStreamLockTest";
+ public static final String CF_STANDARD = "Standard1";
+
+ private static final Callable<?> NO_OP = () -> null;
+
+ private static SSTableReader sstable;
+ private static Descriptor descriptor;
+ private static ColumnFamilyStore store;
+ private static RangesAtEndpoint rangesAtEndpoint;
+
+ private static ExecutorService service;
+
+ private static CountDownLatch latch = new CountDownLatch(1);
+
+ @BeforeClass
+ public static void defineSchemaAndPrepareSSTable()
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD));
+
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ store = keyspace.getColumnFamilyStore(CF_STANDARD);
+
+ // insert data and compact to a single sstable
+ CompactionManager.instance.disableAutoCompaction();
+ for (int j = 0; j < 10; j++)
+ {
+ new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+ .clustering("0")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ CompactionManager.instance.performMaximal(store, false);
+
+ Token start = ByteOrderedPartitioner.instance.getTokenFactory().fromString(Long.toHexString(0));
+ Token end = ByteOrderedPartitioner.instance.getTokenFactory().fromString(Long.toHexString(100));
+ rangesAtEndpoint = RangesAtEndpoint.toDummyList(Collections.singleton(new Range<>(start, end)));
+
+ service = Executors.newFixedThreadPool(2);
+ }
+
+ @AfterClass
+ public static void cleanup()
+ {
+ service.shutdown();
+ }
+
+ @Before
+ public void init()
+ {
+ sstable = store.getLiveSSTables().iterator().next();
+ descriptor = sstable.descriptor;
+ }
+
+ @After
+ public void reset() throws IOException
+ {
+ latch = new CountDownLatch(1);
+ // reset repair info to keep tests from interfering with each other
+ descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, 0, ActiveRepairService.NO_PENDING_REPAIR, false);
+ }
+
+ @Test
+ public void testStream() throws Exception
+ {
+ testStreamWithConcurrentComponentMutation(NO_OP, NO_OP);
+ }
+
+ /**
+ * The entire-sstable-streaming receiver would throw a checksum validation failure if a concurrent stats metadata
+ * update caused the actual transferred file size to differ from the one recorded in the {@link ComponentManifest}.
+ */
+ @Test
+ public void testStreamWithStatsMutation() throws Exception
+ {
+ testStreamWithConcurrentComponentMutation(() -> {
+
+ Descriptor desc = sstable.descriptor;
+ desc.getMetadataSerializer().mutate(desc, "testing", stats -> stats.mutateRepairedMetadata(0, UUID.randomUUID(), false));
+
+ return null;
+ }, NO_OP);
+ }
+
+ @Test
+ @BMRule(name = "Delay saving index summary; without a lock the manifest may link a partially written file",
+ targetClass = "SSTableReader",
+ targetMethod = "saveSummary(Descriptor, DecoratedKey, DecoratedKey, IndexSummary)",
+ targetLocation = "AFTER INVOKE serialize",
+ condition = "$descriptor.cfname.contains(\"Standard1\")",
+ action = "org.apache.cassandra.db.streaming.EntireSSTableStreamConcurrentComponentMutationTest.countDown();Thread.sleep(5000);")
+ public void testStreamWithIndexSummaryRedistributionDelaySavingSummary() throws Exception
+ {
+ testStreamWithConcurrentComponentMutation(() -> {
+ // wait until new index summary is partially written
+ latch.await(1, TimeUnit.MINUTES);
+ return null;
+ }, this::indexSummaryRedistribution);
+ }
+
+ // used by byteman
+ private static void countDown()
+ {
+ latch.countDown();
+ }
+
+ private void testStreamWithConcurrentComponentMutation(Callable<?> runBeforeStreaming, Callable<?> runConcurrentWithStreaming) throws Exception
+ {
+ ByteBuf serializedFile = Unpooled.buffer(8192);
+ InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+ StreamSession session = setupStreamingSessionForTest();
+ Collection<OutgoingStream> outgoingStreams = store.getStreamManager().createOutgoingStreams(session, rangesAtEndpoint, NO_PENDING_REPAIR, PreviewKind.NONE);
+ CassandraOutgoingFile outgoingFile = (CassandraOutgoingFile) Iterables.getOnlyElement(outgoingStreams);
+
+ Future<?> streaming = executeAsync(() -> {
+ runBeforeStreaming.call();
+
+ try (AsyncStreamingOutputPlus out = new AsyncStreamingOutputPlus(createMockNettyChannel(serializedFile)))
+ {
+ outgoingFile.write(session, out, MessagingService.current_version);
+ assertTrue(sstable.descriptor.getTemporaryFiles().isEmpty());
+ }
+ return null;
+ });
+
+ Future<?> concurrentMutations = executeAsync(runConcurrentWithStreaming);
+
+ streaming.get(3, TimeUnit.MINUTES);
+ concurrentMutations.get(3, TimeUnit.MINUTES);
+
+ session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, 5104));
+ StreamMessageHeader messageHeader = new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), false, 0, 0, 0, null);
+
+ try (DataInputBuffer in = new DataInputBuffer(serializedFile.nioBuffer(), false))
+ {
+ CassandraStreamHeader header = CassandraStreamHeader.serializer.deserialize(in, MessagingService.current_version);
+ CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(messageHeader, header, session);
+ SSTableReader streamedSSTable = Iterables.getOnlyElement(reader.read(in).finished());
+
+ SSTableUtils.assertContentEquals(sstable, streamedSSTable);
+ }
+ }
+
+ private boolean indexSummaryRedistribution() throws IOException
+ {
+ long nonRedistributingOffHeapSize = 0;
+ long memoryPoolBytes = 1024 * 1024;
+
+ // rewrite index summary file with new min/max index interval
+ TableMetadata origin = store.metadata();
+ MigrationManager.announceTableUpdate(origin.unbuild().minIndexInterval(1).maxIndexInterval(2).build(), true);
+
+ try (LifecycleTransaction txn = store.getTracker().tryModify(sstable, OperationType.INDEX_SUMMARY))
+ {
+ IndexSummaryManager.redistributeSummaries(new IndexSummaryRedistribution(ImmutableMap.of(store.metadata().id, txn),
+ nonRedistributingOffHeapSize,
+ memoryPoolBytes));
+ }
+
+ // reset min/max index interval
+ MigrationManager.announceTableUpdate(origin, true);
+ return true;
+ }
+
+ private Future<?> executeAsync(Callable<?> task)
+ {
+ return service.submit(() -> {
+ try
+ {
+ task.call();
+ }
+ catch (Exception e)
+ {
+ throw Throwables.unchecked(e);
+ }
+ });
+ }
+
+ private EmbeddedChannel createMockNettyChannel(ByteBuf serializedFile)
+ {
+ WritableByteChannel wbc = new WritableByteChannel()
+ {
+ private boolean isOpen = true;
+ public int write(ByteBuffer src)
+ {
+ int size = src.limit();
+ serializedFile.writeBytes(src);
+ return size;
+ }
+
+ public boolean isOpen()
+ {
+ return isOpen;
+ }
+
+ public void close()
+ {
+ isOpen = false;
+ }
+ };
+
+ return new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
+ {
+ if (msg instanceof BufferPoolAllocator.Wrapped)
+ {
+
+ ByteBuffer buf = ((BufferPoolAllocator.Wrapped) msg).adopt();
+ wbc.write(buf);
+ }
+ else
+ {
+ ((SharedDefaultFileRegion) msg).transferTo(wbc, 0);
+ }
+ super.write(ctx, msg, promise);
+ }
+ });
+ }
+
+ private StreamSession setupStreamingSessionForTest()
+ {
+ StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, false, null, PreviewKind.NONE);
+ StreamResultFuture future = StreamResultFuture.createInitiator(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.emptyList(), streamCoordinator);
+
+ InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+ streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED));
+
+ StreamSession session = streamCoordinator.getOrCreateNextSession(peer);
+ session.init(future);
+ return session;
+ }
+}
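This new test reproduces the heart of CASSANDRA-15861: while zero-copy streaming is sending whole component files, a concurrent stats mutation or index-summary redistribution rewrites one of them, so the bytes on the wire stop matching the lengths promised in the manifest and the receiver fails checksum validation. The patch avoids the race by snapshotting mutable components with hardlinks before streaming. The sketch below is schematic only; HardlinkSnapshot is a hypothetical name, not the patch's ComponentContext:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // A hardlink pins the current inode. As long as later mutations replace
    // the component file (write new + atomic rename) rather than editing it
    // in place, the link keeps serving the original bytes at the original
    // length for the duration of the transfer.
    final class HardlinkSnapshot implements AutoCloseable
    {
        private final Path link;

        private HardlinkSnapshot(Path link)
        {
            this.link = link;
        }

        static HardlinkSnapshot create(Path component) throws IOException
        {
            Path link = component.resolveSibling(component.getFileName() + ".stream_link");
            Files.createLink(link, component); // same inode, second name
            return new HardlinkSnapshot(link);
        }

        Path path()
        {
            return link; // stream from this path, not the live component
        }

        @Override
        public void close() throws IOException
        {
            // mirrors the test's assertion that no temporary files survive streaming
            Files.deleteIfExists(link);
        }
    }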
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 36221ed..bfb3e03 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -18,12 +18,10 @@
package org.apache.cassandra.io.sstable;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.attribute.FileAttribute;
import java.util.*;
import java.util.concurrent.*;
@@ -480,7 +478,7 @@ public class SSTableReaderTest
SSTableReader sstable = indexCfs.getLiveSSTables().iterator().next();
assert sstable.first.getToken() instanceof LocalToken;
- sstable.saveSummary();
+ SSTableReader.saveSummary(sstable.descriptor, sstable.first, sstable.last, sstable.indexSummary);
SSTableReader reopened = SSTableReader.open(sstable.descriptor);
assert reopened.first.getToken() instanceof LocalToken;
reopened.selfRef().release();
diff --git a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
index 262a200..850e05c 100644
--- a/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
+++ b/test/unit/org/apache/cassandra/streaming/EntireSSTableStreamingCorrectFilesCountTest.java
@@ -41,9 +41,11 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
+import org.apache.cassandra.db.streaming.ComponentManifest;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -60,6 +62,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class EntireSSTableStreamingCorrectFilesCountTest
@@ -120,18 +123,23 @@ public class EntireSSTableStreamingCorrectFilesCountTest
PreviewKind.NONE);
session.addTransferStreams(outgoingStreams);
- DataOutputStreamPlus out = constructDataOutputStream();
+ AsyncStreamingOutputPlus out = constructDataOutputStream();
for (OutgoingStream outgoingStream : outgoingStreams)
+ {
outgoingStream.write(session, out, MessagingService.VERSION_40);
+ // verify hardlinks are removed after streaming
+ Descriptor descriptor = ((CassandraOutgoingFile) outgoingStream).getRef().get().descriptor;
+ assertTrue(descriptor.getTemporaryFiles().isEmpty());
+ }
int totalNumberOfFiles = session.transfers.get(store.metadata.id).getTotalNumberOfFiles();
- assertEquals(CassandraOutgoingFile.getComponentManifest(sstable).components().size(), totalNumberOfFiles);
+ assertEquals(ComponentManifest.create(sstable.descriptor).components().size(), totalNumberOfFiles);
assertEquals(streamEventHandler.fileNames.size(), totalNumberOfFiles);
}
- private DataOutputStreamPlus constructDataOutputStream()
+ private AsyncStreamingOutputPlus constructDataOutputStream()
{
// This is needed as Netty releases the ByteBuffers as soon as the channel is flushed
ByteBuf serializedFile = Unpooled.buffer(8192);
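The updated assertion above counts streamed files against ComponentManifest.create(sstable.descriptor), which fixes every component's on-disk length before the transfer begins. The essence of such a manifest, sketched with illustrative names (the real implementation is this patch's ComponentManifest):

    import java.io.File;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Capture each component's length exactly once, up front, so the
    // receiver can validate the incoming stream against fixed expectations.
    final class ManifestSketch
    {
        private final Map<String, Long> lengths = new LinkedHashMap<>();

        static ManifestSketch create(File... components)
        {
            ManifestSketch manifest = new ManifestSketch();
            for (File component : components)
                manifest.lengths.put(component.getName(), component.length());
            return manifest;
        }

        int components()
        {
            return lengths.size();
        }

        long totalSize()
        {
            return lengths.values().stream().mapToLong(Long::longValue).sum();
        }
    }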
diff --git a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index be443b5..11f6b55 100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -172,7 +172,7 @@ public class CompressedInputStreamTest
}
// read buffer using CompressedInputStream
- CompressionInfo info = new CompressionInfo(chunks, param);
+ CompressionInfo info = CompressionInfo.newInstance(chunks, param);
if (testException)
{