You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/08/22 20:55:10 UTC
[03/11] cassandra git commit: move streaming to use netty
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 3a95015..590ba5f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.streaming;
import java.io.*;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
import java.util.Collection;
import java.util.UUID;
@@ -30,8 +28,7 @@ import com.google.common.collect.UnmodifiableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.ning.compress.lzf.LZFInputStream;
-
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
@@ -42,9 +39,10 @@ import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.streaming.compress.StreamCompressionInputStream;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.io.util.TrackedInputStream;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -88,12 +86,12 @@ public class StreamReader
}
/**
- * @param channel where this reads data from
+ * @param inputPlus where this reads data from
* @return SSTable transferred
* @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
*/
- @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
- public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
+ @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+ public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
{
long totalSize = totalSize();
@@ -108,7 +106,8 @@ public class StreamReader
session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
cfs.getTableName(), pendingRepair);
- TrackedInputStream in = new TrackedInputStream(new LZFInputStream(Channels.newInputStream(channel)));
+
+ TrackedDataInputPlus in = new TrackedDataInputPlus(new StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION));
StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
SSTableMultiWriter writer = null;
try
@@ -179,10 +178,10 @@ public class StreamReader
private Row staticRow;
private IOException exception;
- public StreamDeserializer(TableMetadata metadata, InputStream in, Version version, SerializationHeader header) throws IOException
+ public StreamDeserializer(TableMetadata metadata, DataInputPlus in, Version version, SerializationHeader header) throws IOException
{
this.metadata = metadata;
- this.in = new DataInputPlus.DataInputStreamPlus(in);
+ this.in = in;
this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
this.header = header;
}
@@ -256,8 +255,8 @@ public class StreamReader
// to what we do in hasNext)
Unfiltered unfiltered = iterator.next();
return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW
- ? maybeMarkLocalToBeCleared((Row) unfiltered)
- : unfiltered;
+ ? maybeMarkLocalToBeCleared((Row) unfiltered)
+ : unfiltered;
}
private Row maybeMarkLocalToBeCleared(Row row)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamReceiveException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveException.java b/src/java/org/apache/cassandra/streaming/StreamReceiveException.java
new file mode 100644
index 0000000..54b365a
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+public class StreamReceiveException extends RuntimeException
+{
+ public final StreamSession session;
+
+ public StreamReceiveException(StreamSession session, String msg)
+ {
+ super(msg);
+ this.session = session;
+ }
+
+ public StreamReceiveException(StreamSession session, Throwable t)
+ {
+ super(t);
+ this.session = session;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 925dc85..6aa70ad 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -104,6 +104,7 @@ public class StreamReceiveTask extends StreamTask
remoteSSTablesReceived++;
assert tableId.equals(sstable.getTableId());
+ logger.debug("recevied {} of {} total files", remoteSSTablesReceived, totalFiles);
Collection<SSTableReader> finished = null;
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 67d7d0d..9371c65 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -17,8 +17,9 @@
*/
package org.apache.cassandra.streaming;
-import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -27,7 +28,7 @@ import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.net.IncomingStreamingConnection;
+import io.netty.channel.Channel;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -103,12 +104,10 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
UUID planId,
StreamOperation streamOperation,
InetAddress from,
- IncomingStreamingConnection connection,
- boolean isForOutgoing,
- int version,
+ Channel channel,
boolean keepSSTableLevel,
UUID pendingRepair,
- PreviewKind previewKind) throws IOException
+ PreviewKind previewKind)
{
StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
if (future == null)
@@ -119,7 +118,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, pendingRepair, previewKind);
StreamManager.instance.registerReceiving(future);
}
- future.attachConnection(from, sessionIndex, connection, isForOutgoing, version);
+ future.attachConnection(from, sessionIndex, channel);
logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, streamOperation.getDescription());
return future;
}
@@ -131,11 +130,18 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
return future;
}
- private void attachConnection(InetAddress from, int sessionIndex, IncomingStreamingConnection connection, boolean isForOutgoing, int version) throws IOException
+ public StreamCoordinator getCoordinator()
{
- StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connection.socket.getInetAddress());
+ return coordinator;
+ }
+
+ private void attachConnection(InetAddress from, int sessionIndex, Channel channel)
+ {
+ SocketAddress addr = channel.remoteAddress();
+ InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from);
+ StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connecting);
session.init(this);
- session.handler.initiateOnReceivingSide(connection, isForOutgoing, version);
+ session.attach(channel);
}
public void addEventListener(StreamEventHandler listener)
@@ -206,6 +212,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
private synchronized void maybeComplete()
{
+ logger.warn("[Stream #{}] maybeComplete", planId);
if (!coordinator.hasActiveSessions())
{
StreamState finalState = getCurrentState();
@@ -221,4 +228,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
}
}
}
+
+ StreamSession getSession(InetAddress peer, int sessionIndex)
+ {
+ return coordinator.getSessionById(peer, sessionIndex);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 5ca9938..0381416 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -17,9 +17,8 @@
*/
package org.apache.cassandra.streaming;
-import java.io.IOException;
import java.net.InetAddress;
-import java.net.Socket;
+import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.concurrent.*;
@@ -29,17 +28,21 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.*;
+import com.google.common.util.concurrent.Futures;
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
@@ -47,10 +50,12 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.metrics.StreamingMetrics;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.async.NettyStreamingMessageSender;
import org.apache.cassandra.streaming.messages.*;
-import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
@@ -59,79 +64,80 @@ import org.apache.cassandra.utils.concurrent.Refs;
/**
* Handles the streaming a one or more section of one of more sstables to and from a specific
- * remote node.
+ * remote node. The sending side performs a block-level transfer of the source sstable, while the receiver
+ * must deserialize that data stream into partitions and rows, and then write that out as an sstable.
*
- * Both this node and the remote one will create a similar symmetrical StreamSession. A streaming
+ * Both this node and the remote one will create a similar symmetrical {@link StreamSession}. A streaming
* session has the following life-cycle:
*
- * 1. Connections Initialization
+ * 1. Session Initialization
*
- * (a) A node (the initiator in the following) create a new StreamSession, initialize it (init())
- * and then start it (start()). Start will create a {@link ConnectionHandler} that will create
- * two connections to the remote node (the follower in the following) with whom to stream and send
- * a StreamInit message. The first connection will be the incoming connection for the
- * initiator, and the second connection will be the outgoing.
- * (b) Upon reception of that StreamInit message, the follower creates its own StreamSession,
- * initialize it if it still does not exist, and attach connecting socket to its ConnectionHandler
- * according to StreamInit message's isForOutgoing flag.
- * (d) When the both incoming and outgoing connections are established, StreamSession calls
- * StreamSession#onInitializationComplete method to start the streaming prepare phase
- * (StreamResultFuture.startStreaming()).
+ * (a) A node (the initiator in the following) creates a new {@link StreamSession},
+ * initializes it ({@link #init(StreamResultFuture)}), and then starts it ({@link #start()}).
+ * Starting a session causes a {@link StreamInitMessage} to be sent.
+ * (b) Upon reception of that {@link StreamInitMessage}, the follower creates its own {@link StreamSession},
+ * and initializes it if it still does not exist.
+ * (c) After the initiator sends the {@link StreamInitMessage}, it invokes
+ * {@link StreamSession#onInitializationComplete()} to start the streaming prepare phase.
*
* 2. Streaming preparation phase
*
- * (a) This phase is started when the initiator onInitializationComplete() method is called. This method sends a
- * PrepareMessage that includes what files/sections this node will stream to the follower
- * (stored in a StreamTransferTask, each column family has it's own transfer task) and what
- * the follower needs to stream back (StreamReceiveTask, same as above). If the initiator has
- * nothing to receive from the follower, it goes directly to its Streaming phase. Otherwise,
- * it waits for the follower PrepareMessage.
- * (b) Upon reception of the PrepareMessage, the follower records which files/sections it will receive
- * and send back its own PrepareMessage with a summary of the files/sections that will be sent to
- * the initiator (prepare()). After having sent that message, the follower goes to its Streamning
- * phase.
- * (c) When the initiator receives the follower PrepareMessage, it records which files/sections it will
- * receive and then goes to his own Streaming phase.
+ * (a) A {@link PrepareSynMessage} is sent that includes a) what files/sections this node will stream to the follower
+ * (stored locally in a {@link StreamTransferTask}, one for each table) and b) what the follower needs to
+ * stream back (stored locally in a {@link StreamReceiveTask}, one for each table).
+ * (b) Upon reception of the {@link PrepareSynMessage}, the follower records which files/sections it will receive
+ * and send back a {@link PrepareSynAckMessage}, which contains a summary of the files/sections that will be sent to
+ * the initiator.
+ * (c) When the initiator receives the {@link PrepareSynAckMessage}, it records which files/sections it will
+ * receive, and then goes to its Streaming phase (see next section). If the initiator is to receive files,
+ * it sends a {@link PrepareAckMessage} to the follower to indicate that it can start streaming to the initiator.
+ * (d) (Optional) If the follower receives a {@link PrepareAckMessage}, it enters its Streaming phase.
*
* 3. Streaming phase
*
- * (a) The streaming phase is started by each node (the sender in the follower, but note that each side
- * of the StreamSession may be sender for some of the files) involved by calling startStreamingFiles().
- * This will sequentially send a FileMessage for each file of each SteamTransferTask. Each FileMessage
- * consists of a FileMessageHeader that indicates which file is coming and then start streaming the
- * content for that file (StreamWriter in FileMessage.serialize()). When a file is fully sent, the
- * fileSent() method is called for that file. If all the files for a StreamTransferTask are sent
- * (StreamTransferTask.complete()), the task is marked complete (taskCompleted()).
- * (b) On the receiving side, a SSTable will be written for the incoming file (StreamReader in
- * FileMessage.deserialize()) and once the FileMessage is fully received, the file will be marked as
- * complete (received()). When all files for the StreamReceiveTask have been received, the sstables
- * are added to the CFS (and 2ndary index are built, StreamReceiveTask.complete()) and the task
- * is marked complete (taskCompleted())
+ * (a) The streaming phase is started at each node by calling {@link StreamSession#startStreamingFiles(boolean)}.
+ * This will send, sequentially on each outbound streaming connection (see {@link NettyStreamingMessageSender}),
+ * an {@link OutgoingFileMessage} for each file in each of the {@link StreamTransferTask}.
+ * Each {@link OutgoingFileMessage} consists of a {@link FileMessageHeader} that contains metadata about the file
+ * being streamed, followed by the file content itself. Once all the files for a {@link StreamTransferTask} are sent,
+ * the task is marked complete {@link StreamTransferTask#complete(int)}.
+ * (b) On the receiving side, a SSTable will be written for the incoming file, and once the file is fully received,
+ * the file will be marked as complete ({@link StreamReceiveTask#received(SSTableMultiWriter)}). When all files
+ * for the {@link StreamReceiveTask} have been received, the sstables are added to the CFS (and 2ndary indexes/MV are built),
+ * and the task is marked complete ({@link #taskCompleted(StreamReceiveTask)}).
* (b) If during the streaming of a particular file an error occurs on the receiving end of a stream
- * (FileMessage.deserialize), the node will send a SessionFailedMessage to the sender and close the stream session.
- * (c) When all transfer and receive tasks for a session are complete, the move to the Completion phase
- * (maybeCompleted()).
+ * (it may be either the initiator or the follower), the node will send a {@link SessionFailedMessage}
+ * to the sender and close the stream session.
+ * (c) When all transfer and receive tasks for a session are complete, the session moves to the Completion phase
+ * ({@link #maybeCompleted()}).
*
* 4. Completion phase
*
- * (a) When a node has finished all transfer and receive task, it enter the completion phase (maybeCompleted()).
- * If it had already received a CompleteMessage from the other side (it is in the WAIT_COMPLETE state), that
- * session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and
- * send a CompleteMessage to the other side.
+ * (a) When a node enters the completion phase, it sends a {@link CompleteMessage} to the peer, and then enters the
+ * {@link StreamSession.State#WAIT_COMPLETE} state. If it has already received a {@link CompleteMessage}
+ * from the peer, the session is complete and is then closed ({@link #closeSession(State)}). Otherwise, the node
+ * switches to the {@link StreamSession.State#WAIT_COMPLETE} state and sends a {@link CompleteMessage} to the other side.
+ *
+ * In brief, the message passing looks like this (I for initiator, F for follower):
+ * (session init)
+ * I: StreamInitMessage
+ * (session prepare)
+ * I: PrepareSynMessage
+ * F: PrepareSynAckMessage
+ * I: PrepareAckMessage
+ * (stream - this can happen in both directions)
+ * I: OutgoingFileMessage
+ * F: ReceivedMessage
+ * (completion)
+ * I/F: CompleteMessage
+ *
+ * All messages which derive from {@link StreamMessage} are sent by the standard internode messaging
+ * (via {@link org.apache.cassandra.net.MessagingService}), while the actual files themselves are sent by a special
+ * "streaming" connection type. See {@link NettyStreamingMessageSender} for details.
*/
public class StreamSession implements IEndpointStateChangeSubscriber
{
-
- /**
- * Version where keep-alive support was added
- */
- private static final CassandraVersion STREAM_KEEP_ALIVE_VERSION = new CassandraVersion("3.10");
private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
- private static final DebuggableScheduledThreadPoolExecutor keepAliveExecutor = new DebuggableScheduledThreadPoolExecutor("StreamKeepAliveExecutor");
- static {
- // Immediately remove keep-alive task when cancelled.
- keepAliveExecutor.setRemoveOnCancelPolicy(true);
- }
/**
* Streaming endpoint.
@@ -139,7 +145,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
* Each {@code StreamSession} is identified by this InetAddress which is broadcast address of the node streaming.
*/
public final InetAddress peer;
+
private final int index;
+
/** Actual connecting address. Can be the same as {@linkplain #peer}. */
public final InetAddress connecting;
@@ -154,20 +162,18 @@ public class StreamSession implements IEndpointStateChangeSubscriber
// data receivers, filled after receiving prepare message
private final Map<TableId, StreamReceiveTask> receivers = new ConcurrentHashMap<>();
private final StreamingMetrics metrics;
- /* can be null when session is created in remote */
- private final StreamConnectionFactory factory;
- public final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<>();
+ final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace = new HashMap<>();
- public final ConnectionHandler handler;
+ private final NettyStreamingMessageSender messageSender;
+ private final ConcurrentMap<ChannelId, Channel> incomingChannels = new ConcurrentHashMap<>();
- private AtomicBoolean isAborted = new AtomicBoolean(false);
+ private final AtomicBoolean isAborted = new AtomicBoolean(false);
private final boolean keepSSTableLevel;
- private ScheduledFuture<?> keepAliveFuture = null;
private final UUID pendingRepair;
private final PreviewKind previewKind;
- public static enum State
+ public enum State
{
INITIALIZED,
PREPARING,
@@ -184,17 +190,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
* Create new streaming session with the peer.
* @param peer Address of streaming peer
* @param connecting Actual connecting address
- * @param factory is used for establishing connection
*/
public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
{
this.peer = peer;
this.connecting = connecting;
this.index = index;
- this.factory = factory;
- this.handler = new ConnectionHandler(this, isKeepAliveSupported()?
- (int)TimeUnit.SECONDS.toMillis(2 * DatabaseDescriptor.getStreamingKeepAlivePeriod()) :
- DatabaseDescriptor.getStreamingSocketTimeout(), previewKind.isPreview());
+
+ OutboundConnectionIdentifier id = OutboundConnectionIdentifier.stream(new InetSocketAddress(FBUtilities.getBroadcastAddress(), 0),
+ new InetSocketAddress(connecting, MessagingService.portFor(connecting)));
+ this.messageSender = new NettyStreamingMessageSender(this, id, factory, StreamMessage.CURRENT_VERSION, previewKind.isPreview());
this.metrics = StreamingMetrics.get(connecting);
this.keepSSTableLevel = keepSSTableLevel;
this.pendingRepair = pendingRepair;
@@ -242,12 +247,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
return receivers.get(tableId).getTransaction();
}
- private boolean isKeepAliveSupported()
- {
- CassandraVersion peerVersion = Gossiper.instance.getReleaseVersion(peer);
- return peerVersion != null && peerVersion.compareTo(STREAM_KEEP_ALIVE_VERSION) >= 0;
- }
-
/**
* Bind this session to report to specific {@link StreamResultFuture} and
* perform pre-streaming initialization.
@@ -258,13 +257,18 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
this.streamResult = streamResult;
StreamHook.instance.reportStreamFuture(this, streamResult);
+ }
- if (isKeepAliveSupported())
- scheduleKeepAliveTask();
- else
- logger.debug("Peer {} does not support keep-alive.", peer);
+ public boolean attach(Channel channel)
+ {
+ if (!messageSender.hasControlChannel())
+ messageSender.injectControlMessageChannel(channel);
+ return incomingChannels.putIfAbsent(channel.id(), channel) == null;
}
+ /**
+ * invoked by the node that begins the stream session (it may be sending files, receiving files, or both)
+ */
public void start()
{
if (requests.isEmpty() && transfers.isEmpty())
@@ -279,7 +283,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
logger.info("[Stream #{}] Starting streaming to {}{}", planId(),
peer,
peer.equals(connecting) ? "" : " through " + connecting);
- handler.initiate();
+ messageSender.initialize();
onInitializationComplete();
}
catch (Exception e)
@@ -289,12 +293,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
}
- public Socket createConnection() throws IOException
- {
- assert factory != null;
- return factory.createConnection(connecting);
- }
-
/**
* Request data fetch task to this session.
*
@@ -317,7 +315,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
* @param columnFamilies Transfer ColumnFamilies
* @param flushTables flush tables?
*/
- public synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables)
+ synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables)
{
failIfFinished();
Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
@@ -428,7 +426,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
}
- public synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
+ synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
{
failIfFinished();
Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
@@ -472,31 +470,37 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
}
- private synchronized void closeSession(State finalState)
+ private synchronized Future closeSession(State finalState)
{
+ Future abortedTasksFuture = null;
if (isAborted.compareAndSet(false, true))
{
state(finalState);
+ // ensure aborting the tasks does not happen on the network IO thread (read: netty event loop)
+ // as we don't want any blocking disk IO to stop the network thread
if (finalState == State.FAILED)
- {
- for (StreamTask task : Iterables.concat(receivers.values(), transfers.values()))
- task.abort();
- }
+ abortedTasksFuture = ScheduledExecutors.nonPeriodicTasks.submit(this::abortTasks);
- if (keepAliveFuture != null)
- {
- logger.debug("[Stream #{}] Finishing keep-alive task.", planId());
- keepAliveFuture.cancel(false);
- keepAliveFuture = null;
- }
-
- // Note that we shouldn't block on this close because this method is called on the handler
- // incoming thread (so we would deadlock).
- handler.close();
+ incomingChannels.values().forEach(Channel::close);
+ messageSender.close();
streamResult.handleSessionComplete(this);
}
+ return abortedTasksFuture != null ? abortedTasksFuture : Futures.immediateFuture(null);
+ }
+
+ private void abortTasks()
+ {
+ try
+ {
+ receivers.values().forEach(StreamReceiveTask::abort);
+ transfers.values().forEach(StreamTransferTask::abort);
+ }
+ catch (Exception e)
+ {
+ logger.warn("failed to abort some streaming tasks", e);
+ }
}
/**
@@ -517,6 +521,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
return state;
}
+ public NettyStreamingMessageSender getMessageSender()
+ {
+ return messageSender;
+ }
+
/**
* Return if this session completed successfully.
*
@@ -531,27 +540,37 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
switch (message.type)
{
- case PREPARE:
- PrepareMessage msg = (PrepareMessage) message;
+ case STREAM_INIT:
+ // nop
+ break;
+ case PREPARE_SYN:
+ PrepareSynMessage msg = (PrepareSynMessage) message;
prepare(msg.requests, msg.summaries);
break;
-
+ case PREPARE_SYNACK:
+ prepareSynAck((PrepareSynAckMessage) message);
+ break;
+ case PREPARE_ACK:
+ prepareAck((PrepareAckMessage) message);
+ break;
case FILE:
receive((IncomingFileMessage) message);
break;
-
case RECEIVED:
ReceivedMessage received = (ReceivedMessage) message;
received(received.tableId, received.sequenceNumber);
break;
-
case COMPLETE:
complete();
break;
-
+ case KEEP_ALIVE:
+ // NOP - we only send/receive the KEEP_ALIVE to force the TCP connection to remain open
+ break;
case SESSION_FAILED:
sessionFailed();
break;
+ default:
+ throw new AssertionError("unhandled StreamMessage type: " + message.getClass().getName());
}
}
@@ -562,55 +581,43 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
// send prepare message
state(State.PREPARING);
- PrepareMessage prepare = new PrepareMessage();
+ PrepareSynMessage prepare = new PrepareSynMessage();
prepare.requests.addAll(requests);
for (StreamTransferTask task : transfers.values())
prepare.summaries.add(task.getSummary());
- handler.sendMessage(prepare);
-
- // if we don't need to prepare for receiving stream, start sending files immediately
- if (requests.isEmpty())
- startStreamingFiles();
+ messageSender.sendMessage(prepare);
}
- /**l
+ /**
* Call back for handling exception during streaming.
- *
- * @param e thrown exception
*/
- public void onError(Throwable e)
+ public Future onError(Throwable e)
{
logError(e);
// send session failure message
- if (handler.isOutgoingConnected())
- handler.sendMessage(new SessionFailedMessage());
+ if (messageSender.connected())
+ messageSender.sendMessage(new SessionFailedMessage());
// fail session
- closeSession(State.FAILED);
+ return closeSession(State.FAILED);
}
private void logError(Throwable e)
{
if (e instanceof SocketTimeoutException)
{
- if (isKeepAliveSupported())
- logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? " +
- "If not, maybe try increasing streaming_keep_alive_period_in_secs.", planId(),
- peer.getHostAddress(),
- peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
- 2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(),
- e);
- else
- logger.error("[Stream #{}] Streaming socket timed out. This means the session peer stopped responding or " +
- "is still processing received data. If there is no sign of failure in the other end or a very " +
- "dense table is being transferred you may want to increase streaming_socket_timeout_in_ms " +
- "property. Current value is {}ms.", planId(), DatabaseDescriptor.getStreamingSocketTimeout(), e);
+ logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? " +
+ "If not, maybe try increasing streaming_keep_alive_period_in_secs.", planId(),
+ peer.getHostAddress(),
+ peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
+ 2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(),
+ e);
}
else
{
logger.error("[Stream #{}] Streaming error occurred on session with peer {}{}", planId(),
- peer.getHostAddress(),
- peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
- e);
+ peer.getHostAddress(),
+ peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
+ e);
}
}
@@ -621,29 +628,55 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
// prepare tasks
state(State.PREPARING);
+ ScheduledExecutors.nonPeriodicTasks.execute(() -> prepareAsync(requests, summaries));
+ }
+
+ /**
+ * Finish preparing the session. This method is blocking (memtables are flushed in {@link #addTransferRanges}),
+ * so the logic should not execute on the main IO thread (read: netty event loop).
+ */
+ private void prepareAsync(Collection<StreamRequest> requests, Collection<StreamSummary> summaries)
+ {
+
for (StreamRequest request : requests)
addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true); // always flush on stream request
for (StreamSummary summary : summaries)
prepareReceiving(summary);
- // send back prepare message if prepare message contains stream request
- if (!requests.isEmpty())
- {
- PrepareMessage prepare = new PrepareMessage();
+ PrepareSynAckMessage prepareSynAck = new PrepareSynAckMessage();
+ if (!peer.equals(FBUtilities.getBroadcastAddress()))
for (StreamTransferTask task : transfers.values())
- prepare.summaries.add(task.getSummary());
- handler.sendMessage(prepare);
+ prepareSynAck.summaries.add(task.getSummary());
+ messageSender.sendMessage(prepareSynAck);
+
+
+ streamResult.handleSessionPrepared(this);
+ maybeCompleted();
+ }
+
+ private void prepareSynAck(PrepareSynAckMessage msg)
+ {
+ if (!msg.summaries.isEmpty())
+ {
+ for (StreamSummary summary : msg.summaries)
+ prepareReceiving(summary);
+
+ // only send the (final) ACK if we are expecting the peer to send this node (the initiator) some files
+ messageSender.sendMessage(new PrepareAckMessage());
}
if (isPreview())
- {
completePreview();
- return;
- }
+ else
+ startStreamingFiles(true);
+ }
- // if there are files to stream
- if (!maybeCompleted())
- startStreamingFiles();
+ private void prepareAck(PrepareAckMessage msg)
+ {
+ if (isPreview())
+ completePreview();
+ else
+ startStreamingFiles(true);
}
/**
@@ -665,7 +698,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
/**
- * Call back after receiving FileMessageHeader.
+ * Call back after receiving a streamed file.
*
* @param message received file
*/
@@ -680,7 +713,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
StreamingMetrics.totalIncomingBytes.inc(headerSize);
metrics.incomingBytes.inc(headerSize);
// send back file received message
- handler.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
+ messageSender.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
receivers.get(message.header.tableId).received(message.sstable);
}
@@ -700,11 +733,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber
*/
public synchronized void complete()
{
+ logger.debug("handling Complete message, state = {}, completeSent = {}", state, completeSent);
if (state == State.WAIT_COMPLETE)
{
if (!completeSent)
{
- handler.sendMessage(new CompleteMessage());
+ messageSender.sendMessage(new CompleteMessage());
completeSent = true;
}
closeSession(State.COMPLETE);
@@ -712,17 +746,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
else
{
state(State.WAIT_COMPLETE);
- handler.closeIncoming();
- }
- }
-
- private synchronized void scheduleKeepAliveTask()
- {
- if (keepAliveFuture == null)
- {
- int keepAlivePeriod = DatabaseDescriptor.getStreamingKeepAlivePeriod();
- logger.debug("[Stream #{}] Scheduling keep-alive task with {}s period.", planId(), keepAlivePeriod);
- keepAliveFuture = keepAliveExecutor.scheduleAtFixedRate(new KeepAliveTask(), 0, keepAlivePeriod, TimeUnit.SECONDS);
}
}
@@ -804,7 +827,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
if (!completeSent)
{
- handler.sendMessage(new CompleteMessage());
+ messageSender.sendMessage(new CompleteMessage());
completeSent = true;
}
closeSession(State.COMPLETE);
@@ -812,10 +835,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
else
{
// notify peer that this session is completed
- handler.sendMessage(new CompleteMessage());
+ messageSender.sendMessage(new CompleteMessage());
completeSent = true;
state(State.WAIT_COMPLETE);
- handler.closeOutgoing();
}
}
return completed;
@@ -840,46 +862,30 @@ public class StreamSession implements IEndpointStateChangeSubscriber
receivers.put(summary.tableId, new StreamReceiveTask(this, summary.tableId, summary.files, summary.totalSize));
}
- private void startStreamingFiles()
+ private void startStreamingFiles(boolean notifyPrepared)
{
- streamResult.handleSessionPrepared(this);
+ if (notifyPrepared)
+ streamResult.handleSessionPrepared(this);
state(State.STREAMING);
for (StreamTransferTask task : transfers.values())
{
Collection<OutgoingFileMessage> messages = task.getFileMessages();
- if (messages.size() > 0)
- handler.sendMessages(messages);
- else
- taskCompleted(task); // there is no file to send
- }
- }
-
- class KeepAliveTask implements Runnable
- {
- private KeepAliveMessage last = null;
-
- public void run()
- {
- //to avoid jamming the message queue, we only send if the last one was sent
- if (last == null || last.wasSent())
+ if (!messages.isEmpty())
{
- logger.trace("[Stream #{}] Sending keep-alive to {}.", planId(), peer);
- last = new KeepAliveMessage();
- try
+ for (OutgoingFileMessage ofm : messages)
{
- handler.sendMessage(last);
- }
- catch (RuntimeException e) //connection handler is closed
- {
- logger.debug("[Stream #{}] Could not send keep-alive message (perhaps stream session is finished?).", planId(), e);
+ // pass the session planId/index to the OFM (which is only set at init(), after the transfers have already been created)
+ ofm.header.addSessionInfo(this);
+ messageSender.sendMessage(ofm);
}
}
else
{
- logger.trace("[Stream #{}] Skip sending keep-alive to {} (previous was not yet sent).", planId(), peer);
+ taskCompleted(task); // there are no files to send
}
}
+ maybeCompleted();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 748da8b..5e21712 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -20,11 +20,15 @@ package org.apache.cassandra.streaming;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.TableId;
@@ -37,6 +41,7 @@ import org.apache.cassandra.utils.concurrent.Ref;
*/
public class StreamTransferTask extends StreamTask
{
+ private static final Logger logger = LoggerFactory.getLogger(StreamTransferTask.class);
private static final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("StreamingTransferTaskTimeouts"));
private final AtomicInteger sequenceNumber = new AtomicInteger(0);
@@ -56,10 +61,10 @@ public class StreamTransferTask extends StreamTask
public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections)
{
assert ref.get() != null && tableId.equals(ref.get().metadata().id);
- OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, session.keepSSTableLevel());
+ OutgoingFileMessage message = new OutgoingFileMessage(ref, session, sequenceNumber.getAndIncrement(), estimatedKeys, sections, session.keepSSTableLevel());
message = StreamHook.instance.reportOutgoingFile(session, ref.get(), message);
files.put(message.header.sequenceNumber, message);
- totalSize += message.header.size();
+ totalSize += message.header.size();
}
/**
@@ -80,6 +85,7 @@ public class StreamTransferTask extends StreamTask
if (file != null)
file.complete();
+ logger.debug("recevied sequenceNumber {}, remaining files {}", sequenceNumber, files.keySet());
signalComplete = files.isEmpty();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 6c86c8b..81b3d8a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -19,21 +19,21 @@ package org.apache.cassandra.streaming;
import java.io.File;
import java.io.IOException;
-import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.ning.compress.lzf.LZFOutputStream;
-
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import org.apache.cassandra.streaming.compress.ByteBufCompressionDataOutputStreamPlus;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -51,11 +51,6 @@ public class StreamWriter
protected final StreamRateLimiter limiter;
protected final StreamSession session;
- private OutputStream compressedOutput;
-
- // allocate buffer to use for transfers only once
- private byte[] transferBuffer;
-
public StreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, StreamSession session)
{
this.session = session;
@@ -78,45 +73,48 @@ public class StreamWriter
logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
- try(RandomAccessReader file = sstable.openDataReader();
+ try(ChannelProxy proxy = sstable.getDataChannel().sharedCopy();
ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
- : null;)
+ : null)
{
- transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
+ int bufferSize = validator == null ? DEFAULT_CHUNK_SIZE: validator.chunkSize;
// setting up data compression stream
- compressedOutput = new LZFOutputStream(output);
long progress = 0L;
- // stream each of the required sections of the file
- for (Pair<Long, Long> section : sections)
+ try (DataOutputStreamPlus compressedOutput = new ByteBufCompressionDataOutputStreamPlus(output, limiter))
{
- long start = validator == null ? section.left : validator.chunkStart(section.left);
- int readOffset = (int) (section.left - start);
- // seek to the beginning of the section
- file.seek(start);
- if (validator != null)
- validator.seek(start);
-
- // length of the section to read
- long length = section.right - start;
- // tracks write progress
- long bytesRead = 0;
- while (bytesRead < length)
+ // stream each of the required sections of the file
+ for (Pair<Long, Long> section : sections)
{
- long lastBytesRead = write(file, validator, readOffset, length, bytesRead);
- bytesRead += lastBytesRead;
- progress += (lastBytesRead - readOffset);
- session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
- readOffset = 0;
+ long start = validator == null ? section.left : validator.chunkStart(section.left);
+ // if the transfer does not start on the validator's chunk boundary, this is the number of bytes to offset by
+ int transferOffset = (int) (section.left - start);
+ if (validator != null)
+ validator.seek(start);
+
+ // length of the section to read
+ long length = section.right - start;
+ // tracks write progress
+ long bytesRead = 0;
+ while (bytesRead < length)
+ {
+ int toTransfer = (int) Math.min(bufferSize, length - bytesRead);
+ long lastBytesRead = write(proxy, validator, compressedOutput, start, transferOffset, toTransfer, bufferSize);
+ start += lastBytesRead;
+ bytesRead += lastBytesRead;
+ progress += (lastBytesRead - transferOffset);
+ session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
+ transferOffset = 0;
+ }
+
+ // make sure that current section is sent
+ output.flush();
}
-
- // make sure that current section is sent
- compressedOutput.flush();
+ logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+ session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
}
- logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
- session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
}
}
@@ -131,27 +129,44 @@ public class StreamWriter
/**
* Sequentially read bytes from the file and write them to the output stream
*
- * @param reader The file reader to read from
+ * @param proxy The file channel to read from
* @param validator validator to verify data integrity
- * @param start number of bytes to skip transfer, but include for validation.
- * @param length The full length that should be read from {@code reader}
- * @param bytesTransferred Number of bytes already read out of {@code length}
+ * @param start The read offset from the beginning of the {@code proxy} file.
+ * @param transferOffset number of bytes to skip transfer, but include for validation.
+ * @param toTransfer The number of bytes to be transferred.
*
- * @return Number of bytes read
+ * @return Number of bytes transferred.
*
* @throws java.io.IOException on any I/O error
*/
- protected long write(RandomAccessReader reader, ChecksumValidator validator, int start, long length, long bytesTransferred) throws IOException
+ protected long write(ChannelProxy proxy, ChecksumValidator validator, DataOutputStreamPlus output, long start, int transferOffset, int toTransfer, int bufferSize) throws IOException
{
- int toTransfer = (int) Math.min(transferBuffer.length, length - bytesTransferred);
- int minReadable = (int) Math.min(transferBuffer.length, reader.length() - reader.getFilePointer());
+ // the count of bytes to read off disk
+ int minReadable = (int) Math.min(bufferSize, proxy.size() - start);
- reader.readFully(transferBuffer, 0, minReadable);
- if (validator != null)
- validator.validate(transferBuffer, 0, minReadable);
+ // this buffer will hold the data from disk. as it will be compressed on the fly by
+ // ByteBufCompressionDataOutputStreamPlus.write(ByteBuffer), we can release this buffer as soon as the write returns.
+ ByteBuffer buffer = ByteBuffer.allocateDirect(minReadable);
+ try
+ {
+ int readCount = proxy.read(buffer, start);
+ assert readCount == minReadable : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", readCount, minReadable);
+ buffer.flip();
- limiter.acquire(toTransfer - start);
- compressedOutput.write(transferBuffer, start, (toTransfer - start));
+ if (validator != null)
+ {
+ validator.validate(buffer);
+ buffer.flip();
+ }
+
+ buffer.position(transferOffset);
+ buffer.limit(transferOffset + (toTransfer - transferOffset));
+ output.write(buffer);
+ }
+ finally
+ {
+ FileUtils.clean(buffer);
+ }
return toTransfer;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java
new file mode 100644
index 0000000..9562981
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamingMessageSender.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+public interface StreamingMessageSender
+{
+ void initialize() throws IOException;
+
+ void sendMessage(StreamMessage message) throws IOException;
+
+ boolean connected();
+
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
new file mode 100644
index 0000000..f872005
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.async;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelPipeline;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.net.async.NettyFactory;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
+import org.apache.cassandra.streaming.StreamConnectionFactory;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamingMessageSender;
+import org.apache.cassandra.streaming.messages.IncomingFileMessage;
+import org.apache.cassandra.streaming.messages.KeepAliveMessage;
+import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
+import org.apache.cassandra.streaming.messages.StreamInitMessage;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Responsible for sending {@link StreamMessage}s to a given peer. We manage an array of netty {@link Channel}s
+ * for sending {@link OutgoingFileMessage} instances; all other {@link StreamMessage} types are sent via
+ * a special control channel. The reason for this is to treat those messages carefully and not let them get stuck
+ * behind a file transfer.
+ *
+ * One of the challenges when sending files is we might need to delay shipping the file if:
+ *
+ * - we've exceeded our network I/O use due to rate limiting (at the cassandra level)
+ * - the receiver isn't keeping up, which causes the local TCP socket buffer to not empty, which causes epoll writes to not
+ * move any bytes to the socket, which causes buffers to stick around in user-land (a/k/a cassandra) memory.
+ *
+ * When those conditions occur, it's easy enough to reschedule processing the file once the resources pick up
+ * (we acquire the permits from the rate limiter, or the socket drains). However, we need to ensure that
+ * no other messages are submitted to the same channel while the current file is still being processed.
+ */
+public class NettyStreamingMessageSender implements StreamingMessageSender
+{
+ private static final Logger logger = LoggerFactory.getLogger(NettyStreamingMessageSender.class);
+
+ private static final int DEFAULT_MAX_PARALLEL_TRANSFERS = FBUtilities.getAvailableProcessors();
+ private static final int MAX_PARALLEL_TRANSFERS = Integer.parseInt(System.getProperty(Config.PROPERTY_PREFIX + "streaming.session.parallelTransfers", Integer.toString(DEFAULT_MAX_PARALLEL_TRANSFERS)));
+
+ // a simple mechanism for allowing a degree of fairness across multiple sessions
+ private static final Semaphore fileTransferSemaphore = new Semaphore(DEFAULT_MAX_PARALLEL_TRANSFERS, true);
+
+ private final StreamSession session;
+ private final boolean isPreview;
+ private final int protocolVersion;
+ private final OutboundConnectionIdentifier connectionId;
+ private final StreamConnectionFactory factory;
+
+ private volatile boolean closed;
+
+ /**
+ * A special {@link Channel} for sending non-file streaming messages, basically anything that isn't an
+ * {@link OutgoingFileMessage} (or an {@link IncomingFileMessage}, but a node doesn't send that, it's only received).
+ */
+ private Channel controlMessageChannel;
+
+ // note: this really doesn't need to be a LBQ, just something that's thread safe
+ private final Collection<ScheduledFuture<?>> channelKeepAlives = new LinkedBlockingQueue<>();
+
+ private final ThreadPoolExecutor fileTransferExecutor;
+
+ /**
+ * A map keyed by {@link Thread} (not an actual {@link ThreadLocal}) used by the threads in {@link #fileTransferExecutor} to stash references to constructed
+ * and connected {@link Channel}s.
+ */
+ private final ConcurrentMap<Thread, Channel> threadLocalChannel = new ConcurrentHashMap<>();
+
+ /**
+ * A netty channel attribute used to indicate if a channel is currently transferring a file. This is primarily used
+ * to indicate to the {@link KeepAliveTask} if it is safe to send a {@link KeepAliveMessage}, as sending the
+ * (application level) keep-alive in the middle of streaming a file would be bad news.
+ */
+ @VisibleForTesting
+ static final AttributeKey<Boolean> TRANSFERRING_FILE_ATTR = AttributeKey.valueOf("transferringFile");
+
+ public NettyStreamingMessageSender(StreamSession session, OutboundConnectionIdentifier connectionId, StreamConnectionFactory factory, int protocolVersion, boolean isPreview)
+ {
+ this.session = session;
+ this.protocolVersion = protocolVersion;
+ this.connectionId = connectionId;
+ this.factory = factory;
+ this.isPreview = isPreview;
+
+ String name = session.peer.toString().replace(':', '.');
+ fileTransferExecutor = new DebuggableThreadPoolExecutor(1, MAX_PARALLEL_TRANSFERS, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("NettyStreaming-Outbound-" + name));
+ fileTransferExecutor.allowCoreThreadTimeOut(true);
+ }
+
+ @Override
+ public void initialize() throws IOException
+ {
+ StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(),
+ session.sessionIndex(),
+ session.planId(),
+ session.streamOperation(),
+ session.keepSSTableLevel(),
+ session.getPendingRepair(),
+ session.getPreviewKind());
+ sendMessage(message);
+ }
+
+ public boolean hasControlChannel()
+ {
+ return controlMessageChannel != null;
+ }
+
+ public void injectControlMessageChannel(Channel channel)
+ {
+ this.controlMessageChannel = channel;
+ channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
+ scheduleKeepAliveTask(channel);
+ }
+
+ private void setupControlMessageChannel() throws IOException
+ {
+ if (controlMessageChannel == null)
+ {
+ controlMessageChannel = createChannel();
+ scheduleKeepAliveTask(controlMessageChannel);
+ }
+ }
+
+ private void scheduleKeepAliveTask(Channel channel)
+ {
+ int keepAlivePeriod = DatabaseDescriptor.getStreamingKeepAlivePeriod();
+ logger.debug("{} Scheduling keep-alive task with {}s period.", createLogTag(session, channel), keepAlivePeriod);
+
+ KeepAliveTask task = new KeepAliveTask(channel, session);
+ ScheduledFuture<?> scheduledFuture = channel.eventLoop().scheduleAtFixedRate(task, 0, keepAlivePeriod, TimeUnit.SECONDS);
+ channelKeepAlives.add(scheduledFuture);
+ task.future = scheduledFuture;
+ }
+
+ private Channel createChannel() throws IOException
+ {
+ Channel channel = factory.createConnection(connectionId, protocolVersion);
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast(NettyFactory.instance.streamingGroup, NettyFactory.INBOUND_STREAM_HANDLER_NAME, new StreamingInboundHandler(connectionId.remoteAddress(), protocolVersion, session));
+ channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
+ return channel;
+ }
+
+ static String createLogTag(StreamSession session, Channel channel)
+ {
+ StringBuilder sb = new StringBuilder(64);
+ sb.append("[Stream");
+
+ if (session != null)
+ sb.append(" #").append(session.planId());
+
+ if (channel != null)
+ sb.append(" channel: ").append(channel.id());
+
+ sb.append(']');
+ return sb.toString();
+ }
+
+ @Override
+ public void sendMessage(StreamMessage message)
+ {
+ if (closed)
+ throw new RuntimeException("stream has been closed, cannot send " + message);
+
+ if (message instanceof OutgoingFileMessage)
+ {
+ if (isPreview)
+ throw new RuntimeException("Cannot send file messages for preview streaming sessions");
+ logger.debug("{} Sending {}", createLogTag(session, null), message);
+ fileTransferExecutor.submit(new FileStreamTask((OutgoingFileMessage)message));
+ return;
+ }
+
+ try
+ {
+ setupControlMessageChannel();
+ sendControlMessage(controlMessageChannel, message, future -> onControlMessageComplete(future, message));
+ }
+ catch (Exception e)
+ {
+ close();
+ session.onError(e);
+ }
+ }
+
+ private void sendControlMessage(Channel channel, StreamMessage message, GenericFutureListener listener) throws IOException
+ {
+ logger.debug("{} Sending {}", createLogTag(session, channel), message);
+
+ // we anticipate that the control messages are rather small, so allocating a ByteBuf shouldn't blow out of memory.
+ long messageSize = StreamMessage.serializedSize(message, protocolVersion);
+ if (messageSize > 1 << 30)
+ {
+ throw new IllegalStateException(String.format("%s something is seriously wrong with the calculated stream control message's size: %d bytes, type is %s",
+ createLogTag(session, channel), messageSize, message.type));
+ }
+
+ // as control messages are (expected to be) small, we can simply allocate a ByteBuf here, wrap it, and send via the channel
+ ByteBuf buf = channel.alloc().directBuffer((int) messageSize, (int) messageSize);
+ ByteBuffer nioBuf = buf.nioBuffer(0, (int) messageSize);
+ @SuppressWarnings("resource")
+ DataOutputBufferFixed out = new DataOutputBufferFixed(nioBuf);
+ StreamMessage.serialize(message, out, protocolVersion, session);
+ assert nioBuf.position() == nioBuf.limit();
+ buf.writerIndex(nioBuf.position());
+
+ ChannelFuture channelFuture = channel.writeAndFlush(buf);
+ channelFuture.addListener(future -> listener.operationComplete(future));
+ }
+
+ /**
+ * Decides what to do after a {@link StreamMessage} is processed.
+ *
+ * Note: this is called from the netty event loop.
+ *
+ * @return null if the message was processed successfully; else, a {@link java.util.concurrent.Future} to indicate
+ * the status of aborting any remaining tasks in the session.
+ */
+ java.util.concurrent.Future onControlMessageComplete(Future<?> future, StreamMessage msg)
+ {
+ ChannelFuture channelFuture = (ChannelFuture)future;
+ Throwable cause = future.cause();
+ if (cause == null)
+ return null;
+
+ Channel channel = channelFuture.channel();
+ logger.error("{} failed to send a stream message/file to peer {}: msg = {}",
+ createLogTag(session, channel), connectionId, msg, future.cause());
+
+ // StreamSession will invoke close(), but we have to mark this sender as closed so the session doesn't try
+ // to send any failure messages
+ return session.onError(cause);
+ }
+
+    /**
+     * A task that streams a single file message to the peer over a channel owned by the executing thread.
+     *
+     * A permit from {@code fileTransferSemaphore} is acquired before the transfer starts and is always
+     * released when {@link #run()} finishes, whether the transfer succeeded or failed.
+     */
+    class FileStreamTask implements Runnable
+    {
+        /**
+         * Time interval, in minutes, to wait between logging a message indicating that we're waiting on a semaphore
+         * permit to become available.
+         */
+        private static final int SEMAPHORE_UNAVAILABLE_LOG_INTERVAL = 3;
+
+        /**
+         * Even though we expect only an {@link OutgoingFileMessage} at runtime, the type here is {@link StreamMessage}
+         * to facilitate simpler testing.
+         */
+        private final StreamMessage msg;
+
+        FileStreamTask(OutgoingFileMessage ofm)
+        {
+            this.msg = ofm;
+        }
+
+        /**
+         * For testing purposes
+         */
+        FileStreamTask(StreamMessage msg)
+        {
+            this.msg = msg;
+        }
+
+        /**
+         * Acquires a transfer permit, serializes the message onto this thread's channel and flushes it.
+         * Any failure is reported via {@code session.onError}; the permit is released in all cases.
+         */
+        @Override
+        public void run()
+        {
+            // returns false only when the sender was closed while waiting; nothing to release in that case
+            if (!acquirePermit(SEMAPHORE_UNAVAILABLE_LOG_INTERVAL))
+                return;
+
+            try
+            {
+                Channel channel = getOrCreateChannel();
+                if (!channel.attr(TRANSFERRING_FILE_ATTR).compareAndSet(false, true))
+                    throw new IllegalStateException("channel's transferring state is currently set to true. refusing to start new stream");
+
+                // close the DataOutputStreamPlus as we're done with it - but don't close the channel
+                try (DataOutputStreamPlus outPlus = ByteBufDataOutputStreamPlus.create(session, channel, 1 << 16))
+                {
+                    StreamMessage.serialize(msg, outPlus, protocolVersion, session);
+                    channel.flush();
+                }
+                finally
+                {
+                    channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
+                }
+            }
+            catch (Exception | IOError e)
+            {
+                // getOrCreateChannel() reports failures as IOError, which is not an Exception; catch it here
+                // as well so the session is told about the failure instead of it escaping to the executor
+                session.onError(e);
+            }
+            finally
+            {
+                fileTransferSemaphore.release();
+            }
+        }
+
+        /**
+         * Waits for a permit from {@code fileTransferSemaphore}, polling once per second so that a closed
+         * sender is noticed, and periodically logging that the task is still waiting.
+         *
+         * @param logInterval number of minutes between "still waiting" log messages
+         * @return true once a permit was acquired; false if the sender was closed before that happened
+         */
+        boolean acquirePermit(int logInterval)
+        {
+            long logIntervalNanos = TimeUnit.MINUTES.toNanos(logInterval);
+            long timeOfLastLogging = System.nanoTime();
+            while (true)
+            {
+                if (closed)
+                    return false;
+                try
+                {
+                    if (fileTransferSemaphore.tryAcquire(1, TimeUnit.SECONDS))
+                        return true;
+
+                    // log a helpful message to operators in case they are wondering why a given session might not be making progress.
+                    long now = System.nanoTime();
+                    if (now - timeOfLastLogging > logIntervalNanos)
+                    {
+                        timeOfLastLogging = now;
+                        // msg is only declared as a StreamMessage (see the testing constructor), so do not cast
+                        // blindly: fall back to logging the message itself when it is not an OutgoingFileMessage
+                        Object fileDescription = msg instanceof OutgoingFileMessage
+                                                 ? ((OutgoingFileMessage) msg).getFilename()
+                                                 : msg;
+                        logger.info("{} waiting to acquire a permit to begin streaming file {}. This message logs every {} minutes",
+                                    createLogTag(session, null), fileDescription, logInterval);
+                    }
+                }
+                catch (InterruptedException ignored)
+                {
+                    // deliberately ignored: the loop re-checks the closed flag on the next iteration, which is
+                    // how a waiting task exits once the sender has been closed
+                }
+            }
+        }
+
+        /**
+         * Returns the channel registered for the current thread, creating and registering one if none exists.
+         *
+         * @throws IOError wrapping any exception raised while looking up or creating the channel
+         */
+        private Channel getOrCreateChannel()
+        {
+            Thread currentThread = Thread.currentThread();
+            try
+            {
+                Channel channel = threadLocalChannel.get(currentThread);
+                if (channel != null)
+                    return channel;
+
+                channel = createChannel();
+                threadLocalChannel.put(currentThread, channel);
+                return channel;
+            }
+            catch (Exception e)
+            {
+                throw new IOError(e);
+            }
+        }
+
+        /**
+         * For testing purposes
+         *
+         * @throws IllegalStateException if the current thread already has a channel registered
+         */
+        void injectChannel(Channel channel)
+        {
+            Thread currentThread = Thread.currentThread();
+            if (threadLocalChannel.get(currentThread) != null)
+                throw new IllegalStateException("previous channel already set");
+
+            threadLocalChannel.put(currentThread, channel);
+        }
+
+        /**
+         * For testing purposes
+         */
+        void unsetChannel()
+        {
+            threadLocalChannel.remove(Thread.currentThread());
+        }
+    }
+
+ /**
+ * Periodically sends the {@link KeepAliveMessage}.
+ *
+ * NOTE: this task, and the callback function {@link #keepAliveListener(Future)} is executed in the netty event loop.
+ */
+ class KeepAliveTask implements Runnable
+ {
+ private final Channel channel;
+ private final StreamSession session;
+
+ /**
+ * A reference to the scheduled task for this instance so that it may be cancelled.
+ */
+ ScheduledFuture<?> future;
+
+ KeepAliveTask(Channel channel, StreamSession session)
+ {
+ this.channel = channel;
+ this.session = session;
+ }
+
+ public void run()
+ {
+ // if the channel has been closed, cancel the scheduled task and return
+ if (!channel.isOpen() || closed)
+ {
+ future.cancel(false);
+ return;
+ }
+
+ // if the channel is currently processing streaming, skip this execution. As this task executes
+ // on the event loop, even if there is a race with a FileStreamTask which changes the channel attribute
+ // after we check it, the FileStreamTask cannot send out any bytes as this KeepAliveTask is executing
+ // on the event loop (and FileStreamTask publishes it's buffer to the channel, consumed after we're done here).
+ if (channel.attr(TRANSFERRING_FILE_ATTR).get())
+ return;
+
+ try
+ {
+ logger.trace("{} Sending keep-alive to {}.", createLogTag(session, channel), session.peer);
+ sendControlMessage(channel, new KeepAliveMessage(), this::keepAliveListener);
+ }
+ catch (IOException ioe)
+ {
+ future.cancel(false);
+ }
+ }
+
+ private void keepAliveListener(Future<? super Void> future)
+ {
+ if (future.isSuccess() || future.isCancelled())
+ return;
+
+ logger.debug("{} Could not send keep-alive message (perhaps stream session is finished?).",
+ createLogTag(session, channel), future.cause());
+ }
+ }
+
+    /**
+     * Marks this sender as closed without touching any channels or executors.
+     *
+     * For testing purposes only.
+     */
+    void setClosed()
+    {
+        this.closed = true;
+    }
+
+    /**
+     * Replaces the channel used for control messages with the given one.
+     *
+     * NOTE(review): package-private and placed among the test hooks - presumably for testing only; confirm.
+     *
+     * @param channel the channel to use as the control message channel
+     */
+    void setControlMessageChannel(Channel channel)
+    {
+        this.controlMessageChannel = channel;
+    }
+
+ int semaphoreAvailablePermits()
+ {
+ return fileTransferSemaphore.availablePermits();
+ }
+
+    /**
+     * @return true as long as this sender has not been marked as closed
+     */
+    @Override
+    public boolean connected()
+    {
+        return !this.closed;
+    }
+
+    /**
+     * Marks this sender as closed, cancels all scheduled keep-alive tasks, closes every per-thread file
+     * transfer channel, shuts down the file transfer executor and closes the control message channel (if any).
+     */
+    @Override
+    public void close()
+    {
+        closed = true;
+        logger.debug("{} Closing stream connection channels on {}", createLogTag(session, null), connectionId);
+
+        // forEach, not stream().map(): map() is a lazy intermediate operation and, without a terminal
+        // operation, its lambda is never executed - so nothing would actually be cancelled or closed
+        channelKeepAlives.forEach(scheduledFuture -> scheduledFuture.cancel(false));
+        channelKeepAlives.clear();
+
+        threadLocalChannel.values().forEach(channel -> channel.close());
+        threadLocalChannel.clear();
+        fileTransferExecutor.shutdownNow();
+
+        if (controlMessageChannel != null)
+            controlMessageChannel.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java b/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
new file mode 100644
index 0000000..ca15b78
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/async/StreamCompressionSerializer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.async;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.cassandra.io.util.DataInputPlus;
+
+/**
+ * A serializer for stream compressed files (see package-level documentation). Much like a typical compressed
+ * output stream, this class operates on buffers or chunks of the data at a time. The format for each compressed
+ * chunk is as follows:
+ *
+ * - int - compressed payload length
+ * - int - uncompressed payload length
+ * - bytes - compressed payload
+ */
+public class StreamCompressionSerializer
+{
+    /**
+     * Length of header data, which includes compressed length, uncompressed length.
+     */
+    private static final int HEADER_LENGTH = 8;
+
+    private final ByteBufAllocator allocator;
+
+    public StreamCompressionSerializer(ByteBufAllocator allocator)
+    {
+        this.allocator = allocator;
+    }
+
+    /**
+     * Compresses the remaining bytes of {@code in} into a newly allocated buffer, preceded by the chunk header.
+     *
+     * @param compressor the compressor to use
+     * @param in the data to compress; everything between its position and limit is consumed
+     * @param version currently unused by this method
+     * @return A buffer with the header followed by the compressed data. The caller owns the buffer and is
+     * responsible for releasing it.
+     * @throws RuntimeException if compression fails; the allocated buffer is released before rethrowing
+     */
+    public ByteBuf serialize(LZ4Compressor compressor, ByteBuffer in, int version)
+    {
+        final int uncompressedLength = in.remaining();
+        final int maxPayloadLength = compressor.maxCompressedLength(uncompressedLength);
+
+        // the header sits in front of the payload, so reserve room for it on top of the worst-case payload
+        // size; otherwise incompressible input would not fit into the payload window
+        ByteBuf out = allocator.directBuffer(HEADER_LENGTH + maxPayloadLength);
+        try
+        {
+            ByteBuffer compressedNioBuffer = out.nioBuffer(HEADER_LENGTH, maxPayloadLength);
+            compressor.compress(in, compressedNioBuffer);
+            final int compressedLength = compressedNioBuffer.position();
+            out.setInt(0, compressedLength);
+            out.setInt(4, uncompressedLength);
+            out.writerIndex(HEADER_LENGTH + compressedLength);
+            return out;
+        }
+        catch (RuntimeException e)
+        {
+            // never hand a released buffer back to the caller: free it and propagate the failure
+            out.release();
+            throw e;
+        }
+    }
+
+    /**
+     * Reads one compressed chunk (header plus payload) from {@code in} and decompresses it.
+     *
+     * @param decompressor the decompressor to use
+     * @param in the input positioned at the start of a chunk header
+     * @param version currently unused by this method
+     * @return A buffer with decompressed data. The caller owns the buffer and is responsible for releasing it.
+     * @throws IOException if reading fails, the input ends before the whole chunk was read, or decompression
+     * fails (non-IO failures are wrapped in an {@link IOException})
+     */
+    public ByteBuf deserialize(LZ4FastDecompressor decompressor, DataInputPlus in, int version) throws IOException
+    {
+        final int compressedLength = in.readInt();
+        final int uncompressedLength = in.readInt();
+
+        // there's no guarantee the next compressed block is contained within one buffer in the input,
+        // so hence we need a 'staging' buffer to get all the bytes into one contiguous buffer for the decompressor
+        ByteBuf compressed = null;
+        ByteBuf uncompressed = null;
+        try
+        {
+            final ByteBuffer compressedNioBuffer;
+
+            // ReadableByteChannel allows us to keep the bytes off-heap because we pass a ByteBuffer to RBC.read(BB),
+            // DataInputPlus.read() takes a byte array (thus, an on-heap array).
+            if (in instanceof ReadableByteChannel)
+            {
+                compressed = allocator.directBuffer(compressedLength);
+                compressedNioBuffer = compressed.nioBuffer(0, compressedLength);
+
+                // a single read() is allowed to transfer fewer bytes than requested, so keep reading until the
+                // whole compressed payload has arrived
+                ReadableByteChannel channel = (ReadableByteChannel) in;
+                while (compressedNioBuffer.hasRemaining())
+                {
+                    if (channel.read(compressedNioBuffer) < 0)
+                        throw new java.io.EOFException(String.format("reached end of stream after reading %d of %d compressed bytes",
+                                                                     compressedNioBuffer.position(), compressedLength));
+                }
+                compressedNioBuffer.flip();
+            }
+            else
+            {
+                byte[] compressedBytes = new byte[compressedLength];
+                in.readFully(compressedBytes);
+                compressedNioBuffer = ByteBuffer.wrap(compressedBytes);
+            }
+
+            uncompressed = allocator.directBuffer(uncompressedLength);
+            ByteBuffer uncompressedNioBuffer = uncompressed.nioBuffer(0, uncompressedLength);
+            decompressor.decompress(compressedNioBuffer, uncompressedNioBuffer);
+            uncompressed.writerIndex(uncompressedLength);
+            return uncompressed;
+        }
+        catch (Exception e)
+        {
+            // the result buffer is not handed to the caller on failure, so free it here
+            if (uncompressed != null)
+                uncompressed.release();
+
+            if (e instanceof IOException)
+                throw e;
+            throw new IOException(e);
+        }
+        finally
+        {
+            // the staging buffer is only needed while decompressing
+            if (compressed != null)
+                compressed.release();
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org