Posted to pr@cassandra.apache.org by jasobrown <gi...@git.apache.org> on 2018/08/21 20:46:40 UTC
[GitHub] cassandra pull request #253: 13630
GitHub user jasobrown opened a pull request:
https://github.com/apache/cassandra/pull/253
13630
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/jasobrown/cassandra 13630
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/cassandra/pull/253.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #253
----
commit 2f5d03cc9d8373bab723010f7a280ba21658a6da
Author: Jason Brown <ja...@...>
Date: 2018-07-28T14:09:01Z
Support sanely sending large messages with internode messaging via netty
commit 34eb6afbdfeb9c75685316d22152dde0fd460820
Author: Jason Brown <ja...@...>
Date: 2018-08-20T11:54:13Z
clean up error handling logic in MIH; allow custom timeouts for blocking on the queue in RebufferingByteBufDataInputPlus; fix MIHTest
commit ed9f3093005483e2a1d0b54e6f60c67070a51962
Author: Jason Brown <ja...@...>
Date: 2018-08-20T12:05:30Z
add RebufferingByteBufDataInputPlus timeout test
commit f61ffaa22d87a68ed9a2bed5329240209053cec6
Author: Jason Brown <ja...@...>
Date: 2018-08-20T20:38:21Z
fix ChannelWriterTest to wait for asynchronous blocking implementations
commit 3bdec75b5c84f250e4384092f9c3435eca389d3e
Author: Jason Brown <ja...@...>
Date: 2018-08-20T20:39:21Z
point to jasobrown dtest repo
commit 8296bf9c18924659f694418730f17428cf89d5b3
Author: Jason Brown <ja...@...>
Date: 2018-08-21T17:45:04Z
trivial lambda change
----
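The second commit adds custom timeouts for blocking on the queue in RebufferingByteBufDataInputPlus. Below is a minimal sketch of that idea. It uses a plain `java.util.concurrent.LinkedBlockingQueue` of `byte[]` chunks in place of netty `ByteBuf`s. The class `TimedChunkReader`, and its choice to report a timeout as an `EOFException`, are hypothetical and not taken from the patch.

```java
import java.io.EOFException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (not the patch's code): a reader that blocks for the next chunk of
// bytes, but only for a caller-supplied timeout, instead of waiting forever on the queue.
class TimedChunkReader
{
    private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final long timeout;
    private final TimeUnit unit;

    TimedChunkReader(long timeout, TimeUnit unit)
    {
        this.timeout = timeout;
        this.unit = unit;
    }

    /** Called by the producing thread when new bytes arrive. */
    void offer(byte[] chunk)
    {
        queue.add(chunk);
    }

    /** Blocks for at most the configured timeout; a timeout is surfaced as an EOFException. */
    byte[] nextChunk() throws EOFException, InterruptedException
    {
        byte[] chunk = queue.poll(timeout, unit);
        if (chunk == null)
            throw new EOFException("timed out after " + timeout + " " + unit + " waiting for data");
        return chunk;
    }
}
```

Here the timeout is a constructor argument. A test can therefore wait only a few milliseconds, while other callers keep a longer wait.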
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216103837
--- Diff: src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java ---
@@ -393,6 +393,18 @@ private Channel getOrCreateChannel()
}
}
+ private void onError(Throwable t)
+ {
+ try
+ {
+ session.onError(t).get(5, TimeUnit.MINUTES);
--- End diff --
done
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216103691
--- Diff: src/java/org/apache/cassandra/net/async/MessageInHandler.java ---
@@ -18,143 +18,296 @@
package org.apache.cassandra.net.async;
-import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.IOException;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
-import org.apache.cassandra.io.util.DataInputBuffer;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.exceptions.UnknownTableException;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.ParameterType;
-import org.apache.cassandra.utils.vint.VIntCoding;
+import org.apache.cassandra.net.MessageIn.MessageInProcessor;
/**
* Parses incoming messages as per the 4.0 internode messaging protocol.
*/
-public class MessageInHandler extends BaseMessageInHandler
+public class MessageInHandler extends ChannelInboundHandlerAdapter
{
public static final Logger logger = LoggerFactory.getLogger(MessageInHandler.class);
- private MessageHeader messageHeader;
+ private final InetAddressAndPort peer;
- MessageInHandler(InetAddressAndPort peer, int messagingVersion)
+ private final BufferHandler bufferHandler;
+ private volatile boolean closed;
+
+ public MessageInHandler(InetAddressAndPort peer, MessageInProcessor messageProcessor, boolean handlesLargeMessages)
+ {
+ this.peer = peer;
+
+ bufferHandler = handlesLargeMessages
+ ? new BlockingBufferHandler(messageProcessor)
+ : new NonblockingBufferHandler(messageProcessor);
+ }
+
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException
+ {
+ if (!closed)
+ {
+ bufferHandler.channelRead(ctx, (ByteBuf) msg);
+ }
+ else
+ {
+ ReferenceCountUtil.release(msg);
+ ctx.close();
+ }
+ }
+
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ {
+ if (cause instanceof EOFException)
+ logger.trace("eof reading from socket; closing", cause);
+ else if (cause instanceof UnknownTableException)
+ logger.warn("Got message from unknown table while reading from socket; closing", cause);
+ else if (cause instanceof IOException)
+ logger.trace("IOException reading from socket; closing", cause);
+ else
+ logger.warn("Unexpected exception caught in inbound channel pipeline from " + ctx.channel().remoteAddress(), cause);
+
+ close();
+ ctx.close();
+ }
+
+ public void channelInactive(ChannelHandlerContext ctx)
+ {
+ logger.trace("received channel closed message for peer {} on local addr {}", ctx.channel().remoteAddress(), ctx.channel().localAddress());
+ close();
+ ctx.fireChannelInactive();
+ }
+
+ void close()
{
- this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER);
+ closed = true;
+ bufferHandler.close();
}
- public MessageInHandler(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ boolean isClosed()
{
- super(peer, messagingVersion, messageConsumer);
+ return closed;
+ }
- assert messagingVersion >= MessagingService.VERSION_40 : String.format("wrong messaging version for this handler: got %d, but expect %d or higher",
- messagingVersion, MessagingService.VERSION_40);
- state = State.READ_FIRST_CHUNK;
+ /**
+ * An abstraction around how incoming buffers are handled: either in a non-blocking manner ({@link NonblockingBufferHandler})
+ * or in a blocking manner ({@link BlockingBufferHandler}).
+ *
+ * The methods declared here will only be invoked on the netty event loop.
+ */
+ interface BufferHandler
+ {
+ void channelRead(ChannelHandlerContext ctx, ByteBuf in) throws IOException;
+
+ void close();
}
/**
- * For each new message coming in, builds up a {@link MessageHeader} instance incrementally. This method
- * attempts to deserialize as much header information as it can out of the incoming {@link ByteBuf}, and
- * maintains a trivial state machine to remember progress across invocations.
+ * Processes incoming buffers on the netty event loop, in a non-blocking manner. If buffers are not completely consumed,
+ * the remainder is stashed in {@link #retainedInlineBuffer}, and the next incoming buffer is combined with it.
*/
- @SuppressWarnings("resource")
- public void handleDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
+ class NonblockingBufferHandler implements BufferHandler
--- End diff --
This can be made into a static inner class.
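Some context for this suggestion. A non-static inner class holds a hidden reference to its enclosing instance. If `NonblockingBufferHandler` needs nothing from `MessageInHandler` beyond what it is handed, declaring it `static` removes that reference and makes its dependencies explicit in the constructor. The sketch below is hypothetical: it uses `Handler`, `LeftoverAccumulator`, `java.nio.ByteBuffer` in place of `ByteBuf`, and a simple `[int length][payload]` framing. It also mimics the javadoc's behavior of stashing unconsumed bytes and combining them with the next buffer.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical outer class standing in for MessageInHandler.
class Handler
{
    // A static nested class has no hidden Handler.this reference; everything it needs
    // is passed in explicitly through its constructor.
    static class LeftoverAccumulator
    {
        private final List<byte[]> frames;   // where fully decoded frames are delivered
        private ByteBuffer retained;         // unconsumed bytes from the previous read, or null

        LeftoverAccumulator(List<byte[]> frames)
        {
            this.frames = frames;
        }

        /** Consumes as many complete [int length][payload] frames as possible; stashes the rest. */
        void channelRead(ByteBuffer in)
        {
            ByteBuffer buf = in;
            if (retained != null)
            {
                // combine the stashed remainder with the newly arrived bytes
                buf = ByteBuffer.allocate(retained.remaining() + in.remaining());
                buf.put(retained).put(in).flip();
                retained = null;
            }

            while (buf.remaining() >= Integer.BYTES)
            {
                int length = buf.getInt(buf.position());   // peek at the length without consuming it
                if (length < 0)
                    throw new IllegalArgumentException("negative frame length: " + length);
                if (buf.remaining() < Integer.BYTES + length)
                    break;                                 // incomplete frame: wait for more bytes
                buf.getInt();
                byte[] frame = new byte[length];
                buf.get(frame);
                frames.add(frame);
            }

            if (buf.hasRemaining())
            {
                // copy, since the caller may reuse the incoming buffer
                retained = ByteBuffer.allocate(buf.remaining());
                retained.put(buf).flip();
            }
        }
    }
}
```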
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r215936173
--- Diff: src/java/org/apache/cassandra/net/async/MessageOutHandler.java ---
@@ -140,8 +125,17 @@ public void write(ChannelHandlerContext ctx, Object o, ChannelPromise promise)
out = ctx.alloc().ioBuffer((int)currentFrameSize);
- captureTracingInfo(msg);
- serializeMessage(msg, out);
+ @SuppressWarnings("resource")
+ ByteBufDataOutputPlus outputPlus = new ByteBufDataOutputPlus(out);
+ msg.message.serialize(outputPlus, targetMessagingVersion, connectionId, msg.id, msg.timestampNanos);
+
+ // next few lines are for debugging ... massively helpful!!
+ // if we allocated too much buffer for this message, we'll log here.
+ // if we allocated too little buffer space, we would have hit an exception when trying to write more bytes to it
+ if (out.isWritable())
+ errorLogger.error("{} reported message size {}, actual message size {}, msg {}",
--- End diff --
The `errorLogger` is an instance of `NoSpamLogger`, so we can adjust the log intervals. I'm not sure making it an assert is better, as that would throw an exception and the connection would get killed. As it's mostly a developer-level logging thing, I could just remove it as well. wdyt?
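One middle ground between a hard assert and removing the check is to keep the over-allocation check and rate-limit the report. This sketch does not use Cassandra's `NoSpamLogger`, since its API is not shown here. `SizeMismatchReporter` is a hypothetical stand-in that lets at most one report through per interval. Its clock is injectable so the behavior can be tested.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a rate-limited logger (not NoSpamLogger's API): a size-mismatch
// report is only produced if at least one interval has passed since the last one.
class SizeMismatchReporter
{
    private final long intervalNanos;
    private final LongSupplier nanoClock;
    private long lastReportNanos;
    private boolean reportedOnce;
    private int suppressed;

    SizeMismatchReporter(long interval, TimeUnit unit, LongSupplier nanoClock)
    {
        this.intervalNanos = unit.toNanos(interval);
        this.nanoClock = nanoClock;
    }

    /**
     * Mirrors the check in the diff: if fewer bytes were written than were reserved, the
     * reported size was too large. Returns the line to log now, or null if there is no
     * mismatch or the report is being rate-limited.
     */
    String check(long reportedSize, long actualSize, String msg)
    {
        if (actualSize >= reportedSize)
            return null;   // no over-allocation; under-allocation would already have thrown
        long now = nanoClock.getAsLong();
        if (reportedOnce && now - lastReportNanos < intervalNanos)
        {
            suppressed++;
            return null;
        }
        int skipped = suppressed;
        suppressed = 0;
        reportedOnce = true;
        lastReportNanos = now;
        return String.format("%s reported message size %d, actual message size %d (%d similar reports suppressed)",
                             msg, reportedSize, actualSize, skipped);
    }
}
```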
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216069307
--- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
@@ -231,4 +241,437 @@ public String toString()
sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb);
return sbuf.toString();
}
+
+ public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
+ {
+ return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
+
+ }
+
+ public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ return messagingVersion >= MessagingService.VERSION_40
+ ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
+ : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
+
+ }
+
+ /**
+ * Implementations contain the mechanics and logic of parsing incoming messages. Allows for both non-blocking
+ * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)}
+ * methods, respectively.
+ *
+ * Does not contain the actual deserialization code for message fields nor payload. That is left to the
+ * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
+ */
+ public static abstract class MessageInProcessor
+ {
+ /**
+ * The current state of deserializing an incoming message. This enum is only used in the nonblocking versions.
+ */
+ public enum State
+ {
+ READ_PREFIX,
+ READ_IP_ADDRESS,
+ READ_VERB,
+ READ_PARAMETERS_SIZE,
+ READ_PARAMETERS_DATA,
+ READ_PAYLOAD_SIZE,
+ READ_PAYLOAD
+ }
+
+ static final int VERB_LENGTH = Integer.BYTES;
+
+ /**
+ * The default target for consuming deserialized {@link MessageIn}.
+ */
+ private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
+
+ final InetAddressAndPort peer;
+ final int messagingVersion;
+
+ /**
+ * Abstracts out depending directly on {@link MessagingService#receive(MessageIn, int)}; this makes tests more sane
+ * as they neither require nor trigger the entire message processing circus.
+ */
+ final BiConsumer<MessageIn, Integer> messageConsumer;
+
+ /**
+ * Captures the current {@link State} of processing a message. Primarily useful in the non-blocking use case.
+ */
+ State state = State.READ_PREFIX;
+
+ /**
+ * Captures the current data we've parsed out of an incoming message. Primarily useful in the non-blocking use case.
+ */
+ MessageHeader messageHeader;
+
+ /**
+ * Process the buffer in a non-blocking manner. Will try to read out as much of a message(s) as possible,
+ * and send any fully deserialized messages to {@link #messageConsumer}.
+ */
+ public abstract void process(ByteBuf in) throws IOException;
+
+ /**
+ * Process the buffer in a blocking manner. Will read as many messages as possible, blocking for more data,
+ * and send any fully deserialized messages to {@link #messageConsumer}.
+ */
+ public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException;
+
+ MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ this.peer = peer;
+ this.messagingVersion = messagingVersion;
+ this.messageConsumer = messageConsumer;
+ }
+
+ /**
+ * Only applicable in the non-blocking use case, and should only be used for testing!!!
+ */
+ @VisibleForTesting
+ public MessageHeader getMessageHeader()
+ {
+ return messageHeader;
+ }
+
+ /**
+ * A simple struct to hold the message header data as it is being built up.
+ */
+ public static class MessageHeader
+ {
+ public int messageId;
+ long constructionTime;
+ public InetAddressAndPort from;
+ public MessagingService.Verb verb;
+ int payloadSize;
+
+ Map<ParameterType, Object> parameters = Collections.emptyMap();
+
+ /**
+ * Length of the parameter data. If the message's version is {@link MessagingService#VERSION_40} or higher,
+ * this value is the total number of header bytes; else, for legacy messaging, this is the number of
+ * key/value entries in the header.
+ */
+ int parameterLength;
+ }
+
+ MessageHeader readPrefix(DataInputPlus in) throws IOException
+ {
+ MessagingService.validateMagic(in.readInt());
+ MessageHeader messageHeader = new MessageHeader();
+ messageHeader.messageId = in.readInt();
+ int messageTimestamp = in.readInt(); // make sure to read the sent timestamp, even if DatabaseDescriptor.hasCrossNodeTimeout() is not enabled
+ messageHeader.constructionTime = MessageIn.deriveConstructionTime(peer, messageTimestamp, ApproximateTime.currentTimeMillis());
+
+ return messageHeader;
+ }
+ }
+
+ /**
+ * Reads the incoming stream of bytes in the 4.0 format.
+ */
+ static class MessageInProcessorAsOf40 extends MessageInProcessor
+ {
+ MessageInProcessorAsOf40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ super(peer, messagingVersion, messageConsumer);
+ assert messagingVersion >= MessagingService.VERSION_40;
+ }
+
+ @SuppressWarnings("resource")
+ public void process(ByteBuf in) throws IOException
--- End diff --
done. I also pulled the main loop logic into the base class.
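Pulling the main loop into the base class is a template-method split. The base class owns the resumable state machine and its loop. Version-specific subclasses only decide how a field is laid out on the wire. Below is a much-simplified sketch with hypothetical names (`ResumableParser`, `FixedIntParser`) and a hypothetical frame of int id, size field, and payload. It is not the patch's code, which also reads the verb and parameters and uses vint-encoded sizes for 4.0.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the base class owns the state machine and loop; a subclass only says
// how the size field is encoded. State survives across calls, so a message split across
// buffers resumes where it left off. As with retainedInlineBuffer in the handler, the caller
// must carry over any bytes left unconsumed in `in` before the next call.
abstract class ResumableParser
{
    enum State { READ_ID, READ_PAYLOAD_SIZE, READ_PAYLOAD }

    final List<byte[]> delivered = new ArrayList<>();
    private State state = State.READ_ID;
    private int payloadSize;

    /** Bytes required before the size field can be decoded (version-specific). */
    abstract int sizeFieldLength();

    /** Decodes the size field (version-specific). */
    abstract int readSize(ByteBuffer in);

    /** Non-blocking: consume what is available, return when more bytes are needed. */
    final void process(ByteBuffer in)
    {
        while (true)
        {
            switch (state)
            {
                case READ_ID:
                    if (in.remaining() < Integer.BYTES)
                        return;
                    in.getInt();   // the id is read but ignored in this sketch
                    state = State.READ_PAYLOAD_SIZE;
                    // fall-through
                case READ_PAYLOAD_SIZE:
                    if (in.remaining() < sizeFieldLength())
                        return;
                    payloadSize = readSize(in);
                    state = State.READ_PAYLOAD;
                    // fall-through
                case READ_PAYLOAD:
                    if (in.remaining() < payloadSize)
                        return;
                    byte[] payload = new byte[payloadSize];
                    in.get(payload);
                    delivered.add(payload);
                    state = State.READ_ID;
                    break;
                default:
                    throw new IllegalStateException("unknown state: " + state);
            }
        }
    }
}

// Hypothetical "legacy-style" subclass whose size field is a plain 4-byte int.
class FixedIntParser extends ResumableParser
{
    int sizeFieldLength() { return Integer.BYTES; }

    int readSize(ByteBuffer in) { return in.getInt(); }
}
```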
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216106796
--- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
@@ -180,6 +199,73 @@ public String toString()
return sbuf.toString();
}
+ /**
+ * The main entry point for sending an internode message to a peer node in the cluster.
+ */
+ public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
+ {
+ captureTracingInfo(destinationId);
+
+ out.writeInt(MessagingService.PROTOCOL_MAGIC);
+ out.writeInt(id);
+
+ // int cast cuts off the high-order half of the timestamp, which we can assume remains
+ // the same between now and when the recipient reconstructs it.
+ out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
+ serialize(out, messagingVersion);
+ }
+
+ /**
+ * Record any tracing data, if enabled on this message.
+ */
+ @VisibleForTesting
+ void captureTracingInfo(OutboundConnectionIdentifier destinationId)
+ {
+ try
+ {
+ UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION);
+ if (sessionId != null)
--- End diff --
I assume `sessionId != null` means that tracing is enabled? Otherwise we should explicitly check whether tracing is enabled.
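The diff context above also notes that the sender writes only the low 32 bits of its millisecond timestamp. It assumes the high half is unchanged when the receiver rebuilds it. Below is a sketch of that reconstruction, assuming the two clocks are close. The wrap-around correction is an extra assumption of this sketch. Nothing here claims to match `MessageIn.deriveConstructionTime`, and `TruncatedTimestamp` is a hypothetical name.

```java
// Hypothetical sketch of rebuilding a full epoch-millis timestamp from its low 32 bits.
final class TruncatedTimestamp
{
    private static final long PERIOD = 1L << 32;       // span covered by the low 32 bits
    private static final long HALF_PERIOD = 1L << 31;

    private TruncatedTimestamp() {}

    /** What the sender puts on the wire: the low-order half of the timestamp. */
    static int truncate(long epochMillis)
    {
        return (int) epochMillis;
    }

    /**
     * Borrows the high-order half from the receiver's clock. If the low half wrapped between
     * send and receive, the naive result is off by about 2^32 ms, so shift it by one period.
     */
    static long reconstruct(int sentLowBits, long receiverNowMillis)
    {
        long highBits = receiverNowMillis & 0xFFFFFFFF00000000L;
        long lowBits = sentLowBits & 0xFFFFFFFFL;
        long candidate = highBits | lowBits;
        if (candidate - receiverNowMillis > HALF_PERIOD)
            candidate -= PERIOD;
        else if (receiverNowMillis - candidate > HALF_PERIOD)
            candidate += PERIOD;
        return candidate;
    }
}
```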
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r215734926
--- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -183,6 +195,11 @@ public int available() throws EOFException
return availableBytes;
}
+ public boolean isEmpty() throws EOFException
+ {
+ return available() == 0;
--- End diff --
The method `available()` has a side effect of `channelConfig.setAutoRead(true)` when the `availableBytes` falls below the `lowWatermark`. Are you sure that is ok?
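One way to address this concern is to keep the backpressure side effect in one explicit method and have `isEmpty()` read the counter directly. The sketch uses hypothetical names. `WatermarkedInput` is illustrative, and `AutoReadToggle` stands in for netty's `ChannelConfig.setAutoRead(boolean)`. The sketch ignores the cross-thread visibility concerns the real class would have.

```java
// Hypothetical sketch: separate the pure query from the backpressure side effect.
// Single-threaded for brevity.
class WatermarkedInput
{
    /** Stand-in for netty's ChannelConfig#setAutoRead(boolean). */
    interface AutoReadToggle
    {
        void setAutoRead(boolean autoRead);
    }

    private final AutoReadToggle channelConfig;
    private final int lowWatermark;
    private int availableBytes;

    WatermarkedInput(AutoReadToggle channelConfig, int lowWatermark)
    {
        this.channelConfig = channelConfig;
        this.lowWatermark = lowWatermark;
    }

    void bytesArrived(int n) { availableBytes += n; }

    void bytesConsumed(int n) { availableBytes -= n; }

    /** Has a side effect: re-enables socket reads once the backlog falls below the low watermark. */
    int available()
    {
        if (availableBytes < lowWatermark)
            channelConfig.setAutoRead(true);
        return availableBytes;
    }

    /** Pure query: never touches the channel's auto-read setting. */
    boolean isEmpty()
    {
        return availableBytes == 0;
    }
}
```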
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216088059
--- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
@@ -231,4 +241,437 @@ public String toString()
sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb);
return sbuf.toString();
}
+
+ public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
+ {
+ return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
+
+ }
+
+ public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ return messagingVersion >= MessagingService.VERSION_40
+ ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
+ : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
+
+ }
+
+ /**
+ * Implementations contain the mechanics and logic of parsing incoming messages. Allows for both non-blocking
+ * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)}
+ * methods, respectively.
+ *
+ * Does not contain the actual deserialization code for message fields nor payload. That is left to the
+ * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
+ */
+ public static abstract class MessageInProcessor
+ {
+ /**
+ * The current state of deserializing an incoming message. This enum is only used in the nonblocking versions.
+ */
+ public enum State
+ {
+ READ_PREFIX,
+ READ_IP_ADDRESS,
+ READ_VERB,
+ READ_PARAMETERS_SIZE,
+ READ_PARAMETERS_DATA,
+ READ_PAYLOAD_SIZE,
+ READ_PAYLOAD
+ }
+
+ static final int VERB_LENGTH = Integer.BYTES;
+
+ /**
+ * The default target for consuming deserialized {@link MessageIn}.
+ */
+ private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
+
+ final InetAddressAndPort peer;
+ final int messagingVersion;
+
+ /**
+ * Abstracts out depending directly on {@link MessagingService#receive(MessageIn, int)}; this makes tests more sane
+ * as they neither require nor trigger the entire message processing circus.
+ */
+ final BiConsumer<MessageIn, Integer> messageConsumer;
+
+ /**
+ * Captures the current {@link State} of processing a message. Primarily useful in the non-blocking use case.
+ */
+ State state = State.READ_PREFIX;
+
+ /**
+ * Captures the current data we've parsed out of an incoming message. Primarily useful in the non-blocking use case.
+ */
+ MessageHeader messageHeader;
+
+ /**
+ * Process the buffer in a non-blocking manner. Will try to read out as much of a message(s) as possible,
+ * and send any fully deserialized messages to {@link #messageConsumer}.
+ */
+ public abstract void process(ByteBuf in) throws IOException;
+
+ /**
+ * Process the buffer in a blocking manner. Will read as many messages as possible, blocking for more data,
+ * and send any fully deserialized messages to {@link #messageConsumer}.
+ */
+ public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException;
+
+ MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ this.peer = peer;
+ this.messagingVersion = messagingVersion;
+ this.messageConsumer = messageConsumer;
+ }
+
+ /**
+ * Only applicable in the non-blocking use case, and should only be used for testing!!!
+ */
+ @VisibleForTesting
+ public MessageHeader getMessageHeader()
+ {
+ return messageHeader;
+ }
+
+ /**
+ * A simple struct to hold the message header data as it is being built up.
+ */
+ public static class MessageHeader
+ {
+ public int messageId;
+ long constructionTime;
+ public InetAddressAndPort from;
+ public MessagingService.Verb verb;
+ int payloadSize;
+
+ Map<ParameterType, Object> parameters = Collections.emptyMap();
+
+ /**
+ * Length of the parameter data. If the message's version is {@link MessagingService#VERSION_40} or higher,
+ * this value is the total number of header bytes; else, for legacy messaging, this is the number of
+ * key/value entries in the header.
+ */
+ int parameterLength;
+ }
+
+ MessageHeader readPrefix(DataInputPlus in) throws IOException
+ {
+ MessagingService.validateMagic(in.readInt());
+ MessageHeader messageHeader = new MessageHeader();
+ messageHeader.messageId = in.readInt();
+ int messageTimestamp = in.readInt(); // make sure to read the sent timestamp, even if DatabaseDescriptor.hasCrossNodeTimeout() is not enabled
+ messageHeader.constructionTime = MessageIn.deriveConstructionTime(peer, messageTimestamp, ApproximateTime.currentTimeMillis());
+
+ return messageHeader;
+ }
+ }
+
+ /**
+ * Reads the incoming stream of bytes in the 4.0 format.
+ */
+ static class MessageInProcessorAsOf40 extends MessageInProcessor
+ {
+ MessageInProcessorAsOf40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ super(peer, messagingVersion, messageConsumer);
+ assert messagingVersion >= MessagingService.VERSION_40;
+ }
+
+ @SuppressWarnings("resource")
+ public void process(ByteBuf in) throws IOException
+ {
+ ByteBufDataInputPlus inputPlus = new ByteBufDataInputPlus(in);
+ while (true)
+ {
+ switch (state)
+ {
+ case READ_PREFIX:
+ if (in.readableBytes() < MessageOut.MESSAGE_PREFIX_SIZE)
+ return;
+ MessageHeader header = readPrefix(inputPlus);
+ if (header == null)
+ return;
+ header.from = peer;
+ messageHeader = header;
+ state = State.READ_VERB;
+ // fall-through
+ case READ_VERB:
+ if (in.readableBytes() < VERB_LENGTH)
+ return;
+ messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
+ state = State.READ_PARAMETERS_SIZE;
+ // fall-through
+ case READ_PARAMETERS_SIZE:
+ long length = VIntCoding.readUnsignedVInt(in);
+ if (length < 0)
+ return;
+ messageHeader.parameterLength = Ints.checkedCast(length);
+ messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
+ state = State.READ_PARAMETERS_DATA;
+ // fall-through
+ case READ_PARAMETERS_DATA:
+ if (messageHeader.parameterLength > 0)
+ {
+ if (in.readableBytes() < messageHeader.parameterLength)
+ return;
+ readParameters(inputPlus, messageHeader.parameterLength, messageHeader.parameters);
+ }
+ state = State.READ_PAYLOAD_SIZE;
+ // fall-through
+ case READ_PAYLOAD_SIZE:
+ length = VIntCoding.readUnsignedVInt(in);
+ if (length < 0)
+ return;
+ messageHeader.payloadSize = (int) length;
+ state = State.READ_PAYLOAD;
+ // fall-through
+ case READ_PAYLOAD:
+ if (in.readableBytes() < messageHeader.payloadSize)
+ return;
+
+ MessageIn<Object> messageIn = MessageIn.read(inputPlus, messagingVersion,
+ messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
+ messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
+
+ if (messageIn != null)
+ messageConsumer.accept(messageIn, messageHeader.messageId);
+
+ state = State.READ_PREFIX;
+ messageHeader = null;
+ break;
+ default:
+ throw new IllegalStateException("unknown/unhandled state: " + state);
+ }
+ }
+ }
+
+ private void readParameters(DataInputPlus inputPlus, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
+ {
+ TrackedDataInputPlus inputTracker = new TrackedDataInputPlus(inputPlus);
+
+ while (inputTracker.getBytesRead() < parameterLength)
+ {
+ String key = DataInputStream.readUTF(inputTracker);
+ ParameterType parameterType = ParameterType.byName.get(key);
+ long valueLength = VIntCoding.readUnsignedVInt(inputTracker);
--- End diff --
Unused variable.
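On the unused `valueLength`: in the 4.0 framing, `parameterLength` counts bytes and each value carries its own length. One natural use of `valueLength` is therefore to consume and drop values whose key isn't recognized, rather than failing. The sketch below rests on simplified assumptions. `ParameterReader` is hypothetical. Keys are read with `DataInputStream.readUTF`, as in the diff, but the value length is a plain int rather than an unsigned vint. Dropping unknown keys is this sketch's choice and is not necessarily what the patch does.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: read [UTF key][int valueLength][value bytes] entries until
// parameterLength bytes have been consumed, using valueLength to step over unknown keys.
final class ParameterReader
{
    private ParameterReader() {}

    static Map<String, byte[]> read(byte[] data, int parameterLength, Set<String> knownKeys) throws IOException
    {
        Map<String, byte[]> parameters = new HashMap<>();
        // bound the stream to the parameter section so we never read into the payload
        ByteArrayInputStream bytes = new ByteArrayInputStream(data, 0, parameterLength);
        DataInputStream in = new DataInputStream(bytes);
        while (bytes.available() > 0)
        {
            String key = in.readUTF();
            int valueLength = in.readInt();
            if (valueLength < 0)
                throw new IOException("negative value length for key " + key);
            byte[] value = new byte[valueLength];
            in.readFully(value);   // fills the array or throws EOFException
            if (knownKeys.contains(key))
                parameters.put(key, value);
            // unknown key: its value was consumed via valueLength, so it is simply dropped
        }
        return parameters;
    }
}
```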
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r215735492
--- Diff: src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java ---
@@ -393,6 +393,18 @@ private Channel getOrCreateChannel()
}
}
+ private void onError(Throwable t)
+ {
+ try
+ {
+ session.onError(t).get(5, TimeUnit.MINUTES);
--- End diff --
Why do we have a 5-minute timeout? We should pull this out into a constant.
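A sketch of the suggestion: the inline `5, TimeUnit.MINUTES` becomes a named, documented constant. The names `ErrorHandlingSender` and `ON_ERROR_TIMEOUT_MINUTES` are hypothetical. A `Function` returning a `Future` stands in for `session.onError(t)`; the diff shows only that its result supports `get(long, TimeUnit)`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical sketch: the wait for the session's error handling is bounded by a named
// constant instead of a bare "5, TimeUnit.MINUTES" at the call site.
class ErrorHandlingSender
{
    /** Upper bound on how long to wait for the session to finish handling an error. */
    static final long ON_ERROR_TIMEOUT_MINUTES = 5;

    private final Function<Throwable, Future<?>> onErrorHandler;  // stands in for session::onError

    ErrorHandlingSender(Function<Throwable, Future<?>> onErrorHandler)
    {
        this.onErrorHandler = onErrorHandler;
    }

    /** Returns true if error handling completed in time, false on timeout or failure. */
    boolean onError(Throwable t)
    {
        try
        {
            onErrorHandler.apply(t).get(ON_ERROR_TIMEOUT_MINUTES, TimeUnit.MINUTES);
            return true;
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();   // preserve the interrupt for the caller
            return false;
        }
        catch (ExecutionException | TimeoutException e)
        {
            return false;
        }
    }
}
```

With a constant, the value appears in one place. If it later needs tuning, it could be promoted to a configuration option without touching the call site.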
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216157093
--- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
@@ -231,4 +241,437 @@ public String toString()
sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb);
return sbuf.toString();
}
+
+ public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
+ {
+ return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
+
+ }
+
+ public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ return messagingVersion >= MessagingService.VERSION_40
+ ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
+ : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
+
+ }
+
+ /**
+ * Implementations contain the mechanics and logic of parsing incoming messages. Allows for both non-blocking
+ * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)}
+ * methods, respectively.
+ *
+ * Does not contain the actual deserialization code for message fields nor payload. That is left to the
+ * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
+ */
+ public static abstract class MessageInProcessor
+ {
+ /**
+ * The current state of deserializing an incoming message. This enum is only used in the nonblocking versions.
+ */
+ public enum State
+ {
+ READ_PREFIX,
+ READ_IP_ADDRESS,
+ READ_VERB,
+ READ_PARAMETERS_SIZE,
+ READ_PARAMETERS_DATA,
+ READ_PAYLOAD_SIZE,
+ READ_PAYLOAD
+ }
+
+ static final int VERB_LENGTH = Integer.BYTES;
+
+ /**
+ * The default target for consuming deserialized {@link MessageIn}.
+ */
+ private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
+
+ final InetAddressAndPort peer;
+ final int messagingVersion;
+
+ /**
+ * Abstracts out depending directly on {@link MessagingService#receive(MessageIn, int)}; this makes tests more sane
+ * as they neither require nor trigger the entire message processing circus.
+ */
+ final BiConsumer<MessageIn, Integer> messageConsumer;
+
+ /**
+ * Captures the current {@link State} of processing a message. Primarily useful in the non-blocking use case.
+ */
+ State state = State.READ_PREFIX;
+
+ /**
+ * Captures the current data we've parsed out of an incoming message. Primarily useful in the non-blocking use case.
+ */
+ MessageHeader messageHeader;
+
+ /**
+ * Process the buffer in a non-blocking manner. Will try to read out as much of a message(s) as possible,
+ * and send any fully deserialized messages to {@link #messageConsumer}.
+ */
+ public abstract void process(ByteBuf in) throws IOException;
+
+ /**
+ * Process the buffer in a blocking manner. Will read as many messages as possible, blocking for more data,
+ * and send any fully deserialized messages to {@link #messageConsumer}.
+ */
+ public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException;
+
+ MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ this.peer = peer;
+ this.messagingVersion = messagingVersion;
+ this.messageConsumer = messageConsumer;
+ }
+
+ /**
+ * Only applicable in the non-blocking use case, and should only be used for testing!!!
+ */
+ @VisibleForTesting
+ public MessageHeader getMessageHeader()
+ {
+ return messageHeader;
+ }
+
+ /**
+ * A simple struct to hold the message header data as it is being built up.
+ */
+ public static class MessageHeader
+ {
+ public int messageId;
+ long constructionTime;
+ public InetAddressAndPort from;
+ public MessagingService.Verb verb;
+ int payloadSize;
+
+ Map<ParameterType, Object> parameters = Collections.emptyMap();
+
+ /**
+ * Length of the parameter data. If the message's version is {@link MessagingService#VERSION_40} or higher,
+ * this value is the total number of header bytes; else, for legacy messaging, this is the number of
+ * key/value entries in the header.
+ */
+ int parameterLength;
+ }
+
+ MessageHeader readPrefix(DataInputPlus in) throws IOException
+ {
+ MessagingService.validateMagic(in.readInt());
+ MessageHeader messageHeader = new MessageHeader();
+ messageHeader.messageId = in.readInt();
+ int messageTimestamp = in.readInt(); // make sure to read the sent timestamp, even if DatabaseDescriptor.hasCrossNodeTimeout() is not enabled
+ messageHeader.constructionTime = MessageIn.deriveConstructionTime(peer, messageTimestamp, ApproximateTime.currentTimeMillis());
+
+ return messageHeader;
+ }
+ }
+
+ /**
+ * Reads the incoming stream of bytes in the 4.0 format.
+ */
+ static class MessageInProcessorAsOf40 extends MessageInProcessor
+ {
+ MessageInProcessorAsOf40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ super(peer, messagingVersion, messageConsumer);
+ assert messagingVersion >= MessagingService.VERSION_40;
+ }
+
+ @SuppressWarnings("resource")
+ public void process(ByteBuf in) throws IOException
+ {
+ ByteBufDataInputPlus inputPlus = new ByteBufDataInputPlus(in);
+ while (true)
+ {
+ switch (state)
+ {
+ case READ_PREFIX:
+ if (in.readableBytes() < MessageOut.MESSAGE_PREFIX_SIZE)
+ return;
+ MessageHeader header = readPrefix(inputPlus);
+ if (header == null)
+ return;
+ header.from = peer;
+ messageHeader = header;
+ state = State.READ_VERB;
+ // fall-through
+ case READ_VERB:
+ if (in.readableBytes() < VERB_LENGTH)
+ return;
+ messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
+ state = State.READ_PARAMETERS_SIZE;
+ // fall-through
+ case READ_PARAMETERS_SIZE:
+ long length = VIntCoding.readUnsignedVInt(in);
+ if (length < 0)
+ return;
+ messageHeader.parameterLength = Ints.checkedCast(length);
+ messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
+ state = State.READ_PARAMETERS_DATA;
+ // fall-through
+ case READ_PARAMETERS_DATA:
+ if (messageHeader.parameterLength > 0)
+ {
+ if (in.readableBytes() < messageHeader.parameterLength)
+ return;
+ readParameters(inputPlus, messageHeader.parameterLength, messageHeader.parameters);
+ }
+ state = State.READ_PAYLOAD_SIZE;
+ // fall-through
+ case READ_PAYLOAD_SIZE:
+ length = VIntCoding.readUnsignedVInt(in);
+ if (length < 0)
+ return;
+ messageHeader.payloadSize = (int) length;
+ state = State.READ_PAYLOAD;
+ // fall-through
+ case READ_PAYLOAD:
+ if (in.readableBytes() < messageHeader.payloadSize)
+ return;
+
+ MessageIn<Object> messageIn = MessageIn.read(inputPlus, messagingVersion,
+ messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
+ messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
+
+ if (messageIn != null)
+ messageConsumer.accept(messageIn, messageHeader.messageId);
+
+ state = State.READ_PREFIX;
+ messageHeader = null;
+ break;
+ default:
+ throw new IllegalStateException("unknown/unhandled state: " + state);
+ }
+ }
+ }
+
+ private void readParameters(DataInputPlus inputPlus, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
+ {
+ TrackedDataInputPlus inputTracker = new TrackedDataInputPlus(inputPlus);
+
+ while (inputTracker.getBytesRead() < parameterLength)
+ {
+ String key = DataInputStream.readUTF(inputTracker);
+ ParameterType parameterType = ParameterType.byName.get(key);
+ long valueLength = VIntCoding.readUnsignedVInt(inputTracker);
+ parameters.put(parameterType, parameterType.serializer.deserialize(inputTracker, messagingVersion));
+ }
+ }
+
+ public void process(RebufferingByteBufDataInputPlus in) throws IOException
+ {
+ while (in.isOpen() && !in.isEmpty())
+ {
+ messageHeader = readPrefix(in);
+ messageHeader.from = peer;
+ messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
+ messageHeader.parameterLength = Ints.checkedCast(VIntCoding.readUnsignedVInt(in));
+ messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
+ if (messageHeader.parameterLength > 0)
+ readParameters(in, messageHeader.parameterLength, messageHeader.parameters);
+
+ messageHeader.payloadSize = Ints.checkedCast(VIntCoding.readUnsignedVInt(in));
+ MessageIn<Object> messageIn = MessageIn.read(in, messagingVersion,
+ messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
+ messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
+ if (messageIn != null)
+ messageConsumer.accept(messageIn, messageHeader.messageId);
+ }
+ }
+ }
+
+ /**
+ * Reads the incoming stream of bytes in the pre-4.0 format.
+ */
+ static class MessageInProcessorPre40 extends MessageInProcessor
+ {
+ private static final int PARAMETERS_SIZE_LENGTH = Integer.BYTES;
+ private static final int PARAMETERS_VALUE_SIZE_LENGTH = Integer.BYTES;
+ private static final int PAYLOAD_SIZE_LENGTH = Integer.BYTES;
+
+ MessageInProcessorPre40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ super(peer, messagingVersion, messageConsumer);
+ assert messagingVersion < MessagingService.VERSION_40;
+ }
+
+ public void process(ByteBuf in) throws IOException
+ {
+ ByteBufDataInputPlus inputPlus = new ByteBufDataInputPlus(in);
+ while (true)
+ {
+ switch (state)
+ {
+ case READ_PREFIX:
+ if (in.readableBytes() < MessageOut.MESSAGE_PREFIX_SIZE)
+ return;
+ MessageHeader header = readPrefix(inputPlus);
+ if (header == null)
+ return;
+ messageHeader = header;
+ state = State.READ_IP_ADDRESS;
+ // fall-through
+ case READ_IP_ADDRESS:
+ // unfortunately, this assumes knowledge of how CompactEndpointSerializationHelper serializes data (the first byte is the size).
+ // first, check that we can actually read the size byte, then check if we can read that number of bytes.
+ // the "+ 1" is to make sure we have the size byte in addition to the serialized IP addr count of bytes in the buffer.
+ int readableBytes = in.readableBytes();
+ if (readableBytes < 1 || readableBytes < in.getByte(in.readerIndex()) + 1)
+ return;
+ messageHeader.from = CompactEndpointSerializationHelper.instance.deserialize(inputPlus, messagingVersion);
+ state = State.READ_VERB;
+ // fall-through
+ case READ_VERB:
+ if (in.readableBytes() < VERB_LENGTH)
+ return;
+ messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
+ state = State.READ_PARAMETERS_SIZE;
+ // fall-through
+ case READ_PARAMETERS_SIZE:
+ if (in.readableBytes() < PARAMETERS_SIZE_LENGTH)
+ return;
+ messageHeader.parameterLength = in.readInt();
+ messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
+ state = State.READ_PARAMETERS_DATA;
+ // fall-through
+ case READ_PARAMETERS_DATA:
+ if (messageHeader.parameterLength > 0)
+ {
+ if (!readParameters(in, inputPlus, messageHeader.parameterLength, messageHeader.parameters))
+ return;
+ }
+ state = State.READ_PAYLOAD_SIZE;
+ // fall-through
+ case READ_PAYLOAD_SIZE:
+ if (in.readableBytes() < PAYLOAD_SIZE_LENGTH)
+ return;
+ messageHeader.payloadSize = in.readInt();
+ state = State.READ_PAYLOAD;
+ // fall-through
+ case READ_PAYLOAD:
+ if (in.readableBytes() < messageHeader.payloadSize)
+ return;
+
+ MessageIn<Object> messageIn = MessageIn.read(inputPlus, messagingVersion,
+ messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
+ messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
+
+ if (messageIn != null)
+ messageConsumer.accept(messageIn, messageHeader.messageId);
+
+ state = State.READ_PREFIX;
+ messageHeader = null;
+ break;
+ default:
+ throw new IllegalStateException("unknown/unhandled state: " + state);
+ }
+ }
+ }
+
+ /**
+ * @return <code>true</code> if all the parameters have been read from the {@link ByteBuf}; else, <code>false</code>.
+ */
+ private boolean readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, int parameterCount, Map<ParameterType, Object> parameters) throws IOException
+ {
+ // makes the assumption that map.size() is a constant time function (EnumMap.size() is)
+ while (parameters.size() < parameterCount)
+ {
+ if (!canReadNextParam(in))
+ return false;
+
+ String key = DataInputStream.readUTF(inputPlus);
+ ParameterType parameterType = ParameterType.byName.get(key);
+ byte[] value = new byte[in.readInt()];
+ in.readBytes(value);
+ try (DataInputBuffer buffer = new DataInputBuffer(value))
+ {
+ parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion));
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Determine if we can read the next parameter from the {@link ByteBuf}. This method will *always* set the {@code in}
+ * readerIndex back to where it was when this method was invoked.
+ * <p>
+ * NOTE: this function would be sooo much simpler if we included a parameters length int in the messaging format,
+ * instead of checking the remaining readable bytes for each field as we're parsing it. c'est la vie ...
+ */
+ @VisibleForTesting
+ boolean canReadNextParam(ByteBuf in)
+ {
+ in.markReaderIndex();
+ // capture the readableBytes value here to avoid all the virtual function calls.
+ // subtract 6 as we know we'll be reading a short and an int (for the utf and value lengths).
+ final int minimumBytesRequired = 6;
+ int readableBytes = in.readableBytes() - minimumBytesRequired;
+ if (readableBytes < 0)
+ return false;
+
+ // this is a tad invasive, but since we know the UTF string is prefaced with a 2-byte length,
+ // read that to make sure we have enough bytes to read the string itself.
+ short strLen = in.readShort();
+ // check if we can read that many bytes for the UTF
+ if (strLen > readableBytes)
+ {
+ in.resetReaderIndex();
+ return false;
+ }
+ in.skipBytes(strLen);
+ readableBytes -= strLen;
+
+ // check if we can read the value length
+ if (readableBytes < PARAMETERS_VALUE_SIZE_LENGTH)
+ {
+ in.resetReaderIndex();
+ return false;
+ }
+ int valueLength = in.readInt();
+ // check if we can read that many bytes for the value
+ if (valueLength > readableBytes)
+ {
+ in.resetReaderIndex();
+ return false;
+ }
+
+ in.resetReaderIndex();
+ return true;
+ }
+
+ public void process(RebufferingByteBufDataInputPlus in) throws IOException
+ {
+ while (in.isOpen() && !in.isEmpty())
+ {
+ messageHeader = readPrefix(in);
+ messageHeader.from = CompactEndpointSerializationHelper.instance.deserialize(in, messagingVersion);
+ messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
+ messageHeader.parameterLength = in.readInt();
+ messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
+ if (messageHeader.parameterLength > 0)
+ readParameters(in, messageHeader.parameterLength, messageHeader.parameters);
+
+ messageHeader.payloadSize = in.readInt();
+ MessageIn<Object> messageIn = MessageIn.read(in, messagingVersion,
+ messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
+ messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
+ if (messageIn != null)
+ messageConsumer.accept(messageIn, messageHeader.messageId);
+ }
+ }
+
+ private void readParameters(RebufferingByteBufDataInputPlus in, int parameterCount, Map<ParameterType, Object> parameters) throws IOException
+ {
+ // makes the assumption that map.size() is a constant time function (EnumMap.size() is)
+ while (parameters.size() < parameterCount)
+ {
+ String key = DataInputStream.readUTF(in);
+ ParameterType parameterType = ParameterType.byName.get(key);
+ int valueLength = in.readInt();
--- End diff --
removed
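The `process(ByteBuf)` methods above are resumable state machines: each state checks that enough bytes are readable before consuming anything, returns early otherwise, and resumes from the saved `state` when more bytes arrive. A minimal stdlib-only sketch of that pattern, assuming a made-up frame of `[int id][int payloadSize][payload]` and a `java.nio.ByteBuffer` standing in for Netty's `ByteBuf`; the class and frame layout are hypothetical, not Cassandra's wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder showing the READ_* fall-through state pattern.
final class ResumableFrameDecoder
{
    enum State { READ_ID, READ_PAYLOAD_SIZE, READ_PAYLOAD }

    private State state = State.READ_ID;
    private int id;
    private int payloadSize;
    final List<String> decoded = new ArrayList<>();

    // Consumes only complete fields; a partial field is left unread for the next call.
    void process(ByteBuffer in)
    {
        while (true)
        {
            switch (state)
            {
                case READ_ID:
                    if (in.remaining() < Integer.BYTES)
                        return;
                    id = in.getInt();
                    state = State.READ_PAYLOAD_SIZE;
                    // fall-through
                case READ_PAYLOAD_SIZE:
                    if (in.remaining() < Integer.BYTES)
                        return;
                    payloadSize = in.getInt();
                    state = State.READ_PAYLOAD;
                    // fall-through
                case READ_PAYLOAD:
                    if (in.remaining() < payloadSize)
                        return;
                    byte[] payload = new byte[payloadSize];
                    in.get(payload);
                    decoded.add(id + ":" + new String(payload, StandardCharsets.US_ASCII));
                    state = State.READ_ID; // ready for the next frame
                    break;
                default:
                    throw new IllegalStateException("unknown/unhandled state: " + state);
            }
        }
    }
}
```

The caller has to keep any unread bytes and append the next chunk to them, e.g. with `ByteBuffer.compact()` before writing more; in Netty, `ByteToMessageDecoder` does that accumulation.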
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216047177
--- Diff: src/java/org/apache/cassandra/net/async/MessageOutHandler.java ---
@@ -140,8 +125,17 @@ public void write(ChannelHandlerContext ctx, Object o, ChannelPromise promise)
out = ctx.alloc().ioBuffer((int)currentFrameSize);
- captureTracingInfo(msg);
- serializeMessage(msg, out);
+ @SuppressWarnings("resource")
+ ByteBufDataOutputPlus outputPlus = new ByteBufDataOutputPlus(out);
+ msg.message.serialize(outputPlus, targetMessagingVersion, connectionId, msg.id, msg.timestampNanos);
+
+ // next few lines are for debugging ... massively helpful!!
+ // if we allocated too much buffer for this message, we'll log here.
+ // if we allocated too little buffer space, we would have hit an exception when trying to write more bytes to it
+ if (out.isWritable())
+ errorLogger.error("{} reported message size {}, actual message size {}, msg {}",
--- End diff --
It's fine. We can leave it in.
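The debug check above works because the output buffer is sized from the reported message size: leftover writable space means the size was over-reported. A stdlib sketch of the same idea with a fixed-capacity `ByteBuffer`, where under-reporting surfaces as `BufferOverflowException`; `serializedSize` and `serialize` are hypothetical stand-ins for a message's size computation and serializer, not Cassandra methods.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class SizeCheckedSerializer
{
    // Hypothetical format: [int length][UTF-8 bytes]; 'reportedExtra' simulates a buggy size estimate.
    static int serializedSize(String s, int reportedExtra)
    {
        return Integer.BYTES + s.getBytes(StandardCharsets.UTF_8).length + reportedExtra;
    }

    static void serialize(String s, ByteBuffer out)
    {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.putInt(bytes.length);
        out.put(bytes);
    }

    // Returns a description of the size mismatch, or null when reported and actual sizes agree.
    static String check(String s, int reportedExtra)
    {
        int reported = serializedSize(s, reportedExtra);
        ByteBuffer out = ByteBuffer.allocate(reported);
        try
        {
            serialize(s, out);
        }
        catch (BufferOverflowException e)
        {
            // under-reported: the serializer needed more bytes than were allocated
            return "under-reported size " + reported;
        }
        if (out.hasRemaining())
            // over-reported: analogous to out.isWritable() being true in MessageOutHandler
            return "reported size " + reported + ", actual size " + out.position();
        return null;
    }
}
```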
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216157094
--- Diff: src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java ---
@@ -196,10 +220,12 @@ protected void doFlush(int count) throws IOException
int byteCount = buffer.position();
currentBuf.writerIndex(byteCount);
- if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES))
+ if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, rateLimiterBlockTime, rateLimiterBlockTimeUnit))
--- End diff --
Good catch. For the record, the only place where this went from 2 minutes up to 5 minutes was in `NettyStreamingMessageSender.FIleSendTask`, where I [defaulted to 5 minutes](https://github.com/apache/cassandra/pull/253/files#diff-6d6dd6c9e52acdc8462538a0d3695c0aR327). I've switched that call site to default to 2 minutes.
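Guava's `Uninterruptibles.tryAcquireUninterruptibly(Semaphore, int, long, TimeUnit)` keeps waiting through interrupts and restores the interrupt flag afterwards, so the call above now waits at most the configured `rateLimiterBlockTime`. A stdlib-only approximation of that behaviour, assuming `channelRateLimiter` is a `java.util.concurrent.Semaphore` (its declaration is not in the diff); this is a sketch, not Guava's source.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class TimedAcquire
{
    // Waits up to 'timeout' for 'permits', retrying after interrupts, then re-asserts the interrupt flag.
    static boolean tryAcquireUninterruptibly(Semaphore semaphore, int permits, long timeout, TimeUnit unit)
    {
        boolean interrupted = false;
        try
        {
            long remainingNanos = unit.toNanos(timeout);
            long deadline = System.nanoTime() + remainingNanos;
            while (true)
            {
                try
                {
                    // returns false if the permits are not available before the timeout
                    return semaphore.tryAcquire(permits, remainingNanos, TimeUnit.NANOSECONDS);
                }
                catch (InterruptedException e)
                {
                    interrupted = true;
                    remainingNanos = deadline - System.nanoTime(); // keep the original deadline
                }
            }
        }
        finally
        {
            if (interrupted)
                Thread.currentThread().interrupt();
        }
    }
}
```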
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216157099
--- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
@@ -180,6 +199,73 @@ public String toString()
return sbuf.toString();
}
+ /**
+ * The main entry point for sending an internode message to a peer node in the cluster.
+ */
+ public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
+ {
+ captureTracingInfo(destinationId);
+
+ out.writeInt(MessagingService.PROTOCOL_MAGIC);
+ out.writeInt(id);
+
+ // int cast cuts off the high-order half of the timestamp, which we can assume remains
+ // the same between now and when the recipient reconstructs it.
+ out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
+ serialize(out, messagingVersion);
+ }
+
+ /**
+ * Record any tracing data, if enabled on this message.
+ */
+ @VisibleForTesting
+ void captureTracingInfo(OutboundConnectionIdentifier destinationId)
+ {
+ try
+ {
+ UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION);
--- End diff --
fixed
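The quoted `serialize` writes only the low-order 32 bits of the millisecond timestamp. The receiver (via `MessageIn.deriveConstructionTime` in the first diff) can rebuild it by borrowing the high-order 32 bits from its own clock, which works while both values share the same high half. A minimal sketch of that idea with hypothetical method names; it does not reproduce Cassandra's exact logic, such as the cross-node-timeout handling.

```java
final class TruncatedTimestamp
{
    // Sender side: keep only the low-order 32 bits of the epoch-millis timestamp.
    static int truncate(long epochMillis)
    {
        return (int) epochMillis;
    }

    // Receiver side: combine the sender's low bits with the receiver's high bits.
    static long reconstruct(int sentLowBits, long receiverNowMillis)
    {
        long lowBits = sentLowBits & 0xFFFFFFFFL;                // undo sign extension of the int
        long highBits = receiverNowMillis & 0xFFFFFFFF00000000L;
        return highBits | lowBits;
    }
}
```

The high half changes every 2^32 ms (roughly 49.7 days), so if the send and receive straddle that boundary, this naive reconstruction is off by exactly 2^32 ms.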
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r215955652
--- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -183,6 +195,11 @@ public int available() throws EOFException
return availableBytes;
}
+ public boolean isEmpty() throws EOFException
+ {
+ return available() == 0;
--- End diff --
So, looking at this again, I needed to figure out why `available()` does the autoRead check. For posterity, here's the reasoning behind it:
There is a case where we enqueue a buffer and disable autoRead, then read exactly all of the buffer (so no more bytes are available to read in the instance). To avoid blocking indefinitely (so that I could check whether `StreamingInboundHandler.close` is true), I called `available()` (which did the autoRead check) instead of attempting to read a value from the inputPlus (which would call `reBuffer()` and either block indefinitely or, if I added a timeout, throw an exception). I wanted neither the indefinite block nor the exception, so I bolted the autoRead check onto `available()`.
I think it still makes sense to move this into a separate method call.
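The comment describes keeping a non-blocking "is anything buffered?" check separate from the blocking `reBuffer()` path, which waits with a bounded timeout rather than forever. A minimal sketch of that split using a `LinkedBlockingQueue` of byte chunks; `QueueBackedInput` and its methods are hypothetical and not `RebufferingByteBufDataInputPlus`'s API.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class QueueBackedInput
{
    private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final long rebufferTimeoutMillis;
    private byte[] current = new byte[0];
    private int position;

    QueueBackedInput(long rebufferTimeoutMillis)
    {
        this.rebufferTimeoutMillis = rebufferTimeoutMillis;
    }

    // Producer side, e.g. called from a network event loop.
    void append(byte[] chunk)
    {
        queue.add(chunk);
    }

    // Non-blocking: unread bytes in the current chunk plus everything already queued.
    int available()
    {
        int total = current.length - position;
        for (byte[] chunk : queue)
            total += chunk.length;
        return total;
    }

    boolean isEmpty()
    {
        return available() == 0;
    }

    // Blocking: when the current chunk is exhausted, waits up to the timeout for the next one.
    int readByte() throws IOException
    {
        while (position == current.length)
        {
            byte[] next;
            try
            {
                next = queue.poll(rebufferTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("interrupted while waiting for data");
            }
            if (next == null)
                throw new EOFException("no data after " + rebufferTimeoutMillis + " ms");
            current = next;
            position = 0;
        }
        return current[position++] & 0xFF;
    }
}
```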
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216103840
--- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -36,6 +37,11 @@
public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
{
+ /**
+ * Default to a very large value.
+ */
+ private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2);
--- End diff --
I've lowered it to 3 minutes.
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r215733891
--- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -36,6 +37,11 @@
public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
{
+ /**
+ * Default to a very large value.
+ */
+ private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2);
--- End diff --
It's risky to default to a very large value. Why do we want to default to such a large value?
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216106193
--- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
@@ -180,6 +199,73 @@ public String toString()
return sbuf.toString();
}
+ /**
+ * The main entry point for sending an internode message to a peer node in the cluster.
+ */
+ public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
+ {
+ captureTracingInfo(destinationId);
+
+ out.writeInt(MessagingService.PROTOCOL_MAGIC);
+ out.writeInt(id);
+
+ // int cast cuts off the high-order half of the timestamp, which we can assume remains
+ // the same between now and when the recipient reconstructs it.
+ out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
+ serialize(out, messagingVersion);
+ }
+
+ /**
+ * Record any tracing data, if enabled on this message.
+ */
+ @VisibleForTesting
+ void captureTracingInfo(OutboundConnectionIdentifier destinationId)
+ {
+ try
+ {
+ UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION);
--- End diff --
Nit: Spacing
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216118578
--- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
@@ -180,6 +199,73 @@ public String toString()
return sbuf.toString();
}
+ /**
+ * The main entry point for sending an internode message to a peer node in the cluster.
+ */
+ public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
+ {
+ captureTracingInfo(destinationId);
+
+ out.writeInt(MessagingService.PROTOCOL_MAGIC);
+ out.writeInt(id);
+
+ // int cast cuts off the high-order half of the timestamp, which we can assume remains
+ // the same between now and when the recipient reconstructs it.
+ out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
+ serialize(out, messagingVersion);
+ }
+
+ /**
+ * Record any tracing data, if enabled on this message.
+ */
+ @VisibleForTesting
+ void captureTracingInfo(OutboundConnectionIdentifier destinationId)
+ {
+ try
+ {
+ UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION);
+ if (sessionId != null)
--- End diff --
Yes. Further, this method is basically taken verbatim from 3.11's [OutboundTcpConnection.writeConnected()](https://github.com/apache/cassandra/blob/cassandra-3.11/src/java/org/apache/cassandra/net/OutboundTcpConnection.java#L312) method.
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r215939675
--- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -36,6 +37,11 @@
public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
{
+ /**
+ * Default to a very large value.
+ */
+ private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2);
--- End diff --
The corresponding pre-4.0 code just does blocking IO on a socket, with no timeouts. Thus it blocks forever. 2 days seems shorter than forever, but I'm game to change this to any time-bounded value.
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216105848
--- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
@@ -180,6 +199,73 @@ public String toString()
return sbuf.toString();
}
+ /**
+ * The main entry point for sending an internode message to a peer node in the cluster.
+ */
+ public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
+ {
+ captureTracingInfo(destinationId);
+
+ out.writeInt(MessagingService.PROTOCOL_MAGIC);
+ out.writeInt(id);
+
+ // int cast cuts off the high-order half of the timestamp, which we can assume remains
+ // the same between now and when the recipient reconstructs it.
+ out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
+ serialize(out, messagingVersion);
+ }
+
+ /**
+ * Record any tracing data, if enabled on this message.
+ */
+ @VisibleForTesting
+ void captureTracingInfo(OutboundConnectionIdentifier destinationId)
+ {
+ try
+ {
+ UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION);
+ if (sessionId != null)
+ {
+ TraceState state = Tracing.instance.get(sessionId);
+ String logMessage = String.format("Sending %s message to %s", verb, destinationId.connectionAddress());
+ // session may have already finished; see CASSANDRA-5668
+ if (state == null)
+ {
+ Tracing.TraceType traceType = (Tracing.TraceType)getParameter(ParameterType.TRACE_TYPE);
+ traceType = traceType == null ? Tracing.TraceType.QUERY : traceType;
+ Tracing.instance.trace(ByteBuffer.wrap(UUIDGen.decompose(sessionId)), logMessage, traceType.getTTL());
+ }
+ else
+ {
+ state.trace(logMessage);
+ if (verb == MessagingService.Verb.REQUEST_RESPONSE)
+ Tracing.instance.doneWithNonLocalSession(state);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ logger.warn("failed to capture the tracing info for an outbound message to {}, ignoring", destinationId, e);
+ }
+ }
+
+ private Object getParameter(ParameterType type)
+ {
+ for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE)
+ {
+ if (((ParameterType)parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)).equals(type))
--- End diff --
Don't need the typecast to `ParameterType`
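The cast is unnecessary because `equals(Object)` accepts any element, and for an enum `==` would also do. A sketch assuming the parameters are stored as a flat list of alternating type/value entries (stride 2); the enum and the `PARAMETER_TUPLE_*` values here are hypothetical stand-ins for the ones in `MessageOut`.

```java
import java.util.List;

final class FlatParameters
{
    enum ParameterType { TRACE_SESSION, TRACE_TYPE }

    // Hypothetical layout: [type0, value0, type1, value1, ...]
    static final int PARAMETER_TUPLE_SIZE = 2;
    static final int PARAMETER_TUPLE_TYPE_OFFSET = 0;
    static final int PARAMETER_TUPLE_PARAMETER_OFFSET = 1;

    static Object getParameter(List<Object> parameters, ParameterType type)
    {
        for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE)
        {
            // no cast needed: call equals on the known ParameterType and pass the raw element
            if (type.equals(parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)))
                return parameters.get(ii + PARAMETER_TUPLE_PARAMETER_OFFSET);
        }
        return null;
    }
}
```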
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216102238
--- Diff: test/unit/org/apache/cassandra/net/MessageInProcessorPre40Test.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageIn.MessageInProcessorPre40;
+import org.apache.cassandra.net.async.ByteBufDataOutputPlus;
+
+public class MessageInProcessorPre40Test
+{
+ private static InetAddressAndPort addr;
+
+ private MessageInProcessorPre40 processor;
+ private ByteBuf buf;
+
+ @BeforeClass
+ public static void before() throws UnknownHostException
+ {
+ DatabaseDescriptor.daemonInitialization();
+ addr = InetAddressAndPort.getByName("127.0.0.1");
+ }
+
+ @Before
+ public void setup()
+ {
+ processor = new MessageInProcessorPre40(addr, MessagingService.VERSION_30, (messageIn, integer) -> {});
+ }
+
+ @After
+ public void tearDown()
+ {
+ if (buf != null && buf.refCnt() > 0)
+ buf.release();
+ }
+
+ @Test
+ public void canReadNextParam_HappyPath() throws IOException
+ {
+ buildParamBufPre40(13);
+ Assert.assertTrue(processor.canReadNextParam(buf));
+ }
+
+ @Test
+ public void canReadNextParam_OnlyFirstByte() throws IOException
+ {
+ buildParamBufPre40(13);
+ buf.writerIndex(1);
+ Assert.assertFalse(processor.canReadNextParam(buf));
+ }
+
+ @Test
+ public void canReadNextParam_PartialUTF() throws IOException
+ {
+ buildParamBufPre40(13);
+ buf.writerIndex(5);
+ Assert.assertFalse(processor.canReadNextParam(buf));
+ }
+
+ @Test
+ public void canReadNextParam_TruncatedValueLength() throws IOException
+ {
+ buildParamBufPre40(13);
+ buf.writerIndex(buf.writerIndex() - 13 - 2);
+ Assert.assertFalse(processor.canReadNextParam(buf));
+ }
+
+ @Test
+ public void canReadNextParam_MissingLastBytes() throws IOException
+ {
+ buildParamBufPre40(13);
+ buf.writerIndex(buf.writerIndex() - 2);
+ Assert.assertFalse(processor.canReadNextParam(buf));
+ }
+
+ private void buildParamBufPre40(int valueLength) throws IOException
+ {
+ buf = Unpooled.buffer(1024, 1024); // 1k should be enough for everybody!
--- End diff --
🤣
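`buildParamBufPre40` is cut off above, but the tests imply the pre-4.0 parameter layout that `canReadNextParam` inspects: a 2-byte UTF length, the key bytes, a 4-byte value length, then the value. A stdlib sketch that builds such a buffer and peeks at it without moving its position, using `ByteBuffer` absolute gets in place of Netty's mark/reset; the key and class names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

final class Pre40ParamPeek
{
    // Builds [unsigned short keyLen][key bytes][int valueLen][value bytes].
    static ByteBuffer buildParam(String key, int valueLength) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.writeUTF(key);          // 2-byte length prefix + modified UTF-8
            out.writeInt(valueLength);
            out.write(new byte[valueLength]);
        }
        return ByteBuffer.wrap(bytes.toByteArray());
    }

    // True only if a whole parameter is readable; uses absolute gets, so position never changes.
    static boolean canReadNextParam(ByteBuffer in)
    {
        int pos = in.position();
        int readable = in.remaining();
        if (readable < Short.BYTES)
            return false;
        int strLen = in.getShort(pos) & 0xFFFF;   // writeUTF's length prefix is unsigned
        int headerBytes = Short.BYTES + strLen + Integer.BYTES;
        if (readable < headerBytes)
            return false;
        int valueLength = in.getInt(pos + Short.BYTES + strLen);
        return readable >= headerBytes + valueLength;
    }
}
```

Each field's size is checked once against the remaining byte count, and reading the length as unsigned matches how `DataOutput.writeUTF` encodes it.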
---
[GitHub] cassandra pull request #253: CASSANDRA-13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216185533
--- Diff: test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java ---
@@ -70,32 +86,71 @@ public static void before()
@Before
public void setup()
{
- OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
- InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
+ if (type == ChannelWriterType.LARGE_MESSAGE)
+ {
+ id = OutboundConnectionIdentifier.large(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
+ InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
+ }
+ else
+ {
+ id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
+ InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
+ }
+
channel = new EmbeddedChannel();
omc = new NonSendingOutboundMessagingConnection(id, null, Optional.empty());
- channelWriter = ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty());
- channel.pipeline().addFirst(new MessageOutHandler(id, MessagingService.current_version, channelWriter, () -> null));
- coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
+ wrapper = new MessageOutWrapper();
+ OutboundConnectionParams.Builder builder = OutboundConnectionParams.builder()
+ .messageResultConsumer(this::handleMessageResult)
+ .coalescingStrategy(Optional.empty())
+ .protocolVersion(MessagingService.current_version)
+ .connectionId(id);
+
+ if (type == ChannelWriterType.COALESCING)
+ coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
+ else
+ coalescingStrategy = Optional.empty();
--- End diff --
Wait for CASSANDRA-14503 :)
---
[GitHub] cassandra pull request #253: CASSANDRA-13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216180830
--- Diff: test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java ---
@@ -70,32 +86,71 @@ public static void before()
@Before
public void setup()
{
- OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
- InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
+ if (type == ChannelWriterType.LARGE_MESSAGE)
+ {
+ id = OutboundConnectionIdentifier.large(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
+ InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
+ }
+ else
+ {
+ id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
+ InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
+ }
+
channel = new EmbeddedChannel();
omc = new NonSendingOutboundMessagingConnection(id, null, Optional.empty());
- channelWriter = ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty());
- channel.pipeline().addFirst(new MessageOutHandler(id, MessagingService.current_version, channelWriter, () -> null));
- coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
+ wrapper = new MessageOutWrapper();
+ OutboundConnectionParams.Builder builder = OutboundConnectionParams.builder()
+ .messageResultConsumer(this::handleMessageResult)
+ .coalescingStrategy(Optional.empty())
+ .protocolVersion(MessagingService.current_version)
+ .connectionId(id);
+
+ if (type == ChannelWriterType.COALESCING)
+ coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
+ else
+ coalescingStrategy = Optional.empty();
--- End diff --
This is beyond the scope of this PR; however, we should refactor `OutboundConnectionParams` to not store an `Optional`.
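One way this suggestion could look, sketched under assumptions: `ConnectionParamsSketch` and its builder below are hypothetical stand-ins, not Cassandra's real `OutboundConnectionParams`, and the strategy is modeled as a plain `String` placeholder. The field is stored as a nullable reference and `Optional` appears only at the accessor, in line with the `java.util.Optional` Javadoc's note that it is primarily intended as a method return type.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for a connection-params object; NOT Cassandra's real class.
// The coalescing strategy is a String placeholder, stored as a nullable field.
final class ConnectionParamsSketch
{
    private final String connectionId;
    private final String coalescingStrategy; // null means "no coalescing"

    private ConnectionParamsSketch(Builder builder)
    {
        this.connectionId = Objects.requireNonNull(builder.connectionId, "connectionId");
        this.coalescingStrategy = builder.coalescingStrategy;
    }

    String connectionId()
    {
        return connectionId;
    }

    // Wrap on the way out, so callers still get an explicit "may be absent" signal.
    Optional<String> coalescingStrategy()
    {
        return Optional.ofNullable(coalescingStrategy);
    }

    static Builder builder()
    {
        return new Builder();
    }

    static final class Builder
    {
        private String connectionId;
        private String coalescingStrategy;

        Builder connectionId(String connectionId)
        {
            this.connectionId = connectionId;
            return this;
        }

        // Callers without a strategy simply skip this call; no Optional.empty() argument needed.
        Builder coalescingStrategy(String coalescingStrategy)
        {
            this.coalescingStrategy = coalescingStrategy;
            return this;
        }

        ConnectionParamsSketch build()
        {
            return new ConnectionParamsSketch(this);
        }
    }
}
```

With this shape, the `.coalescingStrategy(Optional.empty())` call in the test's builder chain would no longer be needed.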
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216157096
--- Diff: src/java/org/apache/cassandra/net/async/MessageInHandler.java ---
@@ -18,143 +18,296 @@
package org.apache.cassandra.net.async;
-import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.IOException;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
-import org.apache.cassandra.io.util.DataInputBuffer;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.exceptions.UnknownTableException;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.ParameterType;
-import org.apache.cassandra.utils.vint.VIntCoding;
+import org.apache.cassandra.net.MessageIn.MessageInProcessor;
/**
* Parses incoming messages as per the 4.0 internode messaging protocol.
*/
-public class MessageInHandler extends BaseMessageInHandler
+public class MessageInHandler extends ChannelInboundHandlerAdapter
{
public static final Logger logger = LoggerFactory.getLogger(MessageInHandler.class);
- private MessageHeader messageHeader;
+ private final InetAddressAndPort peer;
- MessageInHandler(InetAddressAndPort peer, int messagingVersion)
+ private final BufferHandler bufferHandler;
+ private volatile boolean closed;
+
+ public MessageInHandler(InetAddressAndPort peer, MessageInProcessor messageProcessor, boolean handlesLargeMessages)
+ {
+ this.peer = peer;
+
+ bufferHandler = handlesLargeMessages
+ ? new BlockingBufferHandler(messageProcessor)
+ : new NonblockingBufferHandler(messageProcessor);
+ }
+
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException
+ {
+ if (!closed)
+ {
+ bufferHandler.channelRead(ctx, (ByteBuf) msg);
+ }
+ else
+ {
+ ReferenceCountUtil.release(msg);
+ ctx.close();
+ }
+ }
+
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ {
+ if (cause instanceof EOFException)
+ logger.trace("eof reading from socket; closing", cause);
+ else if (cause instanceof UnknownTableException)
+ logger.warn("Got message from unknown table while reading from socket; closing", cause);
+ else if (cause instanceof IOException)
+ logger.trace("IOException reading from socket; closing", cause);
+ else
+ logger.warn("Unexpected exception caught in inbound channel pipeline from " + ctx.channel().remoteAddress(), cause);
+
+ close();
+ ctx.close();
+ }
+
+ public void channelInactive(ChannelHandlerContext ctx)
+ {
+ logger.trace("received channel closed message for peer {} on local addr {}", ctx.channel().remoteAddress(), ctx.channel().localAddress());
+ close();
+ ctx.fireChannelInactive();
+ }
+
+ void close()
{
- this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER);
+ closed = true;
+ bufferHandler.close();
}
- public MessageInHandler(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ boolean isClosed()
{
- super(peer, messagingVersion, messageConsumer);
+ return closed;
+ }
- assert messagingVersion >= MessagingService.VERSION_40 : String.format("wrong messaging version for this handler: got %d, but expect %d or higher",
- messagingVersion, MessagingService.VERSION_40);
- state = State.READ_FIRST_CHUNK;
+ /**
+ * An abstraction around how incoming buffers are handled: either in a non-blocking manner ({@link NonblockingBufferHandler})
+ * or in a blocking manner ({@link BlockingBufferHandler}).
+ *
+ * The methods declared here will only be invoked on the netty event loop.
+ */
+ interface BufferHandler
+ {
+ void channelRead(ChannelHandlerContext ctx, ByteBuf in) throws IOException;
+
+ void close();
}
/**
- * For each new message coming in, builds up a {@link MessageHeader} instance incrementally. This method
- * attempts to deserialize as much header information as it can out of the incoming {@link ByteBuf}, and
- * maintains a trivial state machine to remember progress across invocations.
+ * Processes incoming buffers on the netty event loop, in a non-blocking manner. If a buffer is not completely consumed,
+ * it is stashed in {@link #retainedInlineBuffer}, and the next incoming buffer is combined with it.
*/
- @SuppressWarnings("resource")
- public void handleDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
+ class NonblockingBufferHandler implements BufferHandler
--- End diff --
done
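The `NonblockingBufferHandler` Javadoc above describes stashing a partially consumed buffer and combining it with the next one. A minimal sketch of that idea, assuming `java.nio.ByteBuffer` in place of netty's `ByteBuf` and a toy framing of a 4-byte length prefix followed by the payload (not Cassandra's wire format); `StashingFrameDecoder` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: toy framing of [int length][payload bytes], not Cassandra's format.
final class StashingFrameDecoder
{
    // Leftover bytes from earlier reads that did not yet form a complete frame.
    private ByteBuffer stash = ByteBuffer.allocate(0);

    // Called once per incoming chunk; returns every frame that is now complete.
    List<byte[]> onRead(ByteBuffer incoming)
    {
        // Combine the stashed bytes with the new chunk into one readable buffer.
        ByteBuffer combined = ByteBuffer.allocate(stash.remaining() + incoming.remaining());
        combined.put(stash).put(incoming).flip();

        List<byte[]> frames = new ArrayList<>();
        while (combined.remaining() >= Integer.BYTES)
        {
            int length = combined.getInt(combined.position()); // peek without consuming
            if (length < 0)
                throw new IllegalStateException("negative frame length: " + length);
            if (combined.remaining() < Integer.BYTES + length)
                break; // incomplete frame: wait for more bytes
            combined.getInt(); // consume the length prefix
            byte[] payload = new byte[length];
            combined.get(payload);
            frames.add(payload);
        }

        // Whatever could not be parsed is kept for the next call.
        stash = combined.slice();
        return frames;
    }
}
```

Copying into a fresh buffer on every read keeps the sketch simple; netty's own decoders avoid much of that copying by cumulating into a retained `ByteBuf`.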
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216118489
--- Diff: src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java ---
@@ -196,10 +220,12 @@ protected void doFlush(int count) throws IOException
int byteCount = buffer.position();
currentBuf.writerIndex(byteCount);
- if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES))
+ if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, rateLimiterBlockTime, rateLimiterBlockTimeUnit))
--- End diff --
Yes, I intentionally set it to 2 minutes.
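The diff replaces the hard-coded `2, TimeUnit.MINUTES` with the configurable `rateLimiterBlockTime` and `rateLimiterBlockTimeUnit`. A sketch of a bounded wait on a byte-count semaphore, assuming plain `java.util.concurrent.Semaphore.tryAcquire(int, long, TimeUnit)` rather than Guava's `Uninterruptibles.tryAcquireUninterruptibly` (which keeps waiting through interrupts, whereas this version gives up on interrupt); `BoundedFlushGate` is a hypothetical name.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a flush path whose wait for buffer space is bounded by a
// caller-supplied timeout instead of a hard-coded 2 minutes.
final class BoundedFlushGate
{
    private final Semaphore channelRateLimiter;
    private final long blockTime;
    private final TimeUnit blockTimeUnit;

    BoundedFlushGate(int permits, long blockTime, TimeUnit blockTimeUnit)
    {
        this.channelRateLimiter = new Semaphore(permits);
        this.blockTime = blockTime;
        this.blockTimeUnit = blockTimeUnit;
    }

    // Returns false on timeout; a caller such as doFlush() would then throw an IOException.
    boolean tryAcquireForFlush(int byteCount)
    {
        try
        {
            return channelRateLimiter.tryAcquire(byteCount, blockTime, blockTimeUnit);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Called once the bytes have actually been written to the socket.
    void release(int byteCount)
    {
        channelRateLimiter.release(byteCount);
    }
}
```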
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216087977
--- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
@@ -231,4 +241,437 @@ public String toString()
sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb);
return sbuf.toString();
}
+
+ public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
+ {
+ return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
+
+ }
+
+ public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ return messagingVersion >= MessagingService.VERSION_40
+ ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
+ : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
+
+ }
+
+ /**
+ * Implementations contain the mechanics and logic of parsing incoming messages. Allows for both non-blocking
+ * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)}
+ * methods, respectively.
+ *
+ * Does not contain the actual deserialization code for message fields nor payload. That is left to the
+ * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
+ */
+ public static abstract class MessageInProcessor
+ {
+ /**
+ * The current state of deserializing an incoming message. This enum is only used in the nonblocking versions.
+ */
+ public enum State
+ {
+ READ_PREFIX,
+ READ_IP_ADDRESS,
+ READ_VERB,
+ READ_PARAMETERS_SIZE,
+ READ_PARAMETERS_DATA,
+ READ_PAYLOAD_SIZE,
+ READ_PAYLOAD
+ }
+
+ static final int VERB_LENGTH = Integer.BYTES;
+
+ /**
+ * The default target for consuming deserialized {@link MessageIn}.
+ */
+ private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
+
+ final InetAddressAndPort peer;
+ final int messagingVersion;
+
+ /**
+ * Abstracts out depending directly on {@link MessagingService#receive(MessageIn, int)}; this makes tests more sane
+ * as they don't require nor trigger the entire message processing circus.
+ */
+ final BiConsumer<MessageIn, Integer> messageConsumer;
+
+ /**
+ * Captures the current {@link State} of processing a message. Primarily useful in the non-blocking use case.
+ */
+ State state = State.READ_PREFIX;
+
+ /**
+ * Captures the current data we've parsed out of an incoming message. Primarily useful in the non-blocking use case.
+ */
+ MessageHeader messageHeader;
+
+ /**
+ * Process the buffer in a non-blocking manner. Will try to read out as much of a message(s) as possible,
+ * and send any fully deserialized messages to {@link #messageConsumer}.
+ */
+ public abstract void process(ByteBuf in) throws IOException;
+
+ /**
+ * Process the buffer in a blocking manner. Will read as many messages as possible, blocking for more data,
+ * and send any fully deserialized messages to {@link #messageConsumer}.
+ */
+ public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException;
+
+ MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ this.peer = peer;
+ this.messagingVersion = messagingVersion;
+ this.messageConsumer = messageConsumer;
+ }
+
+ /**
+ * Only applicable in the non-blocking use case, and should only be used for testing!!!
+ */
+ @VisibleForTesting
+ public MessageHeader getMessageHeader()
+ {
+ return messageHeader;
+ }
+
+ /**
+ * A simple struct to hold the message header data as it is being built up.
+ */
+ public static class MessageHeader
+ {
+ public int messageId;
+ long constructionTime;
+ public InetAddressAndPort from;
+ public MessagingService.Verb verb;
+ int payloadSize;
+
+ Map<ParameterType, Object> parameters = Collections.emptyMap();
+
+ /**
+ * Length of the parameter data. If the message's version is {@link MessagingService#VERSION_40} or higher,
+ * this value is the total number of header bytes; else, for legacy messaging, this is the number of
+ * key/value entries in the header.
+ */
+ int parameterLength;
+ }
+
+ MessageHeader readPrefix(DataInputPlus in) throws IOException
+ {
+ MessagingService.validateMagic(in.readInt());
+ MessageHeader messageHeader = new MessageHeader();
+ messageHeader.messageId = in.readInt();
+ int messageTimestamp = in.readInt(); // make sure to read the sent timestamp, even if DatabaseDescriptor.hasCrossNodeTimeout() is not enabled
+ messageHeader.constructionTime = MessageIn.deriveConstructionTime(peer, messageTimestamp, ApproximateTime.currentTimeMillis());
+
+ return messageHeader;
+ }
+ }
+
+ /**
+ * Reads the incoming stream of bytes in the 4.0 format.
+ */
+ static class MessageInProcessorAsOf40 extends MessageInProcessor
+ {
+ MessageInProcessorAsOf40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ super(peer, messagingVersion, messageConsumer);
+ assert messagingVersion >= MessagingService.VERSION_40;
+ }
+
+ @SuppressWarnings("resource")
+ public void process(ByteBuf in) throws IOException
+ {
+ ByteBufDataInputPlus inputPlus = new ByteBufDataInputPlus(in);
+ while (true)
+ {
+ switch (state)
+ {
+ case READ_PREFIX:
+ if (in.readableBytes() < MessageOut.MESSAGE_PREFIX_SIZE)
+ return;
+ MessageHeader header = readPrefix(inputPlus);
+ if (header == null)
+ return;
+ header.from = peer;
+ messageHeader = header;
+ state = State.READ_VERB;
+ // fall-through
+ case READ_VERB:
+ if (in.readableBytes() < VERB_LENGTH)
+ return;
+ messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
+ state = State.READ_PARAMETERS_SIZE;
+ // fall-through
+ case READ_PARAMETERS_SIZE:
+ long length = VIntCoding.readUnsignedVInt(in);
+ if (length < 0)
+ return;
+ messageHeader.parameterLength = Ints.checkedCast(length);
+ messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
+ state = State.READ_PARAMETERS_DATA;
+ // fall-through
+ case READ_PARAMETERS_DATA:
+ if (messageHeader.parameterLength > 0)
+ {
+ if (in.readableBytes() < messageHeader.parameterLength)
+ return;
+ readParameters(inputPlus, messageHeader.parameterLength, messageHeader.parameters);
+ }
+ state = State.READ_PAYLOAD_SIZE;
+ // fall-through
+ case READ_PAYLOAD_SIZE:
+ length = VIntCoding.readUnsignedVInt(in);
+ if (length < 0)
+ return;
+ messageHeader.payloadSize = (int) length;
+ state = State.READ_PAYLOAD;
+ // fall-through
+ case READ_PAYLOAD:
+ if (in.readableBytes() < messageHeader.payloadSize)
+ return;
+
+ MessageIn<Object> messageIn = MessageIn.read(inputPlus, messagingVersion,
+ messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
+ messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
+
+ if (messageIn != null)
+ messageConsumer.accept(messageIn, messageHeader.messageId);
+
+ state = State.READ_PREFIX;
+ messageHeader = null;
+ break;
+ default:
+ throw new IllegalStateException("unknown/unhandled state: " + state);
+ }
+ }
+ }
+
+ private void readParameters(DataInputPlus inputPlus, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
+ {
+ TrackedDataInputPlus inputTracker = new TrackedDataInputPlus(inputPlus);
+
+ while (inputTracker.getBytesRead() < parameterLength)
+ {
+ String key = DataInputStream.readUTF(inputTracker);
+ ParameterType parameterType = ParameterType.byName.get(key);
+ long valueLength = VIntCoding.readUnsignedVInt(inputTracker);
+ parameters.put(parameterType, parameterType.serializer.deserialize(inputTracker, messagingVersion));
+ }
+ }
+
+ public void process(RebufferingByteBufDataInputPlus in) throws IOException
+ {
+ while (in.isOpen() && !in.isEmpty())
+ {
+ messageHeader = readPrefix(in);
+ messageHeader.from = peer;
+ messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
+ messageHeader.parameterLength = Ints.checkedCast(VIntCoding.readUnsignedVInt(in));
+ messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
+ if (messageHeader.parameterLength > 0)
+ readParameters(in, messageHeader.parameterLength, messageHeader.parameters);
+
+ messageHeader.payloadSize = Ints.checkedCast(VIntCoding.readUnsignedVInt(in));
+ MessageIn<Object> messageIn = MessageIn.read(in, messagingVersion,
+ messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
+ messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
+ if (messageIn != null)
+ messageConsumer.accept(messageIn, messageHeader.messageId);
+ }
+ }
+ }
+
+ /**
+ * Reads the incoming stream of bytes in the pre-4.0 format.
+ */
+ static class MessageInProcessorPre40 extends MessageInProcessor
+ {
+ private static final int PARAMETERS_SIZE_LENGTH = Integer.BYTES;
+ private static final int PARAMETERS_VALUE_SIZE_LENGTH = Integer.BYTES;
+ private static final int PAYLOAD_SIZE_LENGTH = Integer.BYTES;
+
+ MessageInProcessorPre40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ super(peer, messagingVersion, messageConsumer);
+ assert messagingVersion < MessagingService.VERSION_40;
+ }
+
+ public void process(ByteBuf in) throws IOException
+ {
+ ByteBufDataInputPlus inputPlus = new ByteBufDataInputPlus(in);
+ while (true)
+ {
+ switch (state)
+ {
+ case READ_PREFIX:
+ if (in.readableBytes() < MessageOut.MESSAGE_PREFIX_SIZE)
+ return;
+ MessageHeader header = readPrefix(inputPlus);
+ if (header == null)
+ return;
+ messageHeader = header;
+ state = State.READ_IP_ADDRESS;
+ // fall-through
+ case READ_IP_ADDRESS:
+ // unfortunately, this assumes knowledge of how CompactEndpointSerializationHelper serializes data (the first byte is the size).
+ // first, check that we can actually read the size byte, then check if we can read that number of bytes.
+ // the "+ 1" is to make sure we have the size byte in addition to the serialized IP addr count of bytes in the buffer.
+ int readableBytes = in.readableBytes();
+ if (readableBytes < 1 || readableBytes < in.getByte(in.readerIndex()) + 1)
+ return;
+ messageHeader.from = CompactEndpointSerializationHelper.instance.deserialize(inputPlus, messagingVersion);
+ state = State.READ_VERB;
+ // fall-through
+ case READ_VERB:
+ if (in.readableBytes() < VERB_LENGTH)
+ return;
+ messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
+ state = State.READ_PARAMETERS_SIZE;
+ // fall-through
+ case READ_PARAMETERS_SIZE:
+ if (in.readableBytes() < PARAMETERS_SIZE_LENGTH)
+ return;
+ messageHeader.parameterLength = in.readInt();
+ messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
+ state = State.READ_PARAMETERS_DATA;
+ // fall-through
+ case READ_PARAMETERS_DATA:
+ if (messageHeader.parameterLength > 0)
+ {
+ if (!readParameters(in, inputPlus, messageHeader.parameterLength, messageHeader.parameters))
+ return;
+ }
+ state = State.READ_PAYLOAD_SIZE;
+ // fall-through
+ case READ_PAYLOAD_SIZE:
+ if (in.readableBytes() < PAYLOAD_SIZE_LENGTH)
+ return;
+ messageHeader.payloadSize = in.readInt();
+ state = State.READ_PAYLOAD;
+ // fall-through
+ case READ_PAYLOAD:
+ if (in.readableBytes() < messageHeader.payloadSize)
+ return;
+
+ MessageIn<Object> messageIn = MessageIn.read(inputPlus, messagingVersion,
+ messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
+ messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
+
+ if (messageIn != null)
+ messageConsumer.accept(messageIn, messageHeader.messageId);
+
+ state = State.READ_PREFIX;
+ messageHeader = null;
+ break;
+ default:
+ throw new IllegalStateException("unknown/unhandled state: " + state);
+ }
+ }
+ }
+
+ /**
+ * @return <code>true</code> if all the parameters have been read from the {@link ByteBuf}; else, <code>false</code>.
+ */
+ private boolean readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, int parameterCount, Map<ParameterType, Object> parameters) throws IOException
+ {
+ // makes the assumption that map.size() is a constant time function (HashMap.size() is)
+ while (parameters.size() < parameterCount)
+ {
+ if (!canReadNextParam(in))
+ return false;
+
+ String key = DataInputStream.readUTF(inputPlus);
+ ParameterType parameterType = ParameterType.byName.get(key);
+ byte[] value = new byte[in.readInt()];
+ in.readBytes(value);
+ try (DataInputBuffer buffer = new DataInputBuffer(value))
+ {
+ parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion));
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Determine if we can read the next parameter from the {@link ByteBuf}. This method will *always* set the {@code in}
+ * readIndex back to where it was when this method was invoked.
+ * <p>
+ * NOTE: this function would be sooo much simpler if we included a parameters length int in the messaging format,
+ * instead of checking the remaining readable bytes for each field as we're parsing it. c'est la vie ...
+ */
+ @VisibleForTesting
+ boolean canReadNextParam(ByteBuf in)
+ {
+ in.markReaderIndex();
+ // capture the readableBytes value here to avoid all the virtual function calls.
+ // subtract 6 as we know we'll be reading a short and an int (for the utf and value lengths).
+ final int minimumBytesRequired = 6;
+ int readableBytes = in.readableBytes() - minimumBytesRequired;
+ if (readableBytes < 0)
+ return false;
+
+ // this is a tad invasive, but since we know the UTF string is prefaced with a 2-byte length,
+ // read that to make sure we have enough bytes to read the string itself.
+ short strLen = in.readShort();
+ // check if we can read that many bytes for the UTF
+ if (strLen > readableBytes)
+ {
+ in.resetReaderIndex();
+ return false;
+ }
+ in.skipBytes(strLen);
+ readableBytes -= strLen;
+
+ // check if we can read the value length
+ if (readableBytes < PARAMETERS_VALUE_SIZE_LENGTH)
+ {
+ in.resetReaderIndex();
+ return false;
+ }
+ int valueLength = in.readInt();
+ // check if we read that many bytes for the value
+ if (valueLength > readableBytes)
+ {
+ in.resetReaderIndex();
+ return false;
+ }
+
+ in.resetReaderIndex();
+ return true;
+ }
+
+ public void process(RebufferingByteBufDataInputPlus in) throws IOException
+ {
+ while (in.isOpen() && !in.isEmpty())
+ {
+ messageHeader = readPrefix(in);
+ messageHeader.from = CompactEndpointSerializationHelper.instance.deserialize(in, messagingVersion);
+ messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
+ messageHeader.parameterLength = in.readInt();
+ messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
+ if (messageHeader.parameterLength > 0)
+ readParameters(in, messageHeader.parameterLength, messageHeader.parameters);
+
+ messageHeader.payloadSize = in.readInt();
+ MessageIn<Object> messageIn = MessageIn.read(in, messagingVersion,
+ messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
+ messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
+ if (messageIn != null)
+ messageConsumer.accept(messageIn, messageHeader.messageId);
+ }
+ }
+
+ private void readParameters(RebufferingByteBufDataInputPlus in, int parameterCount, Map<ParameterType, Object> parameters) throws IOException
+ {
+ // makes the assumption that map.size() is a constant time function (HashMap.size() is)
+ while (parameters.size() < parameterCount)
+ {
+ String key = DataInputStream.readUTF(in);
+ ParameterType parameterType = ParameterType.byName.get(key);
+ int valueLength = in.readInt();
--- End diff --
Unused variable.
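In this diff, the non-blocking pre-4.0 `readParameters` already uses the value length (`new byte[in.readInt()]`), while the blocking variant reads `valueLength` and discards it (as does the 4.0 `readParameters` with its `long valueLength`). Either drop the assignment, or use the length to bound the read. A sketch of the latter, assuming a `DataInputStream` source and decoding each value as a UTF-8 string purely for illustration (the real code hands the bytes to a per-`ParameterType` serializer); `LegacyParameterReader` is hypothetical.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of pre-4.0-style parameters: each entry is
// [UTF key][int valueLength][value bytes]. Values are decoded as UTF-8 only for illustration.
final class LegacyParameterReader
{
    static Map<String, String> readParameters(DataInputStream in, int parameterCount) throws IOException
    {
        Map<String, String> parameters = new LinkedHashMap<>();
        for (int i = 0; i < parameterCount; i++)
        {
            String key = in.readUTF();
            int valueLength = in.readInt();
            // Use the length: read exactly the value bytes, so a misbehaving value
            // deserializer can never consume bytes that belong to the next field.
            byte[] value = new byte[valueLength];
            in.readFully(value);
            parameters.put(key, new String(value, StandardCharsets.UTF_8));
        }
        return parameters;
    }
}
```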
---
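Both `process(ByteBuf)` implementations in the `MessageIn.java` diff above share one pattern: a `State` field remembers how far the current message has been parsed, each `case` returns early when too few bytes are readable, and cases fall through once a field is consumed. A reduced sketch of that pattern, assuming `java.nio.ByteBuffer` and a toy three-field format (int id, int payload size, payload), not Cassandra's wire format; `ResumableMessageParser` is hypothetical. Like the original, it relies on the caller to keep any unread bytes and present them again with the next chunk.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy format: [int messageId][int payloadSize][payload bytes].
final class ResumableMessageParser
{
    enum State { READ_ID, READ_PAYLOAD_SIZE, READ_PAYLOAD }

    private State state = State.READ_ID;
    private int messageId;   // partially built "header", kept across calls
    private int payloadSize;
    final List<String> received = new ArrayList<>();

    // Consumes as many complete fields as `in` allows; unread bytes stay in `in`.
    void process(ByteBuffer in)
    {
        while (true)
        {
            switch (state)
            {
                case READ_ID:
                    if (in.remaining() < Integer.BYTES)
                        return;
                    messageId = in.getInt();
                    state = State.READ_PAYLOAD_SIZE;
                    // fall-through
                case READ_PAYLOAD_SIZE:
                    if (in.remaining() < Integer.BYTES)
                        return;
                    payloadSize = in.getInt();
                    state = State.READ_PAYLOAD;
                    // fall-through
                case READ_PAYLOAD:
                    if (in.remaining() < payloadSize)
                        return;
                    byte[] payload = new byte[payloadSize];
                    in.get(payload);
                    received.add(messageId + ":" + payload.length);
                    state = State.READ_ID; // reset for the next message
                    break;
                default:
                    throw new IllegalStateException("unknown state: " + state);
            }
        }
    }
}
```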
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216098891
--- Diff: src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java ---
@@ -196,10 +220,12 @@ protected void doFlush(int count) throws IOException
int byteCount = buffer.position();
currentBuf.writerIndex(byteCount);
- if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES))
+ if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, rateLimiterBlockTime, rateLimiterBlockTimeUnit))
--- End diff --
This is going up from 2 minutes to potentially 5 minutes. I think this should be OK, but just making sure.
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216086521
--- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -36,6 +37,11 @@
public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
{
+ /**
+ * Default to a very large value.
+ */
+ private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2);
--- End diff --
I think it doesn't make sense to wait for more than 5 minutes to rebuffer. Given that buffer sizes are on the order of a few megabytes at most, 5 minutes is an eternity. It's best to keep it short so that failures surface sooner rather than later, instead of leaving threads stuck on a call that doesn't time out within a reasonable amount of time.
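The comment concerns how long a blocking reader waits for its next buffer (`DEFAULT_REBUFFER_BLOCK_IN_MILLIS` is currently 2 days). A sketch of a bounded wait, assuming the event loop hands buffers to the reader through a `LinkedBlockingQueue` (the diff's `MessageInHandler` imports one, but its use is not shown here); `TimedRebufferQueue` is hypothetical, and throwing `SocketTimeoutException` is an illustrative choice.

```java
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the network thread appends buffers; a blocking reader takes them,
// but never waits longer than rebufferTimeoutMillis for the next one.
final class TimedRebufferQueue
{
    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>();
    private final long rebufferTimeoutMillis;

    TimedRebufferQueue(long rebufferTimeoutMillis)
    {
        this.rebufferTimeoutMillis = rebufferTimeoutMillis;
    }

    // Producer side (e.g. the netty event loop).
    void append(ByteBuffer buf)
    {
        queue.add(buf);
    }

    // Consumer side: surface a stalled peer as an exception instead of parking indefinitely.
    ByteBuffer nextBuffer() throws SocketTimeoutException, InterruptedException
    {
        ByteBuffer next = queue.poll(rebufferTimeoutMillis, TimeUnit.MILLISECONDS);
        if (next == null)
            throw new SocketTimeoutException("no data received within " + rebufferTimeoutMillis + " ms");
        return next;
    }
}
```

The timeout value itself is left to the caller; the 5-minute figure above is the reviewer's suggested upper bound.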
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r215944830
--- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -183,6 +195,11 @@ public int available() throws EOFException
return availableBytes;
}
+ public boolean isEmpty() throws EOFException
+ {
+ return available() == 0;
--- End diff --
Nice find. I added that side effect to
[StreamingInboundHandler to help deserialization](https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java#L167). Clearly, I could move that into a separate method, and have `StreamingInboundHandler` invoke that after the `available()` call. wdyt?
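A sketch of the proposed split: `available()` becomes a pure query (so `isEmpty()` can safely build on it), and the former side effect moves into its own explicitly named method that only the streaming handler calls. The actual side effect is not shown in this diff, so the `readsPaused` flag below is a hypothetical stand-in, as are the class and method names.

```java
// Hypothetical sketch: keep the size query free of side effects and give the
// side effect its own method, invoked only by the caller that needs it.
final class BufferedInputSketch
{
    private int queuedBytes;
    private boolean readsPaused; // stand-in for whatever state the real side effect touches

    void enqueue(int bytes)
    {
        queuedBytes += bytes;
    }

    void pauseReads()
    {
        readsPaused = true;
    }

    boolean readsPaused()
    {
        return readsPaused;
    }

    // Pure query: safe to call from isEmpty() or anywhere else.
    int available()
    {
        return queuedBytes;
    }

    boolean isEmpty()
    {
        return available() == 0;
    }

    // The former side effect, now opt-in: e.g. a streaming handler calls this after available().
    void resumeReadsIfDrained()
    {
        if (available() == 0)
            readsPaused = false;
    }
}
```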
---
[GitHub] cassandra pull request #253: 13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216157097
--- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
@@ -180,6 +199,73 @@ public String toString()
return sbuf.toString();
}
+ /**
+ * The main entry point for sending an internode message to a peer node in the cluster.
+ */
+ public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
+ {
+ captureTracingInfo(destinationId);
+
+ out.writeInt(MessagingService.PROTOCOL_MAGIC);
+ out.writeInt(id);
+
+ // int cast cuts off the high-order half of the timestamp, which we can assume remains
+ // the same between now and when the recipient reconstructs it.
+ out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
+ serialize(out, messagingVersion);
+ }
+
+ /**
+ * Record any tracing data, if enabled on this message.
+ */
+ @VisibleForTesting
+ void captureTracingInfo(OutboundConnectionIdentifier destinationId)
+ {
+ try
+ {
+ UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION);
+ if (sessionId != null)
+ {
+ TraceState state = Tracing.instance.get(sessionId);
+ String logMessage = String.format("Sending %s message to %s", verb, destinationId.connectionAddress());
+ // session may have already finished; see CASSANDRA-5668
+ if (state == null)
+ {
+ Tracing.TraceType traceType = (Tracing.TraceType)getParameter(ParameterType.TRACE_TYPE);
+ traceType = traceType == null ? Tracing.TraceType.QUERY : traceType;
+ Tracing.instance.trace(ByteBuffer.wrap(UUIDGen.decompose(sessionId)), logMessage, traceType.getTTL());
+ }
+ else
+ {
+ state.trace(logMessage);
+ if (verb == MessagingService.Verb.REQUEST_RESPONSE)
+ Tracing.instance.doneWithNonLocalSession(state);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ logger.warn("failed to capture the tracing info for an outbound message to {}, ignoring", destinationId, e);
+ }
+ }
+
+ private Object getParameter(ParameterType type)
+ {
+ for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE)
+ {
+ if (((ParameterType)parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)).equals(type))
--- End diff --
done
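The new `serialize` method writes only `(int)` of the millisecond timestamp, relying on the receiver to supply the high-order 32 bits from its own clock. The diff calls `MessageIn.deriveConstructionTime(...)` on the receiving side, but its body is not shown here, so the following is only a sketch of the splicing idea; `TruncatedTimestamps` is a hypothetical name.

```java
// Hypothetical sketch of "send the low 32 bits, borrow the high 32 bits from the receiver".
final class TruncatedTimestamps
{
    // Sender: keep only the low-order 32 bits of the epoch-millis timestamp.
    static int toWire(long epochMillis)
    {
        return (int) epochMillis;
    }

    // Receiver: splice the sender's low bits onto its own clock's high bits.
    // Assumes both clocks share the same high-order half; if the low half wrapped
    // between send and receive (roughly every 49.7 days), the result is off by 2^32 ms.
    static long fromWire(int wireMillis, long receiverNowMillis)
    {
        long highBits = receiverNowMillis & 0xFFFFFFFF00000000L;
        long lowBits = wireMillis & 0x00000000FFFFFFFFL; // mask undoes sign extension of the int
        return highBits | lowBits;
    }
}
```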
---
[GitHub] cassandra issue #253: CASSANDRA-13630
Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on the issue:
https://github.com/apache/cassandra/pull/253
Wait for CASSANDRA-14503 :)
On Sun, Sep 9, 2018 at 18:37 Dinesh <no...@github.com> wrote:
> *@dineshjoshi* commented on this pull request.
> ------------------------------
>
> In test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java
> <https://github.com/apache/cassandra/pull/253#discussion_r216180830>:
>
> > channel = new EmbeddedChannel();
> omc = new NonSendingOutboundMessagingConnection(id, null, Optional.empty());
> - channelWriter = ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty());
> - channel.pipeline().addFirst(new MessageOutHandler(id, MessagingService.current_version, channelWriter, () -> null));
> - coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
> + wrapper = new MessageOutWrapper();
> + OutboundConnectionParams.Builder builder = OutboundConnectionParams.builder()
> + .messageResultConsumer(this::handleMessageResult)
> + .coalescingStrategy(Optional.empty())
> + .protocolVersion(MessagingService.current_version)
> + .connectionId(id);
> +
> + if (type == ChannelWriterType.COALESCING)
> + coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
> + else
> + coalescingStrategy = Optional.empty();
>
> This is beyond the scope of this PR, however, we should refactor
> OutboundConnectionParams to not store an Optional.
>
---
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r215739220
--- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
@@ -231,4 +241,437 @@ public String toString()
sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb);
return sbuf.toString();
}
+
+ public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
+ {
+ return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
+
+ }
+
+ public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ return messagingVersion >= MessagingService.VERSION_40
+ ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
+ : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
+
+ }
+
+ /**
+ * Implementations contain the mechanics and logic of parsing incoming messages. Allows for both non-blocking
+ * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)}
+ * methods, respectively.
+ *
+ * Does not contain the actual deserialization code for message fields nor payload. That is left to the
+ * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
+ */
+ public static abstract class MessageInProcessor
+ {
+ /**
+ * The current state of deserializing an incoming message. This enum is only used in the non-blocking versions.
+ */
+ public enum State
+ {
+ READ_PREFIX,
+ READ_IP_ADDRESS,
+ READ_VERB,
+ READ_PARAMETERS_SIZE,
+ READ_PARAMETERS_DATA,
+ READ_PAYLOAD_SIZE,
+ READ_PAYLOAD
+ }
+
+ static final int VERB_LENGTH = Integer.BYTES;
+
+ /**
+ * The default target for consuming deserialized {@link MessageIn}.
+ */
+ private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
+
+ final InetAddressAndPort peer;
+ final int messagingVersion;
+
+ /**
+ * Abstracts out depending directly on {@link MessagingService#receive(MessageIn, int)}; this makes tests more sane
+ * as they don't require nor trigger the entire message processing circus.
+ */
+ final BiConsumer<MessageIn, Integer> messageConsumer;
+
+ /**
+ * Captures the current {@link State} of processing a message. Primarily useful in the non-blocking use case.
+ */
+ State state = State.READ_PREFIX;
+
+ /**
+ * Captures the current data we've parsed out of an incoming message. Primarily useful in the non-blocking use case.
+ */
+ MessageHeader messageHeader;
+
+ /**
+ * Process the buffer in a non-blocking manner. Will try to read out as much of a message(s) as possible,
+ * and send any fully deserialized messages to {@link #messageConsumer}.
+ */
+ public abstract void process(ByteBuf in) throws IOException;
+
+ /**
+ * Process the buffer in a blocking manner. Will read as many messages as possible, blocking for more data,
+ * and send any fully deserialized messages to {@link #messageConsumer}.
+ */
+ public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException;
+
+ MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ this.peer = peer;
+ this.messagingVersion = messagingVersion;
+ this.messageConsumer = messageConsumer;
+ }
+
+ /**
+ * Only applicable in the non-blocking use case, and should only be used for testing!!!
+ */
+ @VisibleForTesting
+ public MessageHeader getMessageHeader()
+ {
+ return messageHeader;
+ }
+
+ /**
+ * A simple struct to hold the message header data as it is being built up.
+ */
+ public static class MessageHeader
+ {
+ public int messageId;
+ long constructionTime;
+ public InetAddressAndPort from;
+ public MessagingService.Verb verb;
+ int payloadSize;
+
+ Map<ParameterType, Object> parameters = Collections.emptyMap();
+
+ /**
+ * Length of the parameter data. If the message's version is {@link MessagingService#VERSION_40} or higher,
+ * this value is the total number of header bytes; else, for legacy messaging, this is the number of
+ * key/value entries in the header.
+ */
+ int parameterLength;
+ }
+
+ MessageHeader readPrefix(DataInputPlus in) throws IOException
+ {
+ MessagingService.validateMagic(in.readInt());
+ MessageHeader messageHeader = new MessageHeader();
+ messageHeader.messageId = in.readInt();
+ int messageTimestamp = in.readInt(); // make sure to read the sent timestamp, even if DatabaseDescriptor.hasCrossNodeTimeout() is not enabled
+ messageHeader.constructionTime = MessageIn.deriveConstructionTime(peer, messageTimestamp, ApproximateTime.currentTimeMillis());
+
+ return messageHeader;
+ }
+ }
+
+ /**
+ * Reads the incoming stream of bytes in the 4.0 format.
+ */
+ static class MessageInProcessorAsOf40 extends MessageInProcessor
+ {
+ MessageInProcessorAsOf40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+ {
+ super(peer, messagingVersion, messageConsumer);
+ assert messagingVersion >= MessagingService.VERSION_40;
+ }
+
+ @SuppressWarnings("resource")
+ public void process(ByteBuf in) throws IOException
--- End diff --
This method is a bit hard to follow. Could you refactor it? Netty's `RedisDecoder` is a nice example of how to structure this: https://github.com/netty/netty/blob/4.1/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java
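The sketch below shows the kind of restructuring suggested above. It is a self-contained illustration, not Cassandra's code. It uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf`. It also uses a hypothetical, simplified frame (magic, message id, payload size, payload) instead of the real 4.0 wire format. Each state gets its own small method that returns `false` when too few bytes are buffered, so `process` reduces to a loop over a `switch`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class StateMachineDecoderSketch
{
    static final int MAGIC = 0xCA552DFA; // illustrative magic value for the hypothetical frame

    enum State { READ_PREFIX, READ_PAYLOAD_SIZE, READ_PAYLOAD }

    private final BiConsumer<Integer, byte[]> messageConsumer;
    private State state = State.READ_PREFIX;
    private int messageId;
    private int payloadSize;

    StateMachineDecoderSketch(BiConsumer<Integer, byte[]> messageConsumer)
    {
        this.messageConsumer = messageConsumer;
    }

    /** Decodes as many complete messages as 'in' holds; stops, without consuming, at the first incomplete field. */
    void process(ByteBuffer in)
    {
        boolean progressed = true;
        while (progressed)
        {
            switch (state)
            {
                case READ_PREFIX:       progressed = readPrefix(in); break;
                case READ_PAYLOAD_SIZE: progressed = readPayloadSize(in); break;
                case READ_PAYLOAD:      progressed = readPayload(in); break;
                default: throw new IllegalStateException("unexpected state " + state);
            }
        }
    }

    private boolean readPrefix(ByteBuffer in)
    {
        if (in.remaining() < 2 * Integer.BYTES)
            return false; // wait for more bytes
        int magic = in.getInt();
        if (magic != MAGIC)
            throw new IllegalStateException("invalid magic 0x" + Integer.toHexString(magic));
        messageId = in.getInt();
        state = State.READ_PAYLOAD_SIZE;
        return true;
    }

    private boolean readPayloadSize(ByteBuffer in)
    {
        if (in.remaining() < Integer.BYTES)
            return false;
        payloadSize = in.getInt();
        state = State.READ_PAYLOAD;
        return true;
    }

    private boolean readPayload(ByteBuffer in)
    {
        if (in.remaining() < payloadSize)
            return false;
        byte[] payload = new byte[payloadSize];
        in.get(payload);
        messageConsumer.accept(messageId, payload);
        state = State.READ_PREFIX; // ready for the next message
        return true;
    }

    public static void main(String[] args)
    {
        List<String> received = new ArrayList<>();
        StateMachineDecoderSketch decoder = new StateMachineDecoderSketch(
            (id, payload) -> received.add(id + ":" + new String(payload, StandardCharsets.UTF_8)));

        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = ByteBuffer.allocate(3 * Integer.BYTES + body.length);
        frame.putInt(MAGIC).putInt(42).putInt(body.length).put(body);
        byte[] wire = frame.array();

        // Deliver the frame in two chunks, as a socket might, through one accumulating buffer.
        ByteBuffer cumulation = ByteBuffer.allocate(64);
        int split = 6; // cut inside the prefix
        cumulation.put(wire, 0, split).flip();
        decoder.process(cumulation);   // not enough bytes yet: nothing consumed
        cumulation.compact();          // keep unread bytes, switch back to writing
        cumulation.put(wire, split, wire.length - split).flip();
        decoder.process(cumulation);
        System.out.println(received);  // prints [42:hello]
    }
}
```

Here the caller keeps partially read bytes between calls with `flip()`/`compact()`. In Netty, the `ByteToMessageDecoder` cumulation buffer that `RedisDecoder` extends does that work.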
[GitHub] cassandra pull request #253: 13630
Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r215733539
--- Diff: src/java/org/apache/cassandra/net/async/MessageOutHandler.java ---
@@ -140,8 +125,17 @@ public void write(ChannelHandlerContext ctx, Object o, ChannelPromise promise)
out = ctx.alloc().ioBuffer((int)currentFrameSize);
- captureTracingInfo(msg);
- serializeMessage(msg, out);
+ @SuppressWarnings("resource")
+ ByteBufDataOutputPlus outputPlus = new ByteBufDataOutputPlus(out);
+ msg.message.serialize(outputPlus, targetMessagingVersion, connectionId, msg.id, msg.timestampNanos);
+
+ // next few lines are for debugging ... massively helpful!!
+ // if we allocated too much buffer for this message, we'll log here.
+ // if we allocated too little buffer space, we would have hit an exception when trying to write more bytes to it
+ if (out.isWritable())
+ errorLogger.error("{} reported message size {}, actual message size {}, msg {}",
--- End diff --
How likely are we to cause log spam?
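The risk of log spam depends on two things: how often `out.isWritable()` is still true after serialization, and whether `errorLogger` is already rate-limited. The diff does not show how `errorLogger` is declared. If a serializer systematically over-reports its size, the check would fire on every such message. One option is to emit at most one line per interval. The sketch below does that with a hypothetical `RateLimitedLogSketch` class and an injectable clock. Cassandra's own `org.apache.cassandra.utils.NoSpamLogger` serves a similar purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

public class RateLimitedLogSketch
{
    private final long minIntervalNanos;
    private final LongSupplier nanoClock;       // injectable so the example is deterministic
    private final AtomicLong lastLoggedNanos;
    private final AtomicLong suppressed = new AtomicLong();

    RateLimitedLogSketch(long minInterval, TimeUnit unit, LongSupplier nanoClock)
    {
        this.minIntervalNanos = unit.toNanos(minInterval);
        this.nanoClock = nanoClock;
        // Start one interval in the past so the very first call is allowed through.
        this.lastLoggedNanos = new AtomicLong(nanoClock.getAsLong() - minIntervalNanos);
    }

    /** Emits 'message' to 'sink' at most once per interval and counts the calls it drops. */
    boolean maybeLog(Consumer<String> sink, String message)
    {
        long now = nanoClock.getAsLong();
        long last = lastLoggedNanos.get();
        // Comparing a difference keeps System.nanoTime() wrap-around harmless;
        // the CAS lets only one of several racing threads win a given window.
        if (now - last >= minIntervalNanos && lastLoggedNanos.compareAndSet(last, now))
        {
            long dropped = suppressed.getAndSet(0);
            sink.accept(dropped == 0 ? message : message + " (" + dropped + " similar messages suppressed)");
            return true;
        }
        suppressed.incrementAndGet();
        return false;
    }

    public static void main(String[] args)
    {
        long[] fakeNow = {0};
        RateLimitedLogSketch limiter = new RateLimitedLogSketch(1, TimeUnit.SECONDS, () -> fakeNow[0]);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < 5; i++)
            limiter.maybeLog(out::add, "size mismatch #" + i);   // only #0 is emitted
        fakeNow[0] += TimeUnit.SECONDS.toNanos(1);
        limiter.maybeLog(out::add, "size mismatch #5");           // emitted, noting the 4 suppressed calls
        out.forEach(System.out::println);
    }
}
```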