You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/08/22 20:55:09 UTC
[02/11] cassandra git commit: move streaming to use netty
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
new file mode 100644
index 0000000..cc6f9e0
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.async;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.FastThreadLocalThread;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamReceiveException;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.streaming.messages.IncomingFileMessage;
+import org.apache.cassandra.streaming.messages.KeepAliveMessage;
+import org.apache.cassandra.streaming.messages.StreamInitMessage;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+import static org.apache.cassandra.streaming.async.NettyStreamingMessageSender.createLogTag;
+
+/**
+ * Handles the inbound side of streaming messages and sstable data. From the incoming data, we deserialize the message
+ * and potentially reify partitions and rows and write those out to new sstable files. Because deserialization is a blocking affair,
+ * we can't block the netty event loop. Thus we have a background thread perform all the blocking deserialization.
+ */
+public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
+{
+ private static final Logger logger = LoggerFactory.getLogger(StreamingInboundHandler.class);
+ // Default session lookup: asks StreamManager for the session matching the identifier's from/planId/sessionIndex.
+ static final Function<SessionIdentifier, StreamSession> DEFAULT_SESSION_PROVIDER = sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex);
+
+ // Thresholds (1 << 15 = 32 KiB, 1 << 16 = 64 KiB) handed, together with the channel config, to the
+ // RebufferingByteBufDataInputPlus created in handlerAdded().
+ // NOTE(review): presumably the queued-byte levels at which channel auto-read is toggled - confirm in that class.
+ private static final int AUTO_READ_LOW_WATER_MARK = 1 << 15;
+ private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 16;
+
+ // Peer address; used for the deserializer thread name and in the error log of the deserializing task.
+ private final InetSocketAddress remoteAddress;
+ // Version passed to StreamMessage.deserialize() for every inbound message.
+ private final int protocolVersion;
+
+ // Session handed to the deserializing task at start-up; may be null (the constructor parameter is @Nullable).
+ private final StreamSession session;
+
+ /**
+ * A collection of {@link ByteBuf}s that are yet to be processed. Incoming buffers are first dropped into this
+ * structure, and then consumed.
+ * <p>
+ * For thread safety, this structure's resources are released on the consuming thread
+ * (via {@link RebufferingByteBufDataInputPlus#close()}),
+ * but the producing side calls {@link RebufferingByteBufDataInputPlus#markClose()} to notify the input that it should close.
+ */
+ private RebufferingByteBufDataInputPlus buffers;
+
+ // Set by close() and by the deserializing task when it exits; read by channelRead() and by the task's wait loop.
+ private volatile boolean closed;
+
+    /**
+     * Creates the inbound handler for a single streaming channel.
+     *
+     * @param remoteAddress   address of the peer; used for the deserializer thread name and error logging
+     * @param protocolVersion version passed to {@link StreamMessage#deserialize} for every inbound message
+     * @param session         an already known session, or {@code null} to have it derived from an inbound message
+     */
+    public StreamingInboundHandler(InetSocketAddress remoteAddress, int protocolVersion, @Nullable StreamSession session)
+    {
+        this.session = session;
+        this.protocolVersion = protocolVersion;
+        this.remoteAddress = remoteAddress;
+    }
+
+    /**
+     * Creates the pending-buffer input for this channel and starts the daemon thread that deserializes from it.
+     */
+    @Override
+    @SuppressWarnings("resource")
+    public void handlerAdded(ChannelHandlerContext ctx)
+    {
+        final Channel channel = ctx.channel();
+        buffers = new RebufferingByteBufDataInputPlus(AUTO_READ_LOW_WATER_MARK, AUTO_READ_HIGH_WATER_MARK, channel.config());
+
+        final Runnable deserializer = new StreamDeserializingTask(DEFAULT_SESSION_PROVIDER, session, channel);
+        final String threadName = String.format("Stream-Deserializer-%s-%s", remoteAddress.toString(), channel.id());
+
+        // blocking deserialization runs on its own thread, not on the thread invoking this callback
+        Thread blockingIOThread = new FastThreadLocalThread(deserializer, threadName);
+        blockingIOThread.setDaemon(true);
+        blockingIOThread.start();
+    }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object message)
+ {
+ if (!closed && message instanceof ByteBuf)
+ buffers.append((ByteBuf) message);
+ else
+ ReferenceCountUtil.release(message);
+ }
+
+ /**
+ * Closes this handler when the channel becomes inactive, then forwards the event along the pipeline.
+ */
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx)
+ {
+ close();
+ ctx.fireChannelInactive();
+ }
+
+ /**
+ * Flags this handler as closed, so that later {@link #channelRead} calls release their messages, and marks
+ * the pending input for closing via {@code markClose()}.
+ */
+ void close()
+ {
+ closed = true;
+ buffers.markClose();
+ }
+
+    /**
+     * Logs the failure - I/O problems at trace level, everything else at warn level - and closes this handler.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+    {
+        final boolean connectionProblem = cause instanceof IOException;
+        if (!connectionProblem)
+            logger.warn("exception occurred while in processing streaming file", cause);
+        else
+            logger.trace("connection problem while streaming", cause);
+
+        close();
+    }
+
+ /**
+ * For testing only!! Replaces the pending-buffer input that {@link #channelRead} appends to.
+ *
+ * @param bufChannel the input to use in place of the one created in {@link #handlerAdded}
+ */
+ void setPendingBuffers(RebufferingByteBufDataInputPlus bufChannel)
+ {
+ this.buffers = bufChannel;
+ }
+
+ /**
+ * The task that performs the actual deserialization.
+ */
+ class StreamDeserializingTask implements Runnable
+ {
+ // Lookup used by deriveSession() to find the session for an identifier.
+ private final Function<SessionIdentifier, StreamSession> sessionProvider;
+ // Channel this task serves; attached to the derived session and closed when run() exits.
+ private final Channel channel;
+
+ // Session that receives deserialized messages; starts as the constructor argument and, when null,
+ // is assigned by run() from deriveSession().
+ @VisibleForTesting
+ StreamSession session;
+
+        /**
+         * @param sessionProvider lookup used by {@link #deriveSession} when no session is known yet
+         * @param session         the session already known for this channel, or {@code null}
+         * @param channel         the channel this task reads for; it is closed when the task exits
+         */
+        StreamDeserializingTask(Function<SessionIdentifier, StreamSession> sessionProvider, StreamSession session, Channel channel)
+        {
+            this.channel = channel;
+            this.session = session;
+            this.sessionProvider = sessionProvider;
+        }
+
+ /**
+ * Deserializes messages from the pending buffers until the handler is closed or an error occurs.
+ * <p>
+ * Keep-alive messages are only logged. Every other message is handed to the session, which is derived from
+ * the message itself when none is known yet. On exit the channel is closed, the handler is flagged closed
+ * and the pending buffers are closed on this thread.
+ */
+ @Override
+ public void run()
+ {
+ try
+ {
+ while (true)
+ {
+ // check the available bytes first and, while there are none, sleep and check again.
+ // this way we can break out of run() sanely once the handler is closed; otherwise we would end up
+ // blocking indefinitely in StreamMessage.deserialize()
+ while (buffers.available() == 0)
+ {
+ if (closed)
+ return;
+
+ Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
+ }
+
+ StreamMessage message = StreamMessage.deserialize(buffers, protocolVersion, null);
+
+ // keep-alives don't necessarily need to be tied to a session (they could arrive before or after
+ // wrt session lifecycle, due to races), just log that we received the message and carry on
+ if (message instanceof KeepAliveMessage)
+ {
+ logger.debug("{} Received {}", createLogTag(session, channel), message);
+ continue;
+ }
+
+ // no session yet: derive it from the message; deriveSession() throws rather than returning null
+ if (session == null)
+ session = deriveSession(message);
+ logger.debug("{} Received {}", createLogTag(session, channel), message);
+ session.messageReceived(message);
+ }
+ }
+ catch (EOFException eof)
+ {
+ // ignore
+ // NOTE(review): presumably raised when the input is closed while a read is in progress - confirm
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ // report the failure to the known session, else to the session carried by the exception, else just log it
+ if (session != null)
+ {
+ session.onError(t);
+ }
+ else if (t instanceof StreamReceiveException)
+ {
+ ((StreamReceiveException)t).session.onError(t);
+ }
+ else
+ {
+ logger.error("{} stream operation from {} failed", createLogTag(session, channel), remoteAddress, t);
+ }
+ }
+ finally
+ {
+ channel.close();
+ // from here on channelRead() releases incoming messages instead of queueing them
+ closed = true;
+
+ // release the pending buffers on this (the consuming) thread
+ if (buffers != null)
+ buffers.close();
+ }
+ }
+
+ StreamSession deriveSession(StreamMessage message) throws IOException
+ {
+ StreamSession streamSession = null;
+ // StreamInitMessage starts a new channel, and IncomingFileMessage potentially, as well.
+ // IncomingFileMessage needs a session to be established a priori, though
+ if (message instanceof StreamInitMessage)
+ {
+ assert session == null : "initiator of stream session received a StreamInitMessage";
+ StreamInitMessage init = (StreamInitMessage) message;
+ StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, channel, init.keepSSTableLevel, init.pendingRepair, init.previewKind);
+ streamSession = sessionProvider.apply(new SessionIdentifier(init.from, init.planId, init.sessionIndex));
+ }
+ else if (message instanceof IncomingFileMessage)
+ {
+ // TODO: it'd be great to check if the session actually exists before slurping in the entire sstable,
+ // but that's a refactoring for another day
+ FileMessageHeader header = ((IncomingFileMessage) message).header;
+ streamSession = sessionProvider.apply(new SessionIdentifier(header.sender, header.planId, header.sessionIndex));
+ }
+
+ if (streamSession == null)
+ throw new IllegalStateException(createLogTag(null, channel) + " no session found for message " + message);
+
+ streamSession.attach(channel);
+ return streamSession;
+ }
+ }
+
+    /**
+     * A simple struct bundling the values needed to look up a {@link StreamSession}: the peer address,
+     * the plan id and the session index.
+     */
+    static class SessionIdentifier
+    {
+        final InetAddress from;
+        final UUID planId;
+        final int sessionIndex;
+
+        SessionIdentifier(InetAddress from, UUID planId, int sessionIndex)
+        {
+            this.sessionIndex = sessionIndex;
+            this.planId = planId;
+            this.from = from;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/async/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/package-info.java b/src/java/org/apache/cassandra/streaming/async/package-info.java
new file mode 100644
index 0000000..ecf5115
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/async/package-info.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <h1>Non-blocking streaming with netty</h1>
+ * This document describes the implementation details of the streaming protocol. A listener for a streaming
+ * session listens on the same socket as internode messaging, and participates in the same handshake protocol.
+ * That protocol is described in the package-level documentation for {@link org.apache.cassandra.net.async}, and
+ * thus not here.
+ *
+ * Streaming 2.0 was implemented as CASSANDRA-5286. Streaming 2.0 used (the equivalent of) a single thread and
+ * a single socket to transfer sstables sequentially to a peer (either as part of a repair, bootstrap, and so on).
+ * Part of the motivation for switching to netty and a non-blocking model was to enable file transfers to occur
+ * in parallel for a given session.
+ *
+ * Thus, a more detailed approach is required for stream session management.
+ *
+ * <h2>Session setup and management</h2>
+ *
+ * The full details of the session lifecycle are documented in {@link org.apache.cassandra.streaming.StreamSession}.
+ *
+ *
+ * <h2>File transfer</h2>
+ *
+ * When transferring whole or subsections of an sstable, only the DATA component is shipped. To that end,
+ * there are three "modes" of an sstable transfer that need to be handled somewhat differently:
+ *
+ * 1) uncompressed sstable - data needs to be read into user space so it can be manipulated: checksum validation,
+ * apply stream compression (see next section), and/or TLS encryption.
+ *
+ * 2) compressed sstable, transferred with SSL/TLS - data needs to be read into user space as that is where the TLS encryption
+ * needs to happen. Netty does not allow the pretense of doing zero-copy transfers when TLS is in the pipeline;
+ * data must explicitly be pulled into user-space memory for TLS encryption to work.
+ *
+ * 3) compressed sstable, transferred without SSL/TLS - data can be streamed via zero-copy transfer as the data does not
+ * need to be manipulated (it can be sent "as-is").
+ *
+ * <h3>Compressing the data</h3>
+ * We always want to transfer as few bytes as possible over the wire when streaming a file. If the
+ * sstable is not already compressed via table compression options, we apply an on-the-fly stream compression
+ * to the data. The stream compression format is documented in
+ * {@link org.apache.cassandra.streaming.async.StreamCompressionSerializer}
+ *
+ * You may be wondering: why implement your own compression scheme? why not use netty's built-in compression codecs,
+ * like {@link io.netty.handler.codec.compression.Lz4FrameEncoder}? That makes complete sense if all the sstables
+ * to be streamed are not using sstable compression (and obviously you wouldn't use stream compression when the sstables
+ * are using sstable compression). The problem is when you have a mix of files, some using sstable compression
+ * and some not. You can either:
+ *
+ * - send the files of one type over one kind of socket, and the others over another socket
+ * - send them both over the same socket, but then auto-adjust per each file type.
+ *
+ * I've opted for the latter to keep socket/channel management simpler and cleaner.
+ */
+package org.apache.cassandra.streaming.async;
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/compress/ByteBufCompressionDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/ByteBufCompressionDataOutputStreamPlus.java b/src/java/org/apache/cassandra/streaming/compress/ByteBufCompressionDataOutputStreamPlus.java
new file mode 100644
index 0000000..3f1b22b
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/compress/ByteBufCompressionDataOutputStreamPlus.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import org.apache.cassandra.streaming.async.StreamCompressionSerializer;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+/**
+ * An output stream for one narrow use-case: the stream compression path of streaming. It should really only
+ * receive calls to {@link #write(ByteBuffer)}; each incoming buffer is compressed, rate limited and then sent
+ * downstream to the wrapped {@link ByteBufDataOutputStreamPlus}.
+ */
+public class ByteBufCompressionDataOutputStreamPlus extends WrappedDataOutputStreamPlus
+{
+    private final StreamRateLimiter limiter;
+    private final LZ4Compressor compressor;
+    private final StreamCompressionSerializer serializer;
+
+    /**
+     * @param out     the stream to wrap; it must be a {@link ByteBufDataOutputStreamPlus}, whose allocator is
+     *                handed to the compression serializer
+     * @param limiter rate limiter consulted before each compressed buffer is written
+     */
+    public ByteBufCompressionDataOutputStreamPlus(DataOutputStreamPlus out, StreamRateLimiter limiter)
+    {
+        super(out);
+        assert out instanceof ByteBufDataOutputStreamPlus;
+        this.limiter = limiter;
+        this.compressor = LZ4Factory.fastestInstance().fastCompressor();
+
+        final ByteBufDataOutputStreamPlus channelOut = (ByteBufDataOutputStreamPlus) out;
+        this.serializer = new StreamCompressionSerializer(channelOut.getAllocator());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Compress the incoming buffer and send the result downstream. The buffer parameter will not be used nor passed
+     * to downstream components, and thus callers can safely free the buffer upon return.
+     */
+    @Override
+    public void write(ByteBuffer buffer) throws IOException
+    {
+        final ByteBuf compressed = serializer.serialize(compressor, buffer, StreamMessage.CURRENT_VERSION);
+        final int compressedSize = compressed.readableBytes();
+
+        // this is a blocking call - you have been warned
+        limiter.acquire(compressedSize);
+
+        final ByteBufDataOutputStreamPlus channelOut = (ByteBufDataOutputStreamPlus) out;
+        channelOut.writeToChannel(compressed);
+    }
+
+    /**
+     * Intentionally a no-op: the wrapped stream must not be closed here; it will be closed via other means.
+     */
+    @Override
+    public void close()
+    {
+        // explicitly overriding close() to avoid closing the wrapped stream; it will be closed via other means
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 003db61..76f76ea 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -19,7 +19,8 @@ package org.apache.cassandra.streaming.compress;
import java.io.EOFException;
import java.io.IOException;
-import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -27,42 +28,45 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import com.google.common.collect.Iterators;
-import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.streaming.StreamReader.StreamDeserializer;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.WrappedRunnable;
/**
- * InputStream which reads data from underlining source with given {@link CompressionInfo}.
+ * InputStream which reads data from underlying source with given {@link CompressionInfo}. Uses {@link #buffer} as a buffer
+ * for uncompressed data (which is read by stream consumers - {@link StreamDeserializer} in this case).
*/
-public class CompressedInputStream extends InputStream
+public class CompressedInputStream extends RebufferingInputStream
{
private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);
private final CompressionInfo info;
// chunk buffer
- private final BlockingQueue<byte[]> dataBuffer;
+ private final BlockingQueue<ByteBuffer> dataBuffer;
private final Supplier<Double> crcCheckChanceSupplier;
- // uncompressed bytes
- private final byte[] buffer;
+ /**
+ * The base offset of the current {@link #buffer} from the beginning of the stream.
+ */
+ private long bufferOffset = 0;
- // offset from the beginning of the buffer
- protected long bufferOffset = 0;
- // current position in stream
+ /**
+ * The current {@link CompressedStreamReader#sections} offset in the stream.
+ */
private long current = 0;
- // number of bytes in the buffer that are actually valid
- protected int validBufferBytes = -1;
private final ChecksumType checksumType;
- // raw checksum bytes
- private final byte[] checksumBytes = new byte[4];
+ private static final int CHECKSUM_LENGTH = 4;
/**
* Indicates there was a problem when reading from source stream.
@@ -71,9 +75,9 @@ public class CompressedInputStream extends InputStream
* with the cause of the error when reading from source stream, so it is
* thrown to the consumer on subsequent read operation.
*/
- private static final byte[] POISON_PILL = new byte[0];
+ private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]);
- protected volatile IOException readException = null;
+ private volatile IOException readException = null;
private long totalCompressedBytesRead;
@@ -81,11 +85,11 @@ public class CompressedInputStream extends InputStream
* @param source Input source to read compressed data from
* @param info Compression info
*/
- public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
+ public CompressedInputStream(DataInputPlus source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
{
+ super(ByteBuffer.allocateDirect(info.parameters.chunkLength()));
+ buffer.limit(buffer.position()); // force the buffer to appear "consumed" so that it triggers reBuffer on the first read
this.info = info;
- this.buffer = new byte[info.parameters.chunkLength()];
- // buffer is limited to store up to 1024 chunks
this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
this.crcCheckChanceSupplier = crcCheckChanceSupplier;
this.checksumType = checksumType;
@@ -93,19 +97,50 @@ public class CompressedInputStream extends InputStream
new FastThreadLocalThread(new Reader(source, info, dataBuffer)).start();
}
- private void decompressNextChunk() throws IOException
+ /**
+ * Invoked when crossing into the next stream boundary in {@link CompressedStreamReader#sections}.
+ */
+ public void position(long position) throws IOException
{
if (readException != null)
throw readException;
+ assert position >= current : "stream can only read forward.";
+ current = position;
+
+ if (current > bufferOffset + buffer.limit())
+ reBuffer(false);
+
+ buffer.position((int)(current - bufferOffset));
+ }
+
+ protected void reBuffer() throws IOException
+ {
+ reBuffer(true);
+ }
+
+ private void reBuffer(boolean updateCurrent) throws IOException
+ {
+ if (readException != null)
+ {
+ FileUtils.clean(buffer);
+ buffer = null;
+ throw readException;
+ }
+
+ // increment the offset into the stream based on the current buffer's read count
+ if (updateCurrent)
+ current += buffer.position();
+
try
{
- byte[] compressedWithCRC = dataBuffer.take();
+ ByteBuffer compressedWithCRC = dataBuffer.take();
if (compressedWithCRC == POISON_PILL)
{
assert readException != null;
throw readException;
}
+
decompress(compressedWithCRC);
}
catch (InterruptedException e)
@@ -114,74 +149,49 @@ public class CompressedInputStream extends InputStream
}
}
- @Override
- public int read() throws IOException
+ private void decompress(ByteBuffer compressed) throws IOException
{
- if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
- decompressNextChunk();
-
- assert current >= bufferOffset && current < bufferOffset + validBufferBytes;
-
- return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff;
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException
- {
- long nextCurrent = current + len;
-
- if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
- decompressNextChunk();
+ final int compressedChunkLength = info.parameters.chunkLength();
+ int length = compressed.remaining();
- assert nextCurrent >= bufferOffset;
-
- int read = 0;
- while (read < len)
+ // uncompress if the buffer size is less than chunk size. else, if the buffer size is equal to the compressedChunkLength,
+ // we assume the buffer is not compressed. see CASSANDRA-10520
+ final boolean releaseCompressedBuffer;
+ if (length - CHECKSUM_LENGTH < compressedChunkLength)
{
- int nextLen = Math.min((len - read), (int)((bufferOffset + validBufferBytes) - current));
-
- System.arraycopy(buffer, (int)(current - bufferOffset), b, off + read, nextLen);
- read += nextLen;
-
- current += nextLen;
- if (read != len)
- decompressNextChunk();
+ buffer.clear();
+ compressed.limit(length - CHECKSUM_LENGTH);
+ info.parameters.getSstableCompressor().uncompress(compressed, buffer);
+ buffer.flip();
+ releaseCompressedBuffer = true;
}
-
- return len;
- }
-
- public void position(long position)
- {
- assert position >= current : "stream can only read forward.";
- current = position;
- }
-
- private void decompress(byte[] compressed) throws IOException
- {
- // uncompress
- if (compressed.length - checksumBytes.length < info.parameters.maxCompressedLength())
- validBufferBytes = info.parameters.getSstableCompressor().uncompress(compressed, 0, compressed.length - checksumBytes.length, buffer, 0);
else
{
- validBufferBytes = compressed.length - checksumBytes.length;
- System.arraycopy(compressed, 0, buffer, 0, validBufferBytes);
+ FileUtils.clean(buffer);
+ buffer = compressed;
+ buffer.limit(length - CHECKSUM_LENGTH);
+ releaseCompressedBuffer = false;
}
- totalCompressedBytesRead += compressed.length;
+ totalCompressedBytesRead += length;
// validate crc randomly
double crcCheckChance = this.crcCheckChanceSupplier.get();
if (crcCheckChance > 0d && crcCheckChance > ThreadLocalRandom.current().nextDouble())
{
- int checksum = (int) checksumType.of(compressed, 0, compressed.length - checksumBytes.length);
+ ByteBuffer crcBuf = compressed.duplicate();
+ crcBuf.limit(length - CHECKSUM_LENGTH).position(0);
+ int checksum = (int) checksumType.of(crcBuf);
- System.arraycopy(compressed, compressed.length - checksumBytes.length, checksumBytes, 0, checksumBytes.length);
- if (Ints.fromByteArray(checksumBytes) != checksum)
+ crcBuf.limit(length);
+ if (crcBuf.getInt() != checksum)
throw new IOException("CRC unmatched");
}
+ if (releaseCompressedBuffer)
+ FileUtils.clean(compressed);
+
// buffer offset is always aligned
- bufferOffset = current & ~(buffer.length - 1);
+ bufferOffset = current & ~(compressedChunkLength - 1);
}
public long getTotalCompressedBytesRead()
@@ -189,13 +199,26 @@ public class CompressedInputStream extends InputStream
return totalCompressedBytesRead;
}
+ /**
+ * Releases the resources specific to this instance, but not the {@link DataInputPlus} that is used by the {@link Reader}.
+ */
+ @Override
+ public void close()
+ {
+ if (buffer != null)
+ {
+ FileUtils.clean(buffer);
+ buffer = null;
+ }
+ }
+
class Reader extends WrappedRunnable
{
- private final InputStream source;
+ private final DataInputPlus source;
private final Iterator<CompressionMetadata.Chunk> chunks;
- private final BlockingQueue<byte[]> dataBuffer;
+ private final BlockingQueue<ByteBuffer> dataBuffer;
- Reader(InputStream source, CompressionInfo info, BlockingQueue<byte[]> dataBuffer)
+ Reader(DataInputPlus source, CompressionInfo info, BlockingQueue<ByteBuffer> dataBuffer)
{
this.source = source;
this.chunks = Iterators.forArray(info.chunks);
@@ -204,36 +227,54 @@ public class CompressedInputStream extends InputStream
protected void runMayThrow() throws Exception
{
- byte[] compressedWithCRC;
+ byte[] tmp = null;
while (chunks.hasNext())
{
CompressionMetadata.Chunk chunk = chunks.next();
int readLength = chunk.length + 4; // read with CRC
- compressedWithCRC = new byte[readLength];
-
- int bufferRead = 0;
- while (bufferRead < readLength)
+ ByteBuffer compressedWithCRC = null;
+ try
{
- try
+ final int r;
+ if (source instanceof ReadableByteChannel)
{
- int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
- if (r < 0)
- {
- readException = new EOFException("No chunk available");
- dataBuffer.put(POISON_PILL);
- return; // throw exception where we consume dataBuffer
- }
- bufferRead += r;
+ compressedWithCRC = ByteBuffer.allocateDirect(readLength);
+ r = ((ReadableByteChannel)source).read(compressedWithCRC);
+ compressedWithCRC.flip();
}
- catch (IOException e)
+ else
{
- logger.warn("Error while reading compressed input stream.", e);
- readException = e;
+ // read into an on-heap array, then copy over to an off-heap buffer. at a minimum snappy requires
+ // off-heap buffers for decompression, else we could have just wrapped the plain byte array in a ByteBuffer
+ if (tmp == null || tmp.length < info.parameters.chunkLength() + CHECKSUM_LENGTH)
+ tmp = new byte[info.parameters.chunkLength() + CHECKSUM_LENGTH];
+ source.readFully(tmp, 0, readLength);
+ compressedWithCRC = ByteBuffer.allocateDirect(readLength);
+ compressedWithCRC.put(tmp, 0, readLength);
+ compressedWithCRC.position(0);
+ r = readLength;
+ }
+
+ if (r < 0)
+ {
+ FileUtils.clean(compressedWithCRC);
+ readException = new EOFException("No chunk available");
dataBuffer.put(POISON_PILL);
return; // throw exception where we consume dataBuffer
}
}
+ catch (IOException e)
+ {
+ if (!(e instanceof EOFException))
+ logger.warn("Error while reading compressed input stream.", e);
+ if (compressedWithCRC != null)
+ FileUtils.clean(compressedWithCRC);
+
+ readException = e;
+ dataBuffer.put(POISON_PILL);
+ return; // throw exception where we consume dataBuffer
+ }
dataBuffer.put(compressedWithCRC);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index f8e4b40..e40788b 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.streaming.compress;
import java.io.IOException;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
import com.google.common.base.Throwables;
import org.slf4j.Logger;
@@ -28,7 +26,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.util.TrackedInputStream;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamReader;
import org.apache.cassandra.streaming.StreamSession;
@@ -59,8 +58,8 @@ public class CompressedStreamReader extends StreamReader
* @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
*/
@Override
- @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
- public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
+ @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+ public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
{
long totalSize = totalSize();
@@ -76,9 +75,9 @@ public class CompressedStreamReader extends StreamReader
session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair,
cfs.getTableName());
- CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
+ CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo,
ChecksumType.CRC32, cfs::getCrcCheckChance);
- TrackedInputStream in = new TrackedInputStream(cis);
+ TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
SSTableMultiWriter writer = null;
@@ -120,6 +119,10 @@ public class CompressedStreamReader extends StreamReader
throw e;
throw Throwables.propagate(e);
}
+ finally
+ {
+ cis.close();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 185ab22..0e78b7d 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.streaming.compress;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -30,6 +31,8 @@ import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamWriter;
@@ -41,7 +44,7 @@ import org.apache.cassandra.utils.Pair;
*/
public class CompressedStreamWriter extends StreamWriter
{
- public static final int CHUNK_SIZE = 10 * 1024 * 1024;
+ private static final int CHUNK_SIZE = 1 << 16;
private static final Logger logger = LoggerFactory.getLogger(CompressedStreamWriter.class);
@@ -56,6 +59,8 @@ public class CompressedStreamWriter extends StreamWriter
@Override
public void write(DataOutputStreamPlus out) throws IOException
{
+ assert out instanceof ByteBufDataOutputStreamPlus;
+ ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out;
long totalSize = totalSize();
logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
@@ -79,10 +84,24 @@ public class CompressedStreamWriter extends StreamWriter
long bytesTransferred = 0;
while (bytesTransferred < length)
{
- final long bytesTransferredFinal = bytesTransferred;
final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
limiter.acquire(toTransfer);
- long lastWrite = out.applyToChannel((wbc) -> fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc));
+
+ ByteBuffer outBuffer = ByteBuffer.allocateDirect(toTransfer);
+ long lastWrite;
+ try
+ {
+ lastWrite = fc.read(outBuffer, section.left + bytesTransferred);
+ assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer);
+ outBuffer.flip();
+ output.writeToChannel(outBuffer);
+ }
+ catch (IOException e)
+ {
+ FileUtils.clean(outBuffer);
+ throw e;
+ }
+
bytesTransferred += lastWrite;
progress += lastWrite;
session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
new file mode 100644
index 0000000..4b1459d
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/compress/StreamCompressionInputStream.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.compress;
+
+import java.io.IOException;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.streaming.async.StreamCompressionSerializer;
+
+/**
+ * Input stream that pulls chunks from a {@link DataInputPlus} through a {@link StreamCompressionSerializer}
+ * (handing it an LZ4 decompressor) and serves each resulting chunk via the inherited {@link #buffer}.
+ */
+public class StreamCompressionInputStream extends RebufferingInputStream
+{
+    /**
+     * The stream which contains buffers of compressed data that came from the peer.
+     */
+    private final DataInputPlus dataInputPlus;
+
+    private final LZ4FastDecompressor decompressor;
+    private final int protocolVersion;
+    private final StreamCompressionSerializer deserializer;
+
+    /**
+     * The parent, or owning, buffer of the current buffer being read from ({@link #buffer}).
+     * Holds {@link Unpooled#EMPTY_BUFFER} whenever no chunk is owned, so a chunk is never released twice.
+     */
+    private ByteBuf currentBuf;
+
+    /**
+     * @param dataInputPlus   source of the serialized chunks; this stream never closes it
+     * @param protocolVersion version handed to the chunk deserializer on every read
+     */
+    public StreamCompressionInputStream(DataInputPlus dataInputPlus, int protocolVersion)
+    {
+        super(Unpooled.EMPTY_BUFFER.nioBuffer());
+        currentBuf = Unpooled.EMPTY_BUFFER;
+
+        this.dataInputPlus = dataInputPlus;
+        this.protocolVersion = protocolVersion;
+        this.decompressor = LZ4Factory.fastestInstance().fastDecompressor();
+
+        // use the input's own allocator when it exposes one, otherwise netty's default pooled allocator
+        ByteBufAllocator allocator = dataInputPlus instanceof RebufferingByteBufDataInputPlus
+                                     ? ((RebufferingByteBufDataInputPlus)dataInputPlus).getAllocator()
+                                     : PooledByteBufAllocator.DEFAULT;
+        deserializer = new StreamCompressionSerializer(allocator);
+    }
+
+    /**
+     * Releases the chunk that was being read and replaces it with the next one from {@link #dataInputPlus}.
+     *
+     * @throws IOException if the next chunk cannot be deserialized; the stream then owns no chunk
+     */
+    @Override
+    public void reBuffer() throws IOException
+    {
+        releaseCurrentBuf();
+        currentBuf = deserializer.deserialize(decompressor, dataInputPlus, protocolVersion);
+        buffer = currentBuf.nioBuffer(0, currentBuf.readableBytes());
+    }
+
+    /**
+     * Releases the chunk currently owned, if any. {@link #dataInputPlus} is left open. Calling this
+     * repeatedly, or after a failed {@link #reBuffer()}, is harmless.
+     */
+    @Override
+    public void close()
+    {
+        releaseCurrentBuf();
+    }
+
+    /**
+     * Releases {@link #currentBuf} after first pointing the field at {@link Unpooled#EMPTY_BUFFER}.
+     * Without the reset, a failed deserialize in {@link #reBuffer()} or a second {@link #close()} would
+     * release the same chunk again, which netty rejects with an IllegalReferenceCountException.
+     */
+    private void releaseCurrentBuf()
+    {
+        ByteBuf released = currentBuf;
+        currentBuf = Unpooled.EMPTY_BUFFER;
+        released.release();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
index 44ff553..81e16f7 100644
--- a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
@@ -17,8 +17,7 @@
*/
package org.apache.cassandra.streaming.messages;
-import java.nio.channels.ReadableByteChannel;
-
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
@@ -26,12 +25,17 @@ public class CompleteMessage extends StreamMessage
{
public static Serializer<CompleteMessage> serializer = new Serializer<CompleteMessage>()
{
- public CompleteMessage deserialize(ReadableByteChannel in, int version, StreamSession session)
+ public CompleteMessage deserialize(DataInputPlus in, int version, StreamSession session)
{
return new CompleteMessage();
}
public void serialize(CompleteMessage message, DataOutputStreamPlus out, int version, StreamSession session) {}
+
+ public long serializedSize(CompleteMessage message, int version)
+ {
+ return 0;
+ }
};
public CompleteMessage()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index c65e1d4..fedb971 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.streaming.messages;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -29,7 +30,10 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.compress.CompressionInfo;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -42,6 +46,8 @@ public class FileMessageHeader
public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer();
public final TableId tableId;
+ public UUID planId;
+ public int sessionIndex;
public final int sequenceNumber;
/** SSTable version */
public final Version version;
@@ -61,11 +67,15 @@ public class FileMessageHeader
public final UUID pendingRepair;
public final int sstableLevel;
public final SerializationHeader.Component header;
+ public final InetAddress sender;
/* cached size value */
private transient final long size;
- public FileMessageHeader(TableId tableId,
+ private FileMessageHeader(TableId tableId,
+ InetAddress sender,
+ UUID planId,
+ int sessionIndex,
int sequenceNumber,
Version version,
SSTableFormat.Type format,
@@ -78,6 +88,9 @@ public class FileMessageHeader
SerializationHeader.Component header)
{
this.tableId = tableId;
+ this.sender = sender;
+ this.planId = planId;
+ this.sessionIndex = sessionIndex;
this.sequenceNumber = sequenceNumber;
this.version = version;
this.format = format;
@@ -93,6 +106,9 @@ public class FileMessageHeader
}
public FileMessageHeader(TableId tableId,
+ InetAddress sender,
+ UUID planId,
+ int sessionIndex,
int sequenceNumber,
Version version,
SSTableFormat.Type format,
@@ -105,6 +121,9 @@ public class FileMessageHeader
SerializationHeader.Component header)
{
this.tableId = tableId;
+ this.sender = sender;
+ this.planId = planId;
+ this.sessionIndex = sessionIndex;
this.sequenceNumber = sequenceNumber;
this.version = version;
this.format = format;
@@ -188,11 +207,20 @@ public class FileMessageHeader
return result;
}
+ public void addSessionInfo(StreamSession session)
+ {
+ planId = session.planId();
+ sessionIndex = session.sessionIndex();
+ }
+
static class FileMessageHeaderSerializer
{
public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException
{
header.tableId.serialize(out);
+ CompactEndpointSerializationHelper.serialize(header.sender, out);
+ UUIDSerializer.serializer.serialize(header.planId, out, version);
+ out.writeInt(header.sessionIndex);
out.writeInt(header.sequenceNumber);
out.writeUTF(header.version.toString());
out.writeUTF(header.format.name);
@@ -224,6 +252,9 @@ public class FileMessageHeader
public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException
{
TableId tableId = TableId.deserialize(in);
+ InetAddress sender = CompactEndpointSerializationHelper.deserialize(in);
+ UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+ int sessionIndex = in.readInt();
int sequenceNumber = in.readInt();
Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF());
SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF());
@@ -239,12 +270,15 @@ public class FileMessageHeader
int sstableLevel = in.readInt();
SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in);
- return new FileMessageHeader(tableId, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, pendingRepair, sstableLevel, header);
+ return new FileMessageHeader(tableId, sender, planId, sessionIndex, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, pendingRepair, sstableLevel, header);
}
public long serializedSize(FileMessageHeader header, int version)
{
long size = header.tableId.serializedSize();
+ size += CompactEndpointSerializationHelper.serializedSize(header.sender);
+ size += UUIDSerializer.serializer.serializedSize(header.planId, version);
+ size += TypeSizes.sizeof(header.sessionIndex);
size += TypeSizes.sizeof(header.sequenceNumber);
size += TypeSizes.sizeof(header.version.toString());
size += TypeSizes.sizeof(header.format.name);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 03bcaed..9f43982 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -18,15 +18,15 @@
package org.apache.cassandra.streaming.messages;
import java.io.IOException;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
+import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamReader;
+import org.apache.cassandra.streaming.StreamReceiveException;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.compress.CompressedStreamReader;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -40,21 +40,27 @@ public class IncomingFileMessage extends StreamMessage
public static Serializer<IncomingFileMessage> serializer = new Serializer<IncomingFileMessage>()
{
@SuppressWarnings("resource")
- public IncomingFileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+ public IncomingFileMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException
{
- DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version);
+ session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex);
+ if (session == null)
+ throw new IllegalStateException(String.format("unknown stream session: %s - %d", header.planId, header.sessionIndex));
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(header.tableId);
+ if (cfs == null)
+ throw new StreamReceiveException(session, "CF " + header.tableId + " was dropped during streaming");
+
StreamReader reader = !header.isCompressed() ? new StreamReader(header, session)
- : new CompressedStreamReader(header, session);
+ : new CompressedStreamReader(header, session);
try
{
- return new IncomingFileMessage(reader.read(in), header);
+ return new IncomingFileMessage(reader.read(input), header);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
- throw t;
+ throw new StreamReceiveException(session, t);
}
}
@@ -62,6 +68,11 @@ public class IncomingFileMessage extends StreamMessage
{
throw new UnsupportedOperationException("Not allowed to call serialize on an incoming file");
}
+
+ public long serializedSize(IncomingFileMessage message, int version)
+ {
+ throw new UnsupportedOperationException("Not allowed to call serializedSize on an incoming file");
+ }
};
public FileMessageHeader header;
@@ -77,7 +88,8 @@ public class IncomingFileMessage extends StreamMessage
@Override
public String toString()
{
- return "File (" + header + ", file: " + sstable.getFilename() + ")";
+ String filename = sstable != null ? sstable.getFilename() : null;
+ return "File (" + header + ", file: " + filename + ")";
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
index bfdc72e..f80c617 100644
--- a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
@@ -19,8 +19,8 @@
package org.apache.cassandra.streaming.messages;
import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
@@ -28,13 +28,18 @@ public class KeepAliveMessage extends StreamMessage
{
public static Serializer<KeepAliveMessage> serializer = new Serializer<KeepAliveMessage>()
{
- public KeepAliveMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+ public KeepAliveMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException
{
return new KeepAliveMessage();
}
public void serialize(KeepAliveMessage message, DataOutputStreamPlus out, int version, StreamSession session)
{}
+
+ public long serializedSize(KeepAliveMessage message, int version)
+ {
+ return 0;
+ }
};
public KeepAliveMessage()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index e3e6b9b..f44b41c 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -18,17 +18,18 @@
package org.apache.cassandra.streaming.messages;
import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamWriter;
import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Ref;
@@ -39,7 +40,7 @@ public class OutgoingFileMessage extends StreamMessage
{
public static Serializer<OutgoingFileMessage> serializer = new Serializer<OutgoingFileMessage>()
{
- public OutgoingFileMessage deserialize(ReadableByteChannel in, int version, StreamSession session)
+ public OutgoingFileMessage deserialize(DataInputPlus in, int version, StreamSession session)
{
throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing file");
}
@@ -57,6 +58,11 @@ public class OutgoingFileMessage extends StreamMessage
message.finishTransfer();
}
}
+
+ public long serializedSize(OutgoingFileMessage message, int version)
+ {
+ return 0;
+ }
};
public final FileMessageHeader header;
@@ -65,7 +71,7 @@ public class OutgoingFileMessage extends StreamMessage
private boolean completed = false;
private boolean transferring = false;
- public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, boolean keepSSTableLevel)
+ public OutgoingFileMessage(Ref<SSTableReader> ref, StreamSession session, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, boolean keepSSTableLevel)
{
super(Type.FILE);
this.ref = ref;
@@ -73,6 +79,9 @@ public class OutgoingFileMessage extends StreamMessage
SSTableReader sstable = ref.get();
filename = sstable.getFilename();
this.header = new FileMessageHeader(sstable.metadata().id,
+ FBUtilities.getBroadcastAddress(),
+ session.planId(),
+ session.sessionIndex(),
sequenceNumber,
sstable.descriptor.version,
sstable.descriptor.formatType,
@@ -93,12 +102,12 @@ public class OutgoingFileMessage extends StreamMessage
}
CompressionInfo compressionInfo = FileMessageHeader.serializer.serialize(header, out, version);
-
+ out.flush();
final SSTableReader reader = ref.get();
StreamWriter writer = compressionInfo == null ?
- new StreamWriter(reader, header.sections, session) :
- new CompressedStreamWriter(reader, header.sections,
- compressionInfo, session);
+ new StreamWriter(reader, header.sections, session) :
+ new CompressedStreamWriter(reader, header.sections,
+ compressionInfo, session);
writer.write(out);
}
@@ -140,5 +149,10 @@ public class OutgoingFileMessage extends StreamMessage
{
return "File (" + header + ", file: " + filename + ")";
}
+
+ public String getFilename()
+ {
+ return filename;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
new file mode 100644
index 0000000..f43ff01
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareAckMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.streaming.StreamSession;
+
+public class PrepareAckMessage extends StreamMessage // stream message with no fields of its own; identified only by Type.PREPARE_ACK
+{
+ public static Serializer<PrepareAckMessage> serializer = new Serializer<PrepareAckMessage>() // all three methods are trivial: nothing is written, read or counted
+ {
+ public void serialize(PrepareAckMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
+ {
+ // nop: this class declares no fields, so no bytes are written to 'out'
+ }
+
+ public PrepareAckMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException
+ {
+ return new PrepareAckMessage(); // nothing is consumed from 'in'
+ }
+
+ public long serializedSize(PrepareAckMessage message, int version)
+ {
+ return 0; // consistent with serialize(), which writes nothing
+ }
+ };
+
+ public PrepareAckMessage()
+ {
+ super(Type.PREPARE_ACK); // hand the message type to the StreamMessage constructor
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Prepare ACK"; // constant label; this class has no state to report
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
deleted file mode 100644
index 1f53be7..0000000
--- a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.messages;
-
-import java.io.*;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.streaming.StreamRequest;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.StreamSummary;
-
-public class PrepareMessage extends StreamMessage
-{
- public static Serializer<PrepareMessage> serializer = new Serializer<PrepareMessage>()
- {
- @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the channel needs to remain open.
- public PrepareMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
- {
- DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
- PrepareMessage message = new PrepareMessage();
- // requests
- int numRequests = input.readInt();
- for (int i = 0; i < numRequests; i++)
- message.requests.add(StreamRequest.serializer.deserialize(input, version));
- // summaries
- int numSummaries = input.readInt();
- for (int i = 0; i < numSummaries; i++)
- message.summaries.add(StreamSummary.serializer.deserialize(input, version));
- return message;
- }
-
- public void serialize(PrepareMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
- {
- // requests
- out.writeInt(message.requests.size());
- for (StreamRequest request : message.requests)
- StreamRequest.serializer.serialize(request, out, version);
- // summaries
- out.writeInt(message.summaries.size());
- for (StreamSummary summary : message.summaries)
- StreamSummary.serializer.serialize(summary, out, version);
- }
- };
-
- /**
- * Streaming requests
- */
- public final Collection<StreamRequest> requests = new ArrayList<>();
-
- /**
- * Summaries of streaming out
- */
- public final Collection<StreamSummary> summaries = new ArrayList<>();
-
- public PrepareMessage()
- {
- super(Type.PREPARE);
- }
-
- @Override
- public String toString()
- {
- final StringBuilder sb = new StringBuilder("Prepare (");
- sb.append(requests.size()).append(" requests, ");
- int totalFile = 0;
- for (StreamSummary summary : summaries)
- totalFile += summary.files;
- sb.append(" ").append(totalFile).append(" files");
- sb.append('}');
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
new file mode 100644
index 0000000..2d8026c
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynAckMessage.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+
+/**
+ * Stream message of type {@code PREPARE_SYNACK} whose payload is a collection of {@link StreamSummary}.
+ * The serialized form is an int count followed by that many summaries.
+ */
+public class PrepareSynAckMessage extends StreamMessage
+{
+    public static Serializer<PrepareSynAckMessage> serializer = new Serializer<PrepareSynAckMessage>()
+    {
+        public void serialize(PrepareSynAckMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
+        {
+            // wire format: summary count, then each summary in iteration order
+            out.writeInt(message.summaries.size());
+            for (StreamSummary summary : message.summaries)
+                StreamSummary.serializer.serialize(summary, out, version);
+        }
+
+        public PrepareSynAckMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException
+        {
+            // mirror of serialize(): read the count, then that many summaries
+            PrepareSynAckMessage message = new PrepareSynAckMessage();
+            int numSummaries = input.readInt();
+            for (int i = 0; i < numSummaries; i++)
+                message.summaries.add(StreamSummary.serializer.deserialize(input, version));
+            return message;
+        }
+
+        public long serializedSize(PrepareSynAckMessage message, int version)
+        {
+            long size = 4; // the int count of summaries written first by serialize()
+            for (StreamSummary summary : message.summaries)
+                size += StreamSummary.serializer.serializedSize(summary, version);
+            return size;
+        }
+    };
+
+    /**
+     * Summaries of streaming out
+     */
+    public final Collection<StreamSummary> summaries = new ArrayList<>();
+
+    public PrepareSynAckMessage()
+    {
+        super(Type.PREPARE_SYNACK);
+    }
+
+    /**
+     * @return a short label with the sum of {@code files} over all summaries, e.g. {@code Prepare SYNACK ( 3 files)}
+     */
+    @Override
+    public String toString()
+    {
+        final StringBuilder sb = new StringBuilder("Prepare SYNACK (");
+        int totalFile = 0;
+        for (StreamSummary summary : summaries)
+            totalFile += summary.files;
+        sb.append(" ").append(totalFile).append(" files");
+        sb.append(')'); // closes the '(' opened in the prefix; this used to append '}'
+        return sb.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
new file mode 100644
index 0000000..6fbaafa
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareSynMessage.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.streaming.StreamRequest;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+
+/**
+ * Stream message of type {@code PREPARE_SYN}, carrying a collection of {@link StreamRequest}s and a
+ * collection of {@link StreamSummary}s.
+ */
+public class PrepareSynMessage extends StreamMessage
+{
+    /**
+     * Wire codec for {@link PrepareSynMessage}.
+     * <p>
+     * Layout: a 4-byte request count, the requests (via {@code StreamRequest.serializer}), a 4-byte
+     * summary count, then the summaries (via {@code StreamSummary.serializer}). The {@code session}
+     * argument is not consulted by this serializer.
+     */
+    public static Serializer<PrepareSynMessage> serializer = new Serializer<PrepareSynMessage>()
+    {
+        /**
+         * Reads requests and then summaries, in that order, into a freshly created message.
+         * A non-positive count yields an empty collection for that section.
+         */
+        public PrepareSynMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException
+        {
+            PrepareSynMessage message = new PrepareSynMessage();
+            // requests
+            int numRequests = input.readInt();
+            for (int i = 0; i < numRequests; i++)
+                message.requests.add(StreamRequest.serializer.deserialize(input, version));
+            // summaries
+            int numSummaries = input.readInt();
+            for (int i = 0; i < numSummaries; i++)
+                message.summaries.add(StreamSummary.serializer.deserialize(input, version));
+            return message;
+        }
+
+        /**
+         * Returns the number of bytes {@code serialize} emits for {@code message}.
+         */
+        public long serializedSize(PrepareSynMessage message, int version)
+        {
+            long size = 4 + 4; // count of requests and count of summaries
+            for (StreamRequest request : message.requests)
+                size += StreamRequest.serializer.serializedSize(request, version);
+            for (StreamSummary summary : message.summaries)
+                size += StreamSummary.serializer.serializedSize(summary, version);
+            return size;
+        }
+
+        /**
+         * Writes the request count and requests, then the summary count and summaries, each in
+         * the collection's iteration order. Must stay in step with {@code deserialize}.
+         */
+        public void serialize(PrepareSynMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
+        {
+            // requests
+            out.writeInt(message.requests.size());
+            for (StreamRequest request : message.requests)
+                StreamRequest.serializer.serialize(request, out, version);
+            // summaries
+            out.writeInt(message.summaries.size());
+            for (StreamSummary summary : message.summaries)
+                StreamSummary.serializer.serialize(summary, out, version);
+        }
+    };
+
+    /**
+     * Streaming requests
+     */
+    public final Collection<StreamRequest> requests = new ArrayList<>();
+
+    /**
+     * Summaries of streaming out
+     */
+    public final Collection<StreamSummary> summaries = new ArrayList<>();
+
+    /**
+     * Creates a message of type {@code PREPARE_SYN} with no requests and no summaries.
+     */
+    public PrepareSynMessage()
+    {
+        super(Type.PREPARE_SYN);
+    }
+
+    /**
+     * Describes this message as {@code Prepare SYN (R requests, N files)}, where {@code R} is the
+     * number of requests and {@code N} is the sum of {@code files} over all summaries.
+     */
+    @Override
+    public String toString()
+    {
+        int totalFile = 0;
+        for (StreamSummary summary : summaries)
+            totalFile += summary.files;
+        // Close with ')' to match the opening '(' (the previous text ended in '}'),
+        // and emit a single space after the comma instead of two.
+        return "Prepare SYN (" + requests.size() + " requests, " + totalFile + " files)";
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
index 55dd7e6..3988dcc 100644
--- a/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/ReceivedMessage.java
@@ -18,11 +18,8 @@
package org.apache.cassandra.streaming.messages;
import java.io.*;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.StreamSession;
@@ -32,9 +29,8 @@ public class ReceivedMessage extends StreamMessage
public static Serializer<ReceivedMessage> serializer = new Serializer<ReceivedMessage>()
{
@SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the channel needs to remain open.
- public ReceivedMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+ public ReceivedMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException
{
- DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
return new ReceivedMessage(TableId.deserialize(input), input.readInt());
}
@@ -43,6 +39,11 @@ public class ReceivedMessage extends StreamMessage
message.tableId.serialize(out);
out.writeInt(message.sequenceNumber);
}
+
+ public long serializedSize(ReceivedMessage message, int version)
+ {
+ return message.tableId.serializedSize() + 4;
+ }
};
public final TableId tableId;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
deleted file mode 100644
index 047fb06..0000000
--- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.messages;
-
-import java.io.*;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.UUID;
-
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-/**
- * @deprecated retry support removed on CASSANDRA-10992
- */
-@Deprecated
-public class RetryMessage extends StreamMessage
-{
- public static Serializer<RetryMessage> serializer = new Serializer<RetryMessage>()
- {
- @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the channel needs to remain open.
- public RetryMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
- {
- DataInputPlus input = new DataInputStreamPlus(Channels.newInputStream(in));
- return new RetryMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
- }
-
- public void serialize(RetryMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
- {
- UUIDSerializer.serializer.serialize(message.cfId, out, MessagingService.current_version);
- out.writeInt(message.sequenceNumber);
- }
- };
-
- public final UUID cfId;
- public final int sequenceNumber;
-
- public RetryMessage(UUID cfId, int sequenceNumber)
- {
- super(Type.RETRY);
- this.cfId = cfId;
- this.sequenceNumber = sequenceNumber;
- }
-
- @Override
- public String toString()
- {
- final StringBuilder sb = new StringBuilder("Retry (");
- sb.append(cfId).append(", #").append(sequenceNumber).append(')');
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
index 4a5b6df..59ad90e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
@@ -17,8 +17,7 @@
*/
package org.apache.cassandra.streaming.messages;
-import java.nio.channels.ReadableByteChannel;
-
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
@@ -26,12 +25,17 @@ public class SessionFailedMessage extends StreamMessage
{
public static Serializer<SessionFailedMessage> serializer = new Serializer<SessionFailedMessage>()
{
- public SessionFailedMessage deserialize(ReadableByteChannel in, int version, StreamSession session)
+ public SessionFailedMessage deserialize(DataInputPlus in, int version, StreamSession session)
{
return new SessionFailedMessage();
}
public void serialize(SessionFailedMessage message, DataOutputStreamPlus out, int version, StreamSession session) {}
+
+ public long serializedSize(SessionFailedMessage message, int version)
+ {
+ return 0;
+ }
};
public SessionFailedMessage()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org