Posted to commits@cassandra.apache.org by bd...@apache.org on 2018/03/15 23:42:52 UTC
[4/5] cassandra git commit: Abstract streaming for pluggable storage
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
new file mode 100644
index 0000000..2f56786
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.DoubleSupplier;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.util.concurrent.FastThreadLocalThread;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.db.streaming.CassandraStreamReader.StreamDeserializer;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+/**
+ * InputStream which reads data from an underlying source with the given {@link CompressionInfo}. Uses {@link #buffer} as a buffer
+ * for uncompressed data (which is read by stream consumers - {@link StreamDeserializer} in this case).
+ */
+public class CompressedInputStream extends RebufferingInputStream implements AutoCloseable
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);
+
+ private final CompressionInfo info;
+ // chunk buffer
+ private final BlockingQueue<ByteBuffer> dataBuffer;
+ private final DoubleSupplier crcCheckChanceSupplier;
+
+ /**
+ * The base offset of the current {@link #buffer} from the beginning of the stream.
+ */
+ private long bufferOffset = 0;
+
+ /**
+ * The current {@link CompressedCassandraStreamReader#sections} offset in the stream.
+ */
+ private long current = 0;
+
+ private final ChecksumType checksumType;
+
+ private static final int CHECKSUM_LENGTH = 4;
+
+ /**
+ * Indicates there was a problem when reading from the source stream.
+ * When the stream {@link Reader} adds this to the <code>dataBuffer</code>,
+ * it is expected to have already populated the <code>readException</code> variable
+ * with the cause of the read error, which is then rethrown to the consumer
+ * on the next read operation.
+ */
+ private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]);
+
+ private volatile IOException readException = null;
+
+ private long totalCompressedBytesRead;
+
+ /**
+ * @param source Input source to read compressed data from
+ * @param info Compression info
+ */
+ public CompressedInputStream(DataInputPlus source, CompressionInfo info, ChecksumType checksumType, DoubleSupplier crcCheckChanceSupplier)
+ {
+ super(ByteBuffer.allocateDirect(info.parameters.chunkLength()));
+ buffer.limit(buffer.position()); // force the buffer to appear "consumed" so that it triggers reBuffer on the first read
+ this.info = info;
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+ this.crcCheckChanceSupplier = crcCheckChanceSupplier;
+ this.checksumType = checksumType;
+
+ new FastThreadLocalThread(new Reader(source, info, dataBuffer)).start();
+ }
+
+ /**
+ * Invoked when crossing into the next stream boundary in {@link CompressedCassandraStreamReader#sections}.
+ */
+ public void position(long position) throws IOException
+ {
+ if (readException != null)
+ throw readException;
+
+ assert position >= current : "stream can only read forward.";
+ current = position;
+
+ if (current > bufferOffset + buffer.limit())
+ reBuffer(false);
+
+ buffer.position((int)(current - bufferOffset));
+ }
+
+ protected void reBuffer() throws IOException
+ {
+ reBuffer(true);
+ }
+
+ private void reBuffer(boolean updateCurrent) throws IOException
+ {
+ if (readException != null)
+ {
+ FileUtils.clean(buffer);
+ buffer = null;
+ throw readException;
+ }
+
+ // increment the offset into the stream based on the current buffer's read count
+ if (updateCurrent)
+ current += buffer.position();
+
+ try
+ {
+ ByteBuffer compressedWithCRC = dataBuffer.take();
+ if (compressedWithCRC == POISON_PILL)
+ {
+ assert readException != null;
+ throw readException;
+ }
+
+ decompress(compressedWithCRC);
+ }
+ catch (InterruptedException e)
+ {
+ throw new EOFException("No chunk available");
+ }
+ }
+
+ private void decompress(ByteBuffer compressed) throws IOException
+ {
+ int length = compressed.remaining();
+
+ // uncompress only if the chunk is smaller than maxCompressedLength; if the chunk size is greater than or equal to maxCompressedLength,
+ // we assume the chunk is not compressed. see CASSANDRA-10520
+ final boolean releaseCompressedBuffer;
+ if (length - CHECKSUM_LENGTH < info.parameters.maxCompressedLength())
+ {
+ buffer.clear();
+ compressed.limit(length - CHECKSUM_LENGTH);
+ info.parameters.getSstableCompressor().uncompress(compressed, buffer);
+ buffer.flip();
+ releaseCompressedBuffer = true;
+ }
+ else
+ {
+ FileUtils.clean(buffer);
+ buffer = compressed;
+ buffer.limit(length - CHECKSUM_LENGTH);
+ releaseCompressedBuffer = false;
+ }
+ totalCompressedBytesRead += length;
+
+ // validate crc randomly
+ double crcCheckChance = this.crcCheckChanceSupplier.getAsDouble();
+ if (crcCheckChance >= 1d ||
+ (crcCheckChance > 0d && crcCheckChance > ThreadLocalRandom.current().nextDouble()))
+ {
+ ByteBuffer crcBuf = compressed.duplicate();
+ crcBuf.limit(length - CHECKSUM_LENGTH).position(0);
+ int checksum = (int) checksumType.of(crcBuf);
+
+ crcBuf.limit(length);
+ if (crcBuf.getInt() != checksum)
+ throw new IOException("CRC unmatched");
+ }
+
+ if (releaseCompressedBuffer)
+ FileUtils.clean(compressed);
+
+ // buffer offset is always aligned
+ final int compressedChunkLength = info.parameters.chunkLength();
+ bufferOffset = current & ~(compressedChunkLength - 1);
+ }
+
+ public long getTotalCompressedBytesRead()
+ {
+ return totalCompressedBytesRead;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Releases the resources specific to this instance, but not the {@link DataInputPlus} that is used by the {@link Reader}.
+ */
+ @Override
+ public void close()
+ {
+ if (buffer != null)
+ {
+ FileUtils.clean(buffer);
+ buffer = null;
+ }
+ }
+
+ class Reader extends WrappedRunnable
+ {
+ private final DataInputPlus source;
+ private final Iterator<CompressionMetadata.Chunk> chunks;
+ private final BlockingQueue<ByteBuffer> dataBuffer;
+
+ Reader(DataInputPlus source, CompressionInfo info, BlockingQueue<ByteBuffer> dataBuffer)
+ {
+ this.source = source;
+ this.chunks = Iterators.forArray(info.chunks);
+ this.dataBuffer = dataBuffer;
+ }
+
+ protected void runMayThrow() throws Exception
+ {
+ byte[] tmp = null;
+ while (chunks.hasNext())
+ {
+ CompressionMetadata.Chunk chunk = chunks.next();
+
+ int readLength = chunk.length + 4; // read with CRC
+ ByteBuffer compressedWithCRC = null;
+ try
+ {
+ final int r;
+ if (source instanceof ReadableByteChannel)
+ {
+ compressedWithCRC = ByteBuffer.allocateDirect(readLength);
+ r = ((ReadableByteChannel)source).read(compressedWithCRC);
+ compressedWithCRC.flip();
+ }
+ else
+ {
+ // read into an on-heap array, then copy over to an off-heap buffer. at a minimum, snappy requires
+ // off-heap buffers for decompression, else we could have just wrapped the plain byte array in a ByteBuffer
+ if (tmp == null || tmp.length < info.parameters.chunkLength() + CHECKSUM_LENGTH)
+ tmp = new byte[info.parameters.chunkLength() + CHECKSUM_LENGTH];
+ source.readFully(tmp, 0, readLength);
+ compressedWithCRC = ByteBuffer.allocateDirect(readLength);
+ compressedWithCRC.put(tmp, 0, readLength);
+ compressedWithCRC.position(0);
+ r = readLength;
+ }
+
+ if (r < 0)
+ {
+ FileUtils.clean(compressedWithCRC);
+ readException = new EOFException("No chunk available");
+ dataBuffer.put(POISON_PILL);
+ return; // throw exception where we consume dataBuffer
+ }
+ }
+ catch (IOException e)
+ {
+ if (!(e instanceof EOFException))
+ logger.warn("Error while reading compressed input stream.", e);
+ if (compressedWithCRC != null)
+ FileUtils.clean(compressedWithCRC);
+
+ readException = e;
+ dataBuffer.put(POISON_PILL);
+ return; // throw exception where we consume dataBuffer
+ }
+ dataBuffer.put(compressedWithCRC);
+ }
+ }
+ }
+}
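
The reader thread and the consuming stream above communicate only through the bounded dataBuffer queue; the zero-length POISON_PILL buffer is the reader's in-band failure signal. As a minimal, standalone sketch of that handshake (hypothetical names, not part of this patch):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonPillSketch
{
    private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]);
    private final BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(16);
    private volatile IOException readException;

    // Producer side: on failure, record the cause first, then enqueue the sentinel.
    void produce(ByteBuffer chunk, IOException failure) throws InterruptedException
    {
        if (failure != null)
        {
            readException = failure; // volatile write happens-before the put()
            queue.put(POISON_PILL);
            return;
        }
        queue.put(chunk);
    }

    // Consumer side: an identity check against the sentinel rethrows the recorded cause.
    ByteBuffer consume() throws IOException, InterruptedException
    {
        ByteBuffer next = queue.take();
        if (next == POISON_PILL)
            throw readException;
        return next;
    }
}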
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
new file mode 100644
index 0000000..0f0d5c7
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CompressionInfo.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Container that carries the compression parameters and the chunks needed to decompress data from a stream.
+ */
+public class CompressionInfo
+{
+ public static final IVersionedSerializer<CompressionInfo> serializer = new CompressionInfoSerializer();
+
+ public final CompressionMetadata.Chunk[] chunks;
+ public final CompressionParams parameters;
+
+ public CompressionInfo(CompressionMetadata.Chunk[] chunks, CompressionParams parameters)
+ {
+ assert chunks != null && parameters != null;
+ this.chunks = chunks;
+ this.parameters = parameters;
+ }
+
+ static CompressionInfo fromCompressionMetadata(CompressionMetadata metadata, List<Pair<Long, Long>> sections)
+ {
+ if (metadata == null)
+ {
+ return null;
+ }
+ else
+ {
+ return new CompressionInfo(metadata.getChunksForSections(sections), metadata.parameters);
+ }
+
+ }
+
+ static class CompressionInfoSerializer implements IVersionedSerializer<CompressionInfo>
+ {
+ public void serialize(CompressionInfo info, DataOutputPlus out, int version) throws IOException
+ {
+ if (info == null)
+ {
+ out.writeInt(-1);
+ return;
+ }
+
+ int chunkCount = info.chunks.length;
+ out.writeInt(chunkCount);
+ for (int i = 0; i < chunkCount; i++)
+ CompressionMetadata.Chunk.serializer.serialize(info.chunks[i], out, version);
+ // compression params
+ CompressionParams.serializer.serialize(info.parameters, out, version);
+ }
+
+ public CompressionInfo deserialize(DataInputPlus in, int version) throws IOException
+ {
+ // chunks
+ int chunkCount = in.readInt();
+ if (chunkCount < 0)
+ return null;
+
+ CompressionMetadata.Chunk[] chunks = new CompressionMetadata.Chunk[chunkCount];
+ for (int i = 0; i < chunkCount; i++)
+ chunks[i] = CompressionMetadata.Chunk.serializer.deserialize(in, version);
+
+ // compression params
+ CompressionParams parameters = CompressionParams.serializer.deserialize(in, version);
+ return new CompressionInfo(chunks, parameters);
+ }
+
+ public long serializedSize(CompressionInfo info, int version)
+ {
+ if (info == null)
+ return TypeSizes.sizeof(-1);
+
+ // chunks
+ int chunkCount = info.chunks.length;
+ long size = TypeSizes.sizeof(chunkCount);
+ for (int i = 0; i < chunkCount; i++)
+ size += CompressionMetadata.Chunk.serializer.serializedSize(info.chunks[i], version);
+ // compression params
+ size += CompressionParams.serializer.serializedSize(info.parameters, version);
+ return size;
+ }
+ }
+}
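
The wire framing above amounts to: an int chunk count where -1 stands for "no compression info", followed by the chunks, followed by the compression params. A minimal sketch of the same null-sentinel framing, with plain ints standing in for Chunk and CompressionParams (illustrative only):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FramingSketch
{
    static void serialize(int[] chunks, DataOutputStream out) throws IOException
    {
        if (chunks == null)
        {
            out.writeInt(-1); // sentinel: no compression info follows
            return;
        }
        out.writeInt(chunks.length); // chunk count first
        for (int chunk : chunks)
            out.writeInt(chunk);     // then each chunk
        // the real serializer writes CompressionParams here
    }

    static int[] deserialize(DataInputStream in) throws IOException
    {
        int count = in.readInt();
        if (count < 0)
            return null; // a negative count means "null" on the wire
        int[] chunks = new int[count];
        for (int i = 0; i < count; i++)
            chunks[i] = in.readInt();
        return chunks;
    }
}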
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/package-info.java b/src/java/org/apache/cassandra/db/streaming/package-info.java
new file mode 100644
index 0000000..1e117aa
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/package-info.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <h2>File transfer</h2>
+ *
+ * When transferring whole sstables or subsections of an sstable, only the DATA component is shipped. To that end,
+ * there are three "modes" of an sstable transfer that need to be handled somewhat differently:
+ *
+ * 1) uncompressed sstable - data needs to be read into user space so it can be manipulated: checksum validation,
+ * apply stream compression (see next section), and/or TLS encryption.
+ *
+ * 2) compressed sstable, transferred with SSL/TLS - data needs to be read into user space as that is where the TLS encryption
+ * needs to happen. Netty does not allow the pretense of doing zero-copy transfers when TLS is in the pipeline;
+ * data must explicitly be pulled into user-space memory for TLS encryption to work.
+ *
+ * 3) compressed sstable, transferred without SSL/TLS - data can be streamed via zero-copy transfer as the data does not
+ * need to be manipulated (it can be sent "as-is").
+ *
+ * <h3>Compressing the data</h3>
+ * We always want to transfer as few bytes as possible over the wire when streaming a file. If the
+ * sstable is not already compressed via table compression options, we apply an on-the-fly stream compression
+ * to the data. The stream compression format is documented in
+ * {@link org.apache.cassandra.streaming.async.StreamCompressionSerializer}
+ *
+ * You may be wondering: why implement your own compression scheme? Why not use netty's built-in compression codecs,
+ * like {@link io.netty.handler.codec.compression.Lz4FrameEncoder}? That makes complete sense if none of the sstables
+ * to be streamed use sstable compression (and obviously you wouldn't use stream compression when the sstables
+ * are using sstable compression). The problem is when you have a mix of files, some using sstable compression
+ * and some not. You can either:
+ *
+ * - send the files of one type over one kind of socket, and the others over another socket
+ * - send them both over the same socket, but then auto-adjust per each file type.
+ *
+ * I've opted for the latter to keep socket/channel management simpler and cleaner.
+ *
+ */
+package org.apache.cassandra.db.streaming;
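
Expressed as code, the mode selection described above reduces to two booleans. A hedged sketch (the enum and method names are hypothetical, not the actual Cassandra API):

// Hypothetical sketch of the three transfer modes described above.
enum TransferMode { STREAM_COMPRESSED, USER_SPACE, ZERO_COPY }

final class TransferModeSketch
{
    static TransferMode choose(boolean sstableCompressed, boolean tlsEnabled)
    {
        if (sstableCompressed && !tlsEnabled)
            return TransferMode.ZERO_COPY;         // mode 3: ship the bytes as-is
        if (sstableCompressed)
            return TransferMode.USER_SPACE;        // mode 2: pull into user space for TLS
        return TransferMode.STREAM_COMPRESSED;     // mode 1: checksum + on-the-fly stream compression
    }
}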
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 439ebc6..dfabac2 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.dht;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
@@ -155,11 +156,12 @@ public class RangeStreamer
boolean connectSequentially,
int connectionsPerHost)
{
+ Preconditions.checkArgument(streamOperation == StreamOperation.BOOTSTRAP || streamOperation == StreamOperation.REBUILD, streamOperation);
this.metadata = metadata;
this.tokens = tokens;
this.address = address;
this.description = streamOperation.getDescription();
- this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, true, connectSequentially, null, PreviewKind.NONE);
+ this.streamPlan = new StreamPlan(streamOperation, connectionsPerHost, connectSequentially, null, PreviewKind.NONE);
this.useStrictConsistency = useStrictConsistency;
this.snitch = snitch;
this.stateStore = stateStore;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 7d77ad5..980fdf1 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -18,12 +18,12 @@
package org.apache.cassandra.io.sstable;
import java.io.File;
-import java.io.IOException;
import java.util.*;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.schema.TableMetadataRef;
@@ -52,7 +52,7 @@ public class SSTableLoader implements StreamEventHandler
private final Set<InetAddressAndPort> failedHosts = new HashSet<>();
private final List<SSTableReader> sstables = new ArrayList<>();
- private final Multimap<InetAddressAndPort, StreamSession.SSTableStreamingSections> streamingDetails = HashMultimap.create();
+ private final Multimap<InetAddressAndPort, OutgoingStream> streamingDetails = HashMultimap.create();
public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
{
@@ -131,8 +131,8 @@ public class SSTableLoader implements StreamEventHandler
List<Pair<Long, Long>> sstableSections = sstable.getPositionsForRanges(tokenRanges);
long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
Ref<SSTableReader> ref = sstable.ref();
- StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys);
- streamingDetails.put(endpoint, details);
+ OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, estimatedKeys);
+ streamingDetails.put(endpoint, stream);
}
// to conserve heap space when bulk loading
@@ -160,7 +160,7 @@ public class SSTableLoader implements StreamEventHandler
client.init(keyspace);
outputHandler.output("Established connection to initial hosts");
- StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, connectionsPerHost, false, false, null, PreviewKind.NONE).connectionFactory(client.getConnectionFactory());
+ StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, connectionsPerHost, false, null, PreviewKind.NONE).connectionFactory(client.getConnectionFactory());
Map<InetAddressAndPort, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
openSSTables(endpointToRanges);
@@ -178,15 +178,15 @@ public class SSTableLoader implements StreamEventHandler
if (toIgnore.contains(remote))
continue;
- List<StreamSession.SSTableStreamingSections> endpointDetails = new LinkedList<>();
+ List<OutgoingStream> streams = new LinkedList<>();
// references are acquired when constructing the CassandraOutgoingFile above
- for (StreamSession.SSTableStreamingSections details : streamingDetails.get(remote))
+ for (OutgoingStream stream : streamingDetails.get(remote))
{
- endpointDetails.add(details);
+ streams.add(stream);
}
- plan.transferFiles(remote, endpointDetails);
+ plan.transferStreams(remote, streams);
}
plan.listeners(this, listeners);
return plan.execute();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
index 2c4fae4..8d58673 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
@@ -52,7 +52,6 @@ public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements Strea
InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(fetchFrom);
StreamPlan plan = new StreamPlan(StreamOperation.REPAIR,
1, false,
- false,
pendingRepair,
previewKind)
.listeners(this)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 60d571b..3901c75 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -61,7 +61,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
@VisibleForTesting
StreamPlan createStreamPlan(InetAddressAndPort dst, InetAddressAndPort preferred, List<Range<Token>> differences)
{
- StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind)
+ StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
.listeners(this)
.flushBeforeTransfer(pendingRepair == null)
.requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily); // request ranges from the remote node
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 0122b31..725e84d 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -79,7 +79,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
@VisibleForTesting
StreamPlan createStreamPlan(InetAddressAndPort dest, InetAddressAndPort preferred)
{
- StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind)
+ StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
.listeners(this)
.flushBeforeTransfer(pendingRepair == null) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't necessary
.requestRanges(dest, preferred, desc.keyspace, ranges, desc.columnFamily); // request ranges from the remote node
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/IncomingStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java b/src/java/org/apache/cassandra/streaming/IncomingStream.java
new file mode 100644
index 0000000..18bebf5
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/IncomingStream.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * The counterpart of {@link OutgoingStream} on the receiving side.
+ *
+ * Data streamed in can (and should) be persisted, but must not be included in the table's
+ * live data set until added by {@link StreamReceiver}. If the stream fails, the stream receiver will
+ * delete the streamed data, but implementations still need to handle the case where its process dies
+ * during streaming and leaves data behind on startup, in which case that data should be deleted.
+ */
+public interface IncomingStream
+{
+
+ /**
+ * Read in the stream data.
+ */
+ void read(DataInputPlus inputPlus, int version) throws IOException;
+
+ String getName();
+ long getSize();
+ TableId getTableId();
+}
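
A minimal sketch of an implementation that buffers the whole stream in memory — purely illustrative of the contract, not the sstable-backed implementation this patch adds, and assuming the stream fits in a single array:

import java.io.IOException;

import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.IncomingStream;

public class SketchIncomingStream implements IncomingStream
{
    private final TableId tableId;
    private final long size;
    private byte[] data;

    public SketchIncomingStream(TableId tableId, long size)
    {
        this.tableId = tableId;
        this.size = size;
    }

    public void read(DataInputPlus in, int version) throws IOException
    {
        // Persist as you read; just don't expose the data to the live data set yet.
        data = new byte[(int) size];
        in.readFully(data);
    }

    public String getName() { return "sketch-" + tableId; }
    public long getSize() { return size; }
    public TableId getTableId() { return tableId; }
}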
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/OutgoingStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/OutgoingStream.java b/src/java/org/apache/cassandra/streaming/OutgoingStream.java
new file mode 100644
index 0000000..e71b985
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/OutgoingStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * Some subset of data to be streamed. Implementations handle writing out their data via the write method.
+ * On the receiving end, {@link IncomingStream} streams the data in.
+ *
+ * All the data contained in a given stream needs to have the same repairedAt timestamp (or 0) and pendingRepair
+ * id (or null).
+ */
+public interface OutgoingStream
+{
+ /**
+ * Write the stream's data to the socket
+ */
+ void write(StreamSession session, DataOutputStreamPlus output, int version) throws IOException;
+
+ /**
+ * Release any resources held by the stream
+ */
+ void finish();
+
+ long getRepairedAt();
+ UUID getPendingRepair();
+
+ String getName();
+ long getSize();
+ TableId getTableId();
+}
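
And a matching minimal OutgoingStream over an in-memory payload — again only a sketch of the contract (the class and its fields are hypothetical), not the CassandraOutgoingFile used elsewhere in this patch:

import java.io.IOException;
import java.util.UUID;

import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.OutgoingStream;
import org.apache.cassandra.streaming.StreamSession;

public class SketchOutgoingStream implements OutgoingStream
{
    private final TableId tableId;
    private final byte[] payload;
    private final long repairedAt;   // must be uniform across the stream's data
    private final UUID pendingRepair;

    public SketchOutgoingStream(TableId tableId, byte[] payload, long repairedAt, UUID pendingRepair)
    {
        this.tableId = tableId;
        this.payload = payload;
        this.repairedAt = repairedAt;
        this.pendingRepair = pendingRepair;
    }

    public void write(StreamSession session, DataOutputStreamPlus out, int version) throws IOException
    {
        out.write(payload);
    }

    public void finish() { /* nothing to release for an in-memory payload */ }

    public long getRepairedAt() { return repairedAt; }
    public UUID getPendingRepair() { return pendingRepair; }
    public String getName() { return "sketch-" + tableId; }
    public long getSize() { return payload.length; }
    public TableId getTableId() { return tableId; }
}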
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/PreviewKind.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/PreviewKind.java b/src/java/org/apache/cassandra/streaming/PreviewKind.java
index 3b4d2a0..51c5746 100644
--- a/src/java/org/apache/cassandra/streaming/PreviewKind.java
+++ b/src/java/org/apache/cassandra/streaming/PreviewKind.java
@@ -21,26 +21,19 @@ package org.apache.cassandra.streaming;
import java.util.UUID;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-
public enum PreviewKind
{
- NONE(0, null),
- ALL(1, Predicates.alwaysTrue()),
- UNREPAIRED(2, Predicates.not(SSTableReader::isRepaired)),
- REPAIRED(3, SSTableReader::isRepaired);
+ NONE(0),
+ ALL(1),
+ UNREPAIRED(2),
+ REPAIRED(3);
private final int serializationVal;
- private final Predicate<SSTableReader> streamingPredicate;
- PreviewKind(int serializationVal, Predicate<SSTableReader> streamingPredicate)
+ PreviewKind(int serializationVal)
{
assert ordinal() == serializationVal;
this.serializationVal = serializationVal;
- this.streamingPredicate = streamingPredicate;
}
public int getSerializationVal()
@@ -53,10 +46,6 @@ public enum PreviewKind
return values()[serializationVal];
}
- public Predicate<SSTableReader> getStreamingPredicate()
- {
- return streamingPredicate;
- }
public boolean isPreview()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/ProgressInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index 2334599..ac91855 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -24,7 +24,7 @@ import com.google.common.base.Objects;
import org.apache.cassandra.locator.InetAddressAndPort;
/**
- * ProgressInfo contains file transfer progress.
+ * ProgressInfo contains stream transfer progress.
*/
public class ProgressInfo implements Serializable
{
@@ -69,7 +69,7 @@ public class ProgressInfo implements Serializable
}
/**
- * @return true if file transfer is completed
+ * @return true if transfer is completed
*/
public boolean isCompleted()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index bbca753..4b4bbed 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -70,7 +70,7 @@ public final class SessionInfo implements Serializable
}
/**
- * Update progress of receiving/sending file.
+ * Update progress of receiving/sending stream.
*
* @param newProgress new progress info
*/
@@ -157,11 +157,11 @@ public final class SessionInfo implements Serializable
return getTotalSizes(sendingSummaries);
}
- private long getTotalSizeInProgress(Collection<ProgressInfo> files)
+ private long getTotalSizeInProgress(Collection<ProgressInfo> streams)
{
long total = 0;
- for (ProgressInfo file : files)
- total += file.currentBytes;
+ for (ProgressInfo stream : streams)
+ total += stream.currentBytes;
return total;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index a22e07d..139488d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -46,18 +46,18 @@ public class StreamCoordinator
private final boolean connectSequentially;
private Map<InetAddressAndPort, HostStreamingData> peerSessions = new HashMap<>();
+ private final StreamOperation streamOperation;
private final int connectionsPerHost;
private StreamConnectionFactory factory;
- private final boolean keepSSTableLevel;
private Iterator<StreamSession> sessionsToConnect = null;
private final UUID pendingRepair;
private final PreviewKind previewKind;
- public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, StreamConnectionFactory factory,
+ public StreamCoordinator(StreamOperation streamOperation, int connectionsPerHost, StreamConnectionFactory factory,
boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
{
+ this.streamOperation = streamOperation;
this.connectionsPerHost = connectionsPerHost;
- this.keepSSTableLevel = keepSSTableLevel;
this.factory = factory;
this.connectSequentially = connectSequentially;
this.pendingRepair = pendingRepair;
@@ -191,51 +191,47 @@ public class StreamCoordinator
return result;
}
- public synchronized void transferFiles(InetAddressAndPort to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+ public synchronized void transferStreams(InetAddressAndPort to, Collection<OutgoingStream> streams)
{
HostStreamingData sessionList = getOrCreateHostData(to);
if (connectionsPerHost > 1)
{
- List<List<StreamSession.SSTableStreamingSections>> buckets = sliceSSTableDetails(sstableDetails);
+ List<Collection<OutgoingStream>> buckets = bucketStreams(streams);
- for (List<StreamSession.SSTableStreamingSections> subList : buckets)
+ for (Collection<OutgoingStream> bucket : buckets)
{
StreamSession session = sessionList.getOrCreateNextSession(to, to);
- session.addTransferFiles(subList);
+ session.addTransferStreams(bucket);
}
}
else
{
StreamSession session = sessionList.getOrCreateNextSession(to, to);
- session.addTransferFiles(sstableDetails);
+ session.addTransferStreams(streams);
}
}
- private List<List<StreamSession.SSTableStreamingSections>> sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+ private List<Collection<OutgoingStream>> bucketStreams(Collection<OutgoingStream> streams)
{
// There's no point in divvying things up into more buckets than we have streams
- int targetSlices = Math.min(sstableDetails.size(), connectionsPerHost);
- int step = Math.round((float) sstableDetails.size() / (float) targetSlices);
+ int targetSlices = Math.min(streams.size(), connectionsPerHost);
+ int step = Math.round((float) streams.size() / (float) targetSlices);
int index = 0;
- List<List<StreamSession.SSTableStreamingSections>> result = new ArrayList<>();
- List<StreamSession.SSTableStreamingSections> slice = null;
- Iterator<StreamSession.SSTableStreamingSections> iter = sstableDetails.iterator();
- while (iter.hasNext())
- {
- StreamSession.SSTableStreamingSections streamSession = iter.next();
+ List<Collection<OutgoingStream>> result = new ArrayList<>();
+ List<OutgoingStream> slice = null;
+ for (OutgoingStream stream: streams)
+ {
if (index % step == 0)
{
slice = new ArrayList<>();
result.add(slice);
}
- slice.add(streamSession);
+ slice.add(stream);
++index;
- iter.remove();
}
-
return result;
}
@@ -302,7 +298,7 @@ public class StreamCoordinator
// create
if (streamSessions.size() < connectionsPerHost)
{
- StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, pendingRepair, previewKind);
+ StreamSession session = new StreamSession(streamOperation, peer, connecting, factory, streamSessions.size(), pendingRepair, previewKind);
streamSessions.put(++lastReturned, session);
return session;
}
@@ -334,7 +330,7 @@ public class StreamCoordinator
StreamSession session = streamSessions.get(id);
if (session == null)
{
- session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, pendingRepair, previewKind);
+ session = new StreamSession(streamOperation, peer, connecting, factory, id, pendingRepair, previewKind);
streamSessions.put(id, session);
}
return session;
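
The bucketing in transferStreams/bucketStreams above simply slices the stream collection into at most connectionsPerHost roughly equal buckets. As a standalone, generic sketch of the same logic (assumes a non-empty input, as the caller guarantees):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class BucketSketch
{
    static <T> List<Collection<T>> bucket(Collection<T> items, int connectionsPerHost)
    {
        // no point in more buckets than items; assumes items is non-empty
        int targetSlices = Math.min(items.size(), connectionsPerHost);
        int step = Math.round((float) items.size() / (float) targetSlices);

        List<Collection<T>> result = new ArrayList<>();
        List<T> slice = null;
        int index = 0;
        for (T item : items)
        {
            if (index % step == 0) // start a new slice every `step` items
            {
                slice = new ArrayList<>();
                result.add(slice);
            }
            slice.add(item);
            ++index;
        }
        return result;
    }
}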
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamHook.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamHook.java b/src/java/org/apache/cassandra/streaming/StreamHook.java
index d610297..86b5182 100644
--- a/src/java/org/apache/cassandra/streaming/StreamHook.java
+++ b/src/java/org/apache/cassandra/streaming/StreamHook.java
@@ -18,19 +18,17 @@
package org.apache.cassandra.streaming;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.utils.FBUtilities;
public interface StreamHook
{
public static final StreamHook instance = createHook();
- public OutgoingFileMessage reportOutgoingFile(StreamSession session, SSTableReader sstable, OutgoingFileMessage message);
+ public OutgoingStreamMessage reportOutgoingStream(StreamSession session, OutgoingStream stream, OutgoingStreamMessage message);
public void reportStreamFuture(StreamSession session, StreamResultFuture future);
- public void reportIncomingFile(ColumnFamilyStore cfs, SSTableMultiWriter writer, StreamSession session, int sequenceNumber);
+ public void reportIncomingStream(TableId tableId, IncomingStream stream, StreamSession session, int sequenceNumber);
static StreamHook createHook()
{
@@ -43,14 +41,14 @@ public interface StreamHook
{
return new StreamHook()
{
- public OutgoingFileMessage reportOutgoingFile(StreamSession session, SSTableReader sstable, OutgoingFileMessage message)
+ public OutgoingStreamMessage reportOutgoingStream(StreamSession session, OutgoingStream stream, OutgoingStreamMessage message)
{
return message;
}
public void reportStreamFuture(StreamSession session, StreamResultFuture future) {}
- public void reportIncomingFile(ColumnFamilyStore cfs, SSTableMultiWriter writer, StreamSession session, int sequenceNumber) {}
+ public void reportIncomingStream(TableId tableId, IncomingStream stream, StreamSession session, int sequenceNumber) {}
};
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 43e9068..98d68ce 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -48,19 +48,19 @@ public class StreamPlan
*/
public StreamPlan(StreamOperation streamOperation)
{
- this(streamOperation, 1, false, false, NO_PENDING_REPAIR, PreviewKind.NONE);
+ this(streamOperation, 1, false, NO_PENDING_REPAIR, PreviewKind.NONE);
}
- public StreamPlan(StreamOperation streamOperation, boolean keepSSTableLevels, boolean connectSequentially)
+ public StreamPlan(StreamOperation streamOperation, boolean connectSequentially)
{
- this(streamOperation, 1, keepSSTableLevels, connectSequentially, NO_PENDING_REPAIR, PreviewKind.NONE);
+ this(streamOperation, 1, connectSequentially, NO_PENDING_REPAIR, PreviewKind.NONE);
}
- public StreamPlan(StreamOperation streamOperation, int connectionsPerHost, boolean keepSSTableLevels,
+ public StreamPlan(StreamOperation streamOperation, int connectionsPerHost,
boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
{
this.streamOperation = streamOperation;
- this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, new DefaultConnectionFactory(),
+ this.coordinator = new StreamCoordinator(streamOperation, connectionsPerHost, new DefaultConnectionFactory(),
connectSequentially, pendingRepair, previewKind);
}
@@ -137,18 +137,16 @@ public class StreamPlan
}
/**
- * Add transfer task to send given SSTable files.
+ * Add transfer task to send given streams
*
* @param to endpoint address of receiver
- * @param sstableDetails sstables with file positions and estimated key count.
- * this collection will be modified to remove those files that are successfully handed off
+ * @param streams streams to send
* @return this object for chaining
*/
- public StreamPlan transferFiles(InetAddressAndPort to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+ public StreamPlan transferStreams(InetAddressAndPort to, Collection<OutgoingStream> streams)
{
- coordinator.transferFiles(to, sstableDetails);
+ coordinator.transferStreams(to, streams);
return this;
-
}
public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)
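
For callers, the reworked plan construction and transfer API looks like the SSTableLoader change above. A hedged usage sketch (the address and the stream collection are placeholders):

import java.net.UnknownHostException;
import java.util.Collection;

import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.*;

final class StreamPlanUsageSketch
{
    static StreamResultFuture bulkLoad(Collection<OutgoingStream> streams) throws UnknownHostException
    {
        InetAddressAndPort remote = InetAddressAndPort.getByName("127.0.0.1");
        return new StreamPlan(StreamOperation.BULK_LOAD, 1, false, null, PreviewKind.NONE)
               .transferStreams(remote, streams)
               .execute();
    }
}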
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
deleted file mode 100644
index f4eb9c4..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.*;
-import java.util.Collection;
-import java.util.UUID;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.UnmodifiableIterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.util.TrackedDataInputPlus;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
-import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.streaming.compress.StreamCompressionInputStream;
-import org.apache.cassandra.streaming.messages.FileMessageHeader;
-import org.apache.cassandra.streaming.messages.StreamMessage;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * StreamReader reads from stream and writes to SSTable.
- */
-public class StreamReader
-{
- private static final Logger logger = LoggerFactory.getLogger(StreamReader.class);
- protected final TableId tableId;
- protected final long estimatedKeys;
- protected final Collection<Pair<Long, Long>> sections;
- protected final StreamSession session;
- protected final Version inputVersion;
- protected final long repairedAt;
- protected final UUID pendingRepair;
- protected final SSTableFormat.Type format;
- protected final int sstableLevel;
- protected final SerializationHeader.Component header;
- protected final int fileSeqNum;
-
- public StreamReader(FileMessageHeader header, StreamSession session)
- {
- if (session.getPendingRepair() != null)
- {
- // we should only ever be streaming pending repair
- // sstables if the session has a pending repair id
- assert session.getPendingRepair().equals(header.pendingRepair);
- }
- this.session = session;
- this.tableId = header.tableId;
- this.estimatedKeys = header.estimatedKeys;
- this.sections = header.sections;
- this.inputVersion = header.version;
- this.repairedAt = header.repairedAt;
- this.pendingRepair = header.pendingRepair;
- this.format = header.format;
- this.sstableLevel = header.sstableLevel;
- this.header = header.header;
- this.fileSeqNum = header.sequenceNumber;
- }
-
- /**
- * @param inputPlus where this reads data from
- * @return SSTable transferred
- * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
- */
- @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
- public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
- {
- long totalSize = totalSize();
-
- ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
- if (cfs == null)
- {
- // schema was dropped during streaming
- throw new IOException("CF " + tableId + " was dropped during streaming");
- }
-
- logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.",
- session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
- cfs.getTableName(), pendingRepair);
-
- StreamDeserializer deserializer = null;
- SSTableMultiWriter writer = null;
- try (StreamCompressionInputStream streamCompressionInputStream = new StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION))
- {
- TrackedDataInputPlus in = new TrackedDataInputPlus(streamCompressionInputStream);
- deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
- writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format);
- while (in.getBytesRead() < totalSize)
- {
- writePartition(deserializer, writer);
- // TODO move this to BytesReadTracker
- session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
- }
- logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
- session.planId(), fileSeqNum, session.peer, FBUtilities.prettyPrintMemory(in.getBytesRead()), FBUtilities.prettyPrintMemory(totalSize));
- return writer;
- }
- catch (Throwable e)
- {
- Object partitionKey = deserializer != null ? deserializer.partitionKey() : "";
- logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
- session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName(), e);
- if (writer != null)
- {
- writer.abort(e);
- }
- throw Throwables.propagate(e);
- }
- }
-
- protected SerializationHeader getHeader(TableMetadata metadata)
- {
- return header != null? header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader
- }
-
- protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, UUID pendingRepair, SSTableFormat.Type format) throws IOException
- {
- Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
- if (localDir == null)
- throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
-
- RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, format, sstableLevel, totalSize, session.getTransaction(tableId), getHeader(cfs.metadata()));
- StreamHook.instance.reportIncomingFile(cfs, writer, session, fileSeqNum);
- return writer;
- }
-
- protected long totalSize()
- {
- long size = 0;
- for (Pair<Long, Long> section : sections)
- size += section.right - section.left;
- return size;
- }
-
- protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer) throws IOException
- {
- writer.append(deserializer.newPartition());
- deserializer.checkForExceptions();
- }
-
- public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
- {
- private final TableMetadata metadata;
- private final DataInputPlus in;
- private final SerializationHeader header;
- private final SerializationHelper helper;
-
- private DecoratedKey key;
- private DeletionTime partitionLevelDeletion;
- private SSTableSimpleIterator iterator;
- private Row staticRow;
- private IOException exception;
-
- public StreamDeserializer(TableMetadata metadata, DataInputPlus in, Version version, SerializationHeader header) throws IOException
- {
- this.metadata = metadata;
- this.in = in;
- this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
- this.header = header;
- }
-
- public StreamDeserializer newPartition() throws IOException
- {
- key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
- partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
- iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion);
- staticRow = iterator.readStaticRow();
- return this;
- }
-
- public TableMetadata metadata()
- {
- return metadata;
- }
-
- public RegularAndStaticColumns columns()
- {
- // We don't know which columns we'll get so assume it can be all of them
- return metadata.regularAndStaticColumns();
- }
-
- public boolean isReverseOrder()
- {
- return false;
- }
-
- public DecoratedKey partitionKey()
- {
- return key;
- }
-
- public DeletionTime partitionLevelDeletion()
- {
- return partitionLevelDeletion;
- }
-
- public Row staticRow()
- {
- return staticRow;
- }
-
- public EncodingStats stats()
- {
- return header.stats();
- }
-
- public boolean hasNext()
- {
- try
- {
- return iterator.hasNext();
- }
- catch (IOError e)
- {
- if (e.getCause() != null && e.getCause() instanceof IOException)
- {
- exception = (IOException)e.getCause();
- return false;
- }
- throw e;
- }
- }
-
- public Unfiltered next()
- {
- // Note that in practice we know that IOException will be thrown by hasNext(), because that's
- // where the actual reading happens, so we don't bother catching RuntimeException here (contrarily
- // to what we do in hasNext)
- Unfiltered unfiltered = iterator.next();
- return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW
- ? maybeMarkLocalToBeCleared((Row) unfiltered)
- : unfiltered;
- }
-
- private Row maybeMarkLocalToBeCleared(Row row)
- {
- return metadata.isCounter() ? row.markCounterLocalToBeCleared() : row;
- }
-
- public void checkForExceptions() throws IOException
- {
- if (exception != null)
- throw exception;
- }
-
- public void close()
- {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index b823311..49beba1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -17,38 +17,16 @@
*/
package org.apache.cassandra.streaming;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.view.View;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.Throwables;
-import org.apache.cassandra.utils.concurrent.Refs;
/**
* Task that manages receiving streams for the session for a certain ColumnFamily.
@@ -59,69 +37,51 @@ public class StreamReceiveTask extends StreamTask
private static final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("StreamReceiveTask"));
- private static final int MAX_ROWS_PER_BATCH = Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100);
+ private final StreamReceiver receiver;
- // number of files to receive
- private final int totalFiles;
- // total size of files to receive
- private final long totalSize;
+ // number of streams to receive
+ private final int totalStreams;
- // Transaction tracking new files received
- private final LifecycleTransaction txn;
+ // total size of streams to receive
+ private final long totalSize;
// true if task is done (either completed or aborted)
private volatile boolean done = false;
- // holds references to SSTables received
- protected Collection<SSTableReader> sstables;
-
- private int remoteSSTablesReceived = 0;
+ private int remoteStreamsReceived = 0;
- public StreamReceiveTask(StreamSession session, TableId tableId, int totalFiles, long totalSize)
+ public StreamReceiveTask(StreamSession session, TableId tableId, int totalStreams, long totalSize)
{
super(session, tableId);
- this.totalFiles = totalFiles;
+ this.receiver = ColumnFamilyStore.getIfExists(tableId).getStreamManager().createStreamReceiver(session, totalStreams);
+ this.totalStreams = totalStreams;
this.totalSize = totalSize;
- // this is an "offline" transaction, as we currently manually expose the sstables once done;
- // this should be revisited at a later date, so that LifecycleTransaction manages all sstable state changes
- this.txn = LifecycleTransaction.offline(OperationType.STREAM);
- this.sstables = new ArrayList<>(totalFiles);
}
/**
- * Process received file.
+ * Process received stream.
*
- * @param sstable SSTable file received.
+ * @param stream Stream received.
*/
- public synchronized void received(SSTableMultiWriter sstable)
+ public synchronized void received(IncomingStream stream)
{
Preconditions.checkState(!session.isPreview(), "we should never receive sstables when previewing");
if (done)
{
- logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(),
- sstable.getFilename());
- Throwables.maybeFail(sstable.abort(null));
+ logger.warn("[{}] Received stream {} on already finished stream received task. Aborting stream.", session.planId(),
+ stream.getName());
+ receiver.discardStream(stream);
return;
}
- remoteSSTablesReceived++;
- assert tableId.equals(sstable.getTableId());
- logger.debug("recevied {} of {} total files", remoteSSTablesReceived, totalFiles);
+ remoteStreamsReceived++;
+ Preconditions.checkArgument(tableId.equals(stream.getTableId()));
+ logger.debug("recevied {} of {} total files", remoteStreamsReceived, totalStreams);
- Collection<SSTableReader> finished = null;
- try
- {
- finished = sstable.finish(true);
- }
- catch (Throwable t)
- {
- Throwables.maybeFail(sstable.abort(t));
- }
- txn.update(finished, false);
- sstables.addAll(finished);
+ receiver.received(stream);
- if (remoteSSTablesReceived == totalFiles)
+ if (remoteStreamsReceived == totalStreams)
{
done = true;
executor.submit(new OnCompletionRunnable(this));
@@ -130,7 +90,7 @@ public class StreamReceiveTask extends StreamTask
public int getTotalNumberOfFiles()
{
- return totalFiles;
+ return totalStreams;
}
public long getTotalSize()
@@ -138,11 +98,11 @@ public class StreamReceiveTask extends StreamTask
return totalSize;
}
- public synchronized LifecycleTransaction getTransaction()
+ public synchronized StreamReceiver getReceiver()
{
if (done)
throw new RuntimeException(String.format("Stream receive task %s of cf %s already finished.", session.planId(), tableId));
- return txn;
+ return receiver;
}
private static class OnCompletionRunnable implements Runnable
@@ -154,116 +114,19 @@ public class StreamReceiveTask extends StreamTask
this.task = task;
}
- /*
- * We have a special path for views and for CDC.
- *
- * For views, since the view requires cleaning up any pre-existing state, we must put all partitions
- * through the same write path as normal mutations. This also ensures any 2is are also updated.
- *
- * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they
- * can be archived by the CDC process on discard.
- */
- private boolean requiresWritePath(ColumnFamilyStore cfs) {
- return hasCDC(cfs) || (task.session.streamOperation().requiresViewBuild() && hasViews(cfs));
- }
-
- private boolean hasViews(ColumnFamilyStore cfs)
- {
- return !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName()));
- }
-
- private boolean hasCDC(ColumnFamilyStore cfs)
- {
- return cfs.metadata().params.cdc;
- }
-
- private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers) {
- boolean hasCdc = hasCDC(cfs);
- ColumnFilter filter = ColumnFilter.all(cfs.metadata());
- for (SSTableReader reader : readers)
- {
- Keyspace ks = Keyspace.open(reader.getKeyspaceName());
- // When doing mutation-based repair we split each partition into smaller batches
- // ({@link Stream MAX_ROWS_PER_BATCH}) to avoid OOMing and generating heap pressure
- try (ISSTableScanner scanner = reader.getScanner();
- CloseableIterator<UnfilteredRowIterator> throttledPartitions = ThrottledUnfilteredIterator.throttle(scanner, MAX_ROWS_PER_BATCH))
- {
- while (throttledPartitions.hasNext())
- {
- // MV *can* be applied unsafe if there's no CDC on the CFS as we flush
- // before transaction is done.
- //
- // If the CFS has CDC, however, these updates need to be written to the CommitLog
- // so they get archived into the cdc_raw folder
- ks.apply(new Mutation(PartitionUpdate.fromIterator(throttledPartitions.next(), filter)),
- hasCdc,
- true,
- false);
- }
- }
- }
- }
-
public void run()
{
- ColumnFamilyStore cfs = null;
- boolean requiresWritePath = false;
try
{
- cfs = ColumnFamilyStore.getIfExists(task.tableId);
- if (cfs == null)
+ if (ColumnFamilyStore.getIfExists(task.tableId) == null)
{
// schema was dropped during streaming
- task.sstables.clear();
- task.abortTransaction();
+ task.receiver.abort();
task.session.taskCompleted(task);
return;
}
- requiresWritePath = requiresWritePath(cfs);
- Collection<SSTableReader> readers = task.sstables;
-
- try (Refs<SSTableReader> refs = Refs.ref(readers))
- {
- if (requiresWritePath)
- {
- sendThroughWritePath(cfs, readers);
- }
- else
- {
- task.finishTransaction();
-
- // add sstables (this will build secondary indexes too, see CASSANDRA-10130)
- logger.debug("[Stream #{}] Received {} sstables from {} ({})", task.session.planId(), readers.size(), task.session.peer, readers);
- cfs.addSSTables(readers);
-
- //invalidate row and counter cache
- if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter())
- {
- List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
- readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
- Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
-
- if (cfs.isRowCacheEnabled())
- {
- int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getTableName());
- }
-
- if (cfs.metadata().isCounter())
- {
- int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
- if (invalidatedKeys > 0)
- logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
- "receive task completed.", task.session.planId(), invalidatedKeys,
- cfs.keyspace.getName(), cfs.getTableName());
- }
- }
- }
- }
+ task.receiver.finished();
task.session.taskCompleted(task);
}
catch (Throwable t)
@@ -273,14 +136,7 @@ public class StreamReceiveTask extends StreamTask
}
finally
{
- // We don't keep the streamed sstables since we've applied them manually so we abort the txn and delete
- // the streamed sstables.
- if (requiresWritePath)
- {
- if (cfs != null)
- cfs.forceBlockingFlush();
- task.abortTransaction();
- }
+ task.receiver.cleanup();
}
}
}
@@ -297,17 +153,6 @@ public class StreamReceiveTask extends StreamTask
return;
done = true;
- abortTransaction();
- sstables.clear();
- }
-
- private synchronized void abortTransaction()
- {
- txn.abort();
- }
-
- private synchronized void finishTransaction()
- {
- txn.finish();
+ receiver.abort();
}
}
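Note: after this refactor StreamReceiveTask no longer touches SSTables or lifecycle transactions itself; it only counts incoming streams and delegates the storage-specific work to a StreamReceiver. A minimal sketch of the resulting hand-off flow, assuming a hypothetical `incoming` collection supplied by the inbound stream handler (everything else is named in the diff above):

    // Sketch only -- `incoming` is a hypothetical stand-in for the streams
    // delivered by the inbound handler; not part of this commit.
    StreamReceiveTask task = new StreamReceiveTask(session, tableId, totalStreams, totalSize);
    for (IncomingStream stream : incoming)
        task.received(stream);   // delegates to receiver.received(stream)
    // Once remoteStreamsReceived == totalStreams the task flips `done` and submits
    // OnCompletionRunnable, which calls receiver.finished() and then receiver.cleanup().
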
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamReceiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiver.java b/src/java/org/apache/cassandra/streaming/StreamReceiver.java
new file mode 100644
index 0000000..bc357ef
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiver.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+/**
+ * StreamReceiver acts as a staging area for incoming data. Received data
+ * ends up here, and is kept separate from the live data until all streams
+ * for a session have been received successfully.
+ */
+public interface StreamReceiver
+{
+ /**
+ * Called after we've finished receiving stream data. The data covered by the given stream should
+ * be kept isolated from the live dataset for its table.
+ */
+ void received(IncomingStream stream);
+
+ /**
+ * This is called when we've received stream data we can't add to the received set for some reason,
+ * usually when we've received data for a session which has been closed. The data backing this stream
+ * should be deleted, and any resources associated with the given stream should be released.
+ */
+ void discardStream(IncomingStream stream);
+
+ /**
+ * Called when something went wrong with a stream session. All data associated with this receiver
+ * should be deleted, and any associated resources should be cleaned up.
+ */
+ void abort();
+
+ /**
+ * Called when a stream session has successfully completed. All stream data being held by this receiver
+ * should be added to the live data sets for their respective tables before this method returns.
+ */
+ void finished();
+
+ /**
+ * Called after {@link #finished()} has returned and we've sent any messages to other nodes. Mainly for
+ * signaling that materialized views and CDC should clean up.
+ */
+ void cleanup();
+}
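Note: the interface above defines a five-step lifecycle: received() for each stream, then either finished() followed by cleanup() on success, or abort() on failure, with discardStream() for data that arrives after a session has closed. A skeletal in-memory implementation, purely illustrative and not part of this commit, makes that ordering concrete; a real receiver would write to disk and manage its own transactions:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: stages streams in memory to show the expected call order.
    public class StagingStreamReceiver implements StreamReceiver
    {
        private final List<IncomingStream> staged = new ArrayList<>();

        public void received(IncomingStream stream)      { staged.add(stream); }  // keep isolated from live data
        public void discardStream(IncomingStream stream) { /* delete the stream's backing data */ }
        public void abort()                              { staged.clear(); }      // session failed: drop staged data
        public void finished()                           { /* move staged data into the live dataset */ }
        public void cleanup()                            { staged.clear(); }      // post-completion work (e.g. MV/CDC)
    }
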
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 544f37f..3b11fb6 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -73,9 +73,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
set(getCurrentState());
}
- private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, UUID pendingRepair, PreviewKind previewKind)
+ private StreamResultFuture(UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind)
{
- this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory(), false, pendingRepair, previewKind));
+ this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, new DefaultConnectionFactory(), false, pendingRepair, previewKind));
}
static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
@@ -106,7 +106,6 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
StreamOperation streamOperation,
InetAddressAndPort from,
Channel channel,
- boolean keepSSTableLevel,
UUID pendingRepair,
PreviewKind previewKind)
{
@@ -116,7 +115,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, streamOperation.getDescription());
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
- future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, pendingRepair, previewKind);
+ future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind);
StreamManager.instance.registerReceiving(future);
}
future.attachConnection(from, sessionIndex, channel);
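Note: with the keepSSTableLevel(s) flag gone, StreamCoordinator takes the StreamOperation in its place and can derive that behavior from the operation itself. A sketch of the receiving-side construction under the new signature (argument order taken from the hunk above; both constructors involved are private to StreamResultFuture, so this is context for reading the diff rather than callable code):

    // Mirrors the constructor change above; not independently compilable,
    // since the constructors shown are private to StreamResultFuture.
    StreamCoordinator coordinator = new StreamCoordinator(streamOperation, 0,
                                                          new DefaultConnectionFactory(),
                                                          false, pendingRepair, previewKind);
    StreamResultFuture future = new StreamResultFuture(planId, streamOperation, coordinator);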