Posted to pr@cassandra.apache.org by jasobrown <gi...@git.apache.org> on 2018/08/21 20:46:40 UTC

[GitHub] cassandra pull request #253: 13630

GitHub user jasobrown opened a pull request:

    https://github.com/apache/cassandra/pull/253

    13630

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jasobrown/cassandra 13630

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/cassandra/pull/253.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #253
    
----
commit 2f5d03cc9d8373bab723010f7a280ba21658a6da
Author: Jason Brown <ja...@...>
Date:   2018-07-28T14:09:01Z

    Support sanely sending large messages with internode messaging via netty

commit 34eb6afbdfeb9c75685316d22152dde0fd460820
Author: Jason Brown <ja...@...>
Date:   2018-08-20T11:54:13Z

    clean up error handling logic in MIH; allow custom timeouts for blocking on the queue in RebufferingByteBufDataInputPlus; fix MIHTest

commit ed9f3093005483e2a1d0b54e6f60c67070a51962
Author: Jason Brown <ja...@...>
Date:   2018-08-20T12:05:30Z

    add RebufferingByteBufDataInputPlus timeout test

commit f61ffaa22d87a68ed9a2bed5329240209053cec6
Author: Jason Brown <ja...@...>
Date:   2018-08-20T20:38:21Z

    fix ChannelWriterTest to wait for asynchronous blocking implementations

commit 3bdec75b5c84f250e4384092f9c3435eca389d3e
Author: Jason Brown <ja...@...>
Date:   2018-08-20T20:39:21Z

    point to jasobrown dtest repo

commit 8296bf9c18924659f694418730f17428cf89d5b3
Author: Jason Brown <ja...@...>
Date:   2018-08-21T17:45:04Z

    trivial lambda change

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216103837
  
    --- Diff: src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java ---
    @@ -393,6 +393,18 @@ private Channel getOrCreateChannel()
                 }
             }
     
    +        private void onError(Throwable t)
    +        {
    +            try
    +            {
    +                session.onError(t).get(5, TimeUnit.MINUTES);
    --- End diff --
    
    done


---



[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216103691
  
    --- Diff: src/java/org/apache/cassandra/net/async/MessageInHandler.java ---
    @@ -18,143 +18,296 @@
     
     package org.apache.cassandra.net.async;
     
    -import java.io.DataInputStream;
    +import java.io.EOFException;
     import java.io.IOException;
    -import java.util.Collections;
    -import java.util.EnumMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.function.BiConsumer;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.RejectedExecutionHandler;
    +import java.util.concurrent.ThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
     
    -import com.google.common.primitives.Ints;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import io.netty.buffer.ByteBuf;
     import io.netty.channel.ChannelHandlerContext;
    -import org.apache.cassandra.io.util.DataInputBuffer;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +import io.netty.handler.codec.ByteToMessageDecoder;
    +import io.netty.util.ReferenceCountUtil;
    +import org.apache.cassandra.concurrent.NamedThreadFactory;
    +import org.apache.cassandra.exceptions.UnknownTableException;
     import org.apache.cassandra.locator.InetAddressAndPort;
    -import org.apache.cassandra.net.MessageIn;
    -import org.apache.cassandra.net.MessagingService;
    -import org.apache.cassandra.net.ParameterType;
    -import org.apache.cassandra.utils.vint.VIntCoding;
    +import org.apache.cassandra.net.MessageIn.MessageInProcessor;
     
     /**
      * Parses incoming messages as per the 4.0 internode messaging protocol.
      */
    -public class MessageInHandler extends BaseMessageInHandler
    +public class MessageInHandler extends ChannelInboundHandlerAdapter
     {
         public static final Logger logger = LoggerFactory.getLogger(MessageInHandler.class);
     
    -    private MessageHeader messageHeader;
    +    private final InetAddressAndPort peer;
     
    -    MessageInHandler(InetAddressAndPort peer, int messagingVersion)
    +    private final BufferHandler bufferHandler;
    +    private volatile boolean closed;
    +
    +    public MessageInHandler(InetAddressAndPort peer, MessageInProcessor messageProcessor, boolean handlesLargeMessages)
    +    {
    +        this.peer = peer;
    +
    +        bufferHandler = handlesLargeMessages
    +                        ? new BlockingBufferHandler(messageProcessor)
    +                        : new NonblockingBufferHandler(messageProcessor);
    +    }
    +
    +    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException
    +    {
    +        if (!closed)
    +        {
    +            bufferHandler.channelRead(ctx, (ByteBuf) msg);
    +        }
    +        else
    +        {
    +            ReferenceCountUtil.release(msg);
    +            ctx.close();
    +        }
    +    }
    +
    +    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    +    {
    +        if (cause instanceof EOFException)
    +            logger.trace("eof reading from socket; closing", cause);
    +        else if (cause instanceof UnknownTableException)
    +            logger.warn("Got message from unknown table while reading from socket; closing", cause);
    +        else if (cause instanceof IOException)
    +            logger.trace("IOException reading from socket; closing", cause);
    +        else
    +            logger.warn("Unexpected exception caught in inbound channel pipeline from " + ctx.channel().remoteAddress(), cause);
    +
    +        close();
    +        ctx.close();
    +    }
    +
    +    public void channelInactive(ChannelHandlerContext ctx)
    +    {
    +        logger.trace("received channel closed message for peer {} on local addr {}", ctx.channel().remoteAddress(), ctx.channel().localAddress());
    +        close();
    +        ctx.fireChannelInactive();
    +    }
    +
    +    void close()
         {
    -        this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER);
    +        closed = true;
    +        bufferHandler.close();
         }
     
    -    public MessageInHandler(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +    boolean isClosed()
         {
    -        super(peer, messagingVersion, messageConsumer);
    +        return closed;
    +    }
     
    -        assert messagingVersion >= MessagingService.VERSION_40 : String.format("wrong messaging version for this handler: got %d, but expect %d or higher",
    -                                                                              messagingVersion, MessagingService.VERSION_40);
    -        state = State.READ_FIRST_CHUNK;
    +    /**
    +     * An abstraction around how incoming buffers are handled: either in a non-blocking manner ({@link NonblockingBufferHandler})
    +     * or in a blocking manner ({@link BlockingBufferHandler}).
    +     *
    +     * The methods declared here will only be invoked on the netty event loop.
    +     */
    +    interface BufferHandler
    +    {
    +        void channelRead(ChannelHandlerContext ctx, ByteBuf in) throws IOException;
    +
    +        void close();
         }
     
         /**
    -     * For each new message coming in, builds up a {@link MessageHeader} instance incrementally. This method
    -     * attempts to deserialize as much header information as it can out of the incoming {@link ByteBuf}, and
    -     * maintains a trivial state machine to remember progress across invocations.
    +     * Processes incoming buffers on the netty event loop, in a non-blocking manner. If buffers are not completely consumed,
    +     * it is stashed in {@link #retainedInlineBuffer}, and the next incoming buffer is combined with it.
          */
    -    @SuppressWarnings("resource")
    -    public void handleDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
    +    class NonblockingBufferHandler implements BufferHandler
    --- End diff --
    
    This can be made into a static nested class.
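For readers less familiar with the distinction, here is a minimal sketch of what the reviewer is pointing at. The names `HandlerHost`, `InnerHandler`, and `NestedHandler` are hypothetical and are not Cassandra's classes. A non-static inner class carries a hidden reference to its enclosing instance. A static nested class does not, so it can be created on its own, provided it only uses what is passed to its constructor (in the diff, the handlers receive the `messageProcessor` explicitly).

```java
class HandlerHost {
    private final String peer;

    HandlerHost(String peer) {
        this.peer = peer;
    }

    // Inner (non-static) class: each instance implicitly captures HandlerHost.this,
    // so it can read 'peer' directly but cannot exist without a HandlerHost.
    class InnerHandler {
        String describe() {
            return "inner handler for " + peer;
        }
    }

    // Static nested class: no hidden reference to an enclosing instance;
    // any state it needs must be handed to it explicitly.
    static class NestedHandler {
        private final String processorName;

        NestedHandler(String processorName) {
            this.processorName = processorName;
        }

        String describe() {
            return "nested handler using " + processorName;
        }
    }
}
```

Usage: `host.new InnerHandler()` requires an existing `host`, whereas `new HandlerHost.NestedHandler("processor")` does not. Making the dependency explicit also keeps the nested class from accidentally reading enclosing state such as a `closed` flag.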


---



[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r215936173
  
    --- Diff: src/java/org/apache/cassandra/net/async/MessageOutHandler.java ---
    @@ -140,8 +125,17 @@ public void write(ChannelHandlerContext ctx, Object o, ChannelPromise promise)
     
                 out = ctx.alloc().ioBuffer((int)currentFrameSize);
     
    -            captureTracingInfo(msg);
    -            serializeMessage(msg, out);
    +            @SuppressWarnings("resource")
    +            ByteBufDataOutputPlus outputPlus = new ByteBufDataOutputPlus(out);
    +            msg.message.serialize(outputPlus, targetMessagingVersion, connectionId, msg.id, msg.timestampNanos);
    +
    +            // next few lines are for debugging ... massively helpful!!
    +            // if we allocated too much buffer for this message, we'll log here.
    +            // if we allocated to little buffer space, we would have hit an exception when trying to write more bytes to it
    +            if (out.isWritable())
    +                errorLogger.error("{} reported message size {}, actual message size {}, msg {}",
    --- End diff --
    
    The `errorLogger` is an instance of `NoSpamLogger`, so we can adjust the log interval. I'm not sure making it an assert is better, as that would throw an exception and the connection would get killed. Since it's mostly developer-level logging, I could also just remove it. WDYT?
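To make the trade-off concrete, here is a hypothetical sketch of the idea behind a "no spam" logger applied to the size check in the diff. `RateLimitedReporter` and its methods are invented for illustration and are not the `NoSpamLogger` API. The check fires when fewer bytes were written than were allocated (the diff's `out.isWritable()` case), and repeated reports inside the interval are dropped instead of throwing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a rate-limited logger: emits at most one message per interval.
class RateLimitedReporter {
    private final long intervalNanos;
    private final LongSupplier nanoClock; // injectable clock so tests can control time
    private long lastEmitNanos;
    private boolean emittedOnce = false;
    final List<String> emitted = new ArrayList<>(); // stands in for a real logger's output

    RateLimitedReporter(long interval, TimeUnit unit, LongSupplier nanoClock) {
        this.intervalNanos = unit.toNanos(interval);
        this.nanoClock = nanoClock;
    }

    // Returns true if the message was emitted, false if it was suppressed.
    boolean report(String message) {
        long now = nanoClock.getAsLong();
        if (emittedOnce && now - lastEmitNanos < intervalNanos)
            return false;
        emittedOnce = true;
        lastEmitNanos = now;
        emitted.add(message);
        return true;
    }

    // Reports (rate-limited) when the pre-computed size overshoots what was actually written.
    // Undershooting is not checked here: writing past the allocation would already have failed.
    boolean checkSerializedSize(long reportedSize, long actualSize) {
        if (actualSize < reportedSize)
            return report("reported message size " + reportedSize + ", actual message size " + actualSize);
        return false;
    }
}
```

Unlike an assert, this never tears down the connection; the cost is that a persistent size mismatch shows up at most once per interval in the logs.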


---



[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216069307
  
    --- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
    @@ -231,4 +241,437 @@ public String toString()
             sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb);
             return sbuf.toString();
         }
    +
    +    public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
    +    {
    +        return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
    +
    +    }
    +
    +    public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +    {
    +        return messagingVersion >= MessagingService.VERSION_40
    +               ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
    +               : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
    +
    +    }
    +
    +    /**
    +     * Implementations contain the mechanics and logic of parsing incoming messages. Allows for both non-blocking
    +     * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)}
    +     * methods, respectively.
    +     *
    +     * Does not contain the actual deserialization code for message fields nor payload. That is left to the
    +     * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
    +     */
    +    public static abstract class MessageInProcessor
    +    {
    +        /**
    +         * The current state of deserializing an incoming message. This enum is only used in the nonblocking versions.
    +         */
    +        public enum State
    +        {
    +            READ_PREFIX,
    +            READ_IP_ADDRESS,
    +            READ_VERB,
    +            READ_PARAMETERS_SIZE,
    +            READ_PARAMETERS_DATA,
    +            READ_PAYLOAD_SIZE,
    +            READ_PAYLOAD
    +        }
    +
    +        static final int VERB_LENGTH = Integer.BYTES;
    +
    +        /**
    +         * The default target for consuming deserialized {@link MessageIn}.
    +         */
    +        private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
    +
    +        final InetAddressAndPort peer;
    +        final int messagingVersion;
    +
    +        /**
    +         * Abstracts out depending directly on {@link MessagingService#receive(MessageIn, int)}; this makes tests more sane
    +         * as they don't require nor trigger the entire message processing circus.
    +         */
    +        final BiConsumer<MessageIn, Integer> messageConsumer;
    +
    +        /**
    +         * Captures the current {@link State} of processing a message. Primarily useful in the non-blocking use case.
    +         */
    +        State state = State.READ_PREFIX;
    +
    +        /**
    +         * Captures the current data we've parsed out of in incoming message. Primarily useful in the non-blocking use case.
    +         */
    +        MessageHeader messageHeader;
    +
    +        /**
    +         * Process the buffer in a non-blocking manner. Will try to read out as much of a message(s) as possible,
    +         * and send any fully deserialized messages to {@link #messageConsumer}.
    +         */
    +        public abstract void process(ByteBuf in) throws IOException;
    +
    +        /**
    +         * Process the buffer in a blocking manner. Will read as many messages as possible, blocking for more data,
    +         * and send any fully deserialized messages to {@link #messageConsumer}.
    +         */
    +        public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException;
    +
    +        MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +        {
    +            this.peer = peer;
    +            this.messagingVersion = messagingVersion;
    +            this.messageConsumer = messageConsumer;
    +        }
    +
    +        /**
    +         * Only applicable in the non-blocking use case, and should ony be used for testing!!!
    +         */
    +        @VisibleForTesting
    +        public MessageHeader getMessageHeader()
    +        {
    +            return messageHeader;
    +        }
    +
    +        /**
    +         * A simple struct to hold the message header data as it is being built up.
    +         */
    +        public static class MessageHeader
    +        {
    +            public int messageId;
    +            long constructionTime;
    +            public InetAddressAndPort from;
    +            public MessagingService.Verb verb;
    +            int payloadSize;
    +
    +            Map<ParameterType, Object> parameters = Collections.emptyMap();
    +
    +            /**
    +             * Length of the parameter data. If the message's version is {@link MessagingService#VERSION_40} or higher,
    +             * this value is the total number of header bytes; else, for legacy messaging, this is the number of
    +             * key/value entries in the header.
    +             */
    +            int parameterLength;
    +        }
    +
    +        MessageHeader readPrefix(DataInputPlus in) throws IOException
    +        {
    +            MessagingService.validateMagic(in.readInt());
    +            MessageHeader messageHeader = new MessageHeader();
    +            messageHeader.messageId = in.readInt();
    +            int messageTimestamp = in.readInt(); // make sure to read the sent timestamp, even if DatabaseDescriptor.hasCrossNodeTimeout() is not enabled
    +            messageHeader.constructionTime = MessageIn.deriveConstructionTime(peer, messageTimestamp, ApproximateTime.currentTimeMillis());
    +
    +            return messageHeader;
    +        }
    +    }
    +
    +    /**
    +     * Reads the incoming stream of bytes in the 4.0 format.
    +     */
    +    static class MessageInProcessorAsOf40 extends MessageInProcessor
    +    {
    +        MessageInProcessorAsOf40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +        {
    +            super(peer, messagingVersion, messageConsumer);
    +            assert messagingVersion >= MessagingService.VERSION_40;
    +        }
    +
    +        @SuppressWarnings("resource")
    +        public void process(ByteBuf in) throws IOException
    --- End diff --
    
    done. I also pulled the main loop logic into the base class.
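One common reading of "pulled the main loop logic into the base class" is the template-method pattern: the base class owns the loop, and each version-specific subclass implements only a single step. The sketch below is hypothetical and is not the actual `MessageInProcessor` code. The 4-byte length-prefix framing is invented for brevity. Also, unlike the diff, which persists a `State` enum and a partial `MessageHeader` between calls, this version re-peeks the length prefix on each call.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Base class owns the loop; subclasses implement one "try to read a message" step.
abstract class FrameProcessor {
    final List<byte[]> delivered = new ArrayList<>();

    // Keep stepping until a step reports that it needs more bytes.
    final void process(ByteBuffer in) {
        while (tryReadOne(in)) {
            // one complete message was delivered; try the next one
        }
    }

    // Read one complete message, or return false without consuming anything if bytes are missing.
    abstract boolean tryReadOne(ByteBuffer in);
}

// Hypothetical version-specific subclass: 4-byte big-endian length followed by that many bytes.
class LengthPrefixedProcessor extends FrameProcessor {
    @Override
    boolean tryReadOne(ByteBuffer in) {
        if (in.remaining() < Integer.BYTES)
            return false;
        int length = in.getInt(in.position()); // absolute get: peek without moving position
        if (in.remaining() < Integer.BYTES + length)
            return false;
        in.getInt(); // now consume the length prefix
        byte[] payload = new byte[length];
        in.get(payload);
        delivered.add(payload);
        return true;
    }
}
```

A partial trailing frame is left in the buffer untouched, so the caller can append more bytes and call `process` again.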


---



[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216106796
  
    --- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
    @@ -180,6 +199,73 @@ public String toString()
             return sbuf.toString();
         }
     
    +    /**
    +     * The main entry point for sending an internode message to a peer node in the cluster.
    +     */
    +    public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
    +    {
    +        captureTracingInfo(destinationId);
    +
    +        out.writeInt(MessagingService.PROTOCOL_MAGIC);
    +        out.writeInt(id);
    +
    +        // int cast cuts off the high-order half of the timestamp, which we can assume remains
    +        // the same between now and when the recipient reconstructs it.
    +        out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
    +        serialize(out, messagingVersion);
    +    }
    +
    +    /**
    +     * Record any tracing data, if enabled on this message.
    +     */
    +    @VisibleForTesting
    +    void captureTracingInfo(OutboundConnectionIdentifier destinationId)
    +    {
    +        try
    +        {
    +            UUID sessionId =  (UUID)getParameter(ParameterType.TRACE_SESSION);
    +            if (sessionId != null)
    --- End diff --
    
    Does `sessionId != null` imply that tracing is enabled? If not, we should explicitly check whether tracing is enabled.
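The question rests on one assumption: that a sender attaches a trace-session parameter only when the request is being traced, so its presence doubles as the "tracing enabled" flag. Here is a hypothetical sketch of that pattern. `Param` and `TracingCheck` are invented names, not Cassandra's `ParameterType` handling.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical parameter keys; only TRACE_SESSION matters here.
enum Param { TRACE_SESSION, TRACE_TYPE }

class TracingCheck {
    // Returns the trace session id carried by the message, or null if there is none.
    static UUID traceSessionOf(Map<Param, Object> parameters) {
        Object value = parameters.get(Param.TRACE_SESSION);
        return value instanceof UUID ? (UUID) value : null;
    }

    // Under the stated assumption, "has a session id" is equivalent to "tracing is on".
    static boolean shouldCaptureTracing(Map<Param, Object> parameters) {
        return traceSessionOf(parameters) != null;
    }
}
```

If the assumption does not hold (for example, if a session id could be attached while tracing is disabled), the presence check alone is insufficient and an explicit tracing-enabled check is needed, which is the reviewer's point.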


---



[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r215734926
  
    --- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
    @@ -183,6 +195,11 @@ public int available() throws EOFException
             return availableBytes;
         }
     
    +    public boolean isEmpty() throws EOFException
    +    {
    +        return available() == 0;
    --- End diff --
    
    The method `available()` has a side effect of `channelConfig.setAutoRead(true)` when the `availableBytes` falls below the `lowWatermark`. Are you sure that is ok?
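A hypothetical model of the concern follows. The field and method names are illustrative and do not reproduce `RebufferingByteBufDataInputPlus`. If `available()` can flip auto-read back on, then an `isEmpty()` built on top of it inherits that side effect. Reading the counter directly avoids it. Whether the side effect is actually harmful depends on the call sites; this sketch only makes the difference observable.

```java
// Models available() re-enabling auto-read when buffered bytes drop below a low watermark.
class WatermarkedInput {
    private final int lowWatermark;
    private final int availableBytes;
    boolean autoRead = false; // stands in for channelConfig.setAutoRead(...)

    WatermarkedInput(int lowWatermark, int availableBytes) {
        this.lowWatermark = lowWatermark;
        this.availableBytes = availableBytes;
    }

    // Querying availability may turn auto-read on (the side effect raised in the review).
    int available() {
        if (!autoRead && availableBytes < lowWatermark)
            autoRead = true;
        return availableBytes;
    }

    // Variant shaped like the diff: inherits available()'s side effect.
    boolean isEmptyViaAvailable() {
        return available() == 0;
    }

    // Side-effect-free alternative: inspect the counter directly.
    boolean isEmptyNoSideEffect() {
        return availableBytes == 0;
    }
}
```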


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216088059
  
    --- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
    @@ -231,4 +241,437 @@ public String toString()
             sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb);
             return sbuf.toString();
         }
    +
    +    public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
    +    {
    +        return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
    +
    +    }
    +
    +    public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +    {
    +        return messagingVersion >= MessagingService.VERSION_40
    +               ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
    +               : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
    +
    +    }
    +
    +    /**
    +     * Implementations contain the mechanics and logic of parsing incoming messages. Allows for both non-blocking
    +     * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)}
    +     * methods, respectively.
    +     *
    +     * Does not contain the actual deserialization code for message fields nor payload. That is left to the
    +     * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
    +     */
    +    public static abstract class MessageInProcessor
    +    {
    +        /**
    +         * The current state of deserializing an incoming message. This enum is only used in the nonblocking versions.
    +         */
    +        public enum State
    +        {
    +            READ_PREFIX,
    +            READ_IP_ADDRESS,
    +            READ_VERB,
    +            READ_PARAMETERS_SIZE,
    +            READ_PARAMETERS_DATA,
    +            READ_PAYLOAD_SIZE,
    +            READ_PAYLOAD
    +        }
    +
    +        static final int VERB_LENGTH = Integer.BYTES;
    +
    +        /**
    +         * The default target for consuming deserialized {@link MessageIn}.
    +         */
    +        private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
    +
    +        final InetAddressAndPort peer;
    +        final int messagingVersion;
    +
    +        /**
    +         * Abstracts out depending directly on {@link MessagingService#receive(MessageIn, int)}; this makes tests more sane
    +         * as they don't require nor trigger the entire message processing circus.
    +         */
    +        final BiConsumer<MessageIn, Integer> messageConsumer;
    +
    +        /**
    +         * Captures the current {@link State} of processing a message. Primarily useful in the non-blocking use case.
    +         */
    +        State state = State.READ_PREFIX;
    +
    +        /**
    +         * Captures the current data we've parsed out of in incoming message. Primarily useful in the non-blocking use case.
    +         */
    +        MessageHeader messageHeader;
    +
    +        /**
    +         * Process the buffer in a non-blocking manner. Will try to read out as much of a message(s) as possible,
    +         * and send any fully deserialized messages to {@link #messageConsumer}.
    +         */
    +        public abstract void process(ByteBuf in) throws IOException;
    +
    +        /**
    +         * Process the buffer in a blocking manner. Will read as many messages as possible, blocking for more data,
    +         * and send any fully deserialized messages to {@link #messageConsumer}.
    +         */
    +        public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException;
    +
    +        MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +        {
    +            this.peer = peer;
    +            this.messagingVersion = messagingVersion;
    +            this.messageConsumer = messageConsumer;
    +        }
    +
    +        /**
    +         * Only applicable in the non-blocking use case, and should ony be used for testing!!!
    +         */
    +        @VisibleForTesting
    +        public MessageHeader getMessageHeader()
    +        {
    +            return messageHeader;
    +        }
    +
    +        /**
    +         * A simple struct to hold the message header data as it is being built up.
    +         */
    +        public static class MessageHeader
    +        {
    +            public int messageId;
    +            long constructionTime;
    +            public InetAddressAndPort from;
    +            public MessagingService.Verb verb;
    +            int payloadSize;
    +
    +            Map<ParameterType, Object> parameters = Collections.emptyMap();
    +
    +            /**
    +             * Length of the parameter data. If the message's version is {@link MessagingService#VERSION_40} or higher,
    +             * this value is the total number of header bytes; else, for legacy messaging, this is the number of
    +             * key/value entries in the header.
    +             */
    +            int parameterLength;
    +        }
    +
    +        MessageHeader readPrefix(DataInputPlus in) throws IOException
    +        {
    +            MessagingService.validateMagic(in.readInt());
    +            MessageHeader messageHeader = new MessageHeader();
    +            messageHeader.messageId = in.readInt();
    +            int messageTimestamp = in.readInt(); // make sure to read the sent timestamp, even if DatabaseDescriptor.hasCrossNodeTimeout() is not enabled
    +            messageHeader.constructionTime = MessageIn.deriveConstructionTime(peer, messageTimestamp, ApproximateTime.currentTimeMillis());
    +
    +            return messageHeader;
    +        }
    +    }
    +
    +    /**
    +     * Reads the incoming stream of bytes in the 4.0 format.
    +     */
    +    static class MessageInProcessorAsOf40 extends MessageInProcessor
    +    {
    +        MessageInProcessorAsOf40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +        {
    +            super(peer, messagingVersion, messageConsumer);
    +            assert messagingVersion >= MessagingService.VERSION_40;
    +        }
    +
    +        @SuppressWarnings("resource")
    +        public void process(ByteBuf in) throws IOException
    +        {
    +            ByteBufDataInputPlus inputPlus = new ByteBufDataInputPlus(in);
    +            while (true)
    +            {
    +                switch (state)
    +                {
    +                    case READ_PREFIX:
    +                        if (in.readableBytes() < MessageOut.MESSAGE_PREFIX_SIZE)
    +                            return;
    +                        MessageHeader header = readPrefix(inputPlus);
    +                        if (header == null)
    +                            return;
    +                        header.from = peer;
    +                        messageHeader = header;
    +                        state = State.READ_VERB;
    +                        // fall-through
    +                    case READ_VERB:
    +                        if (in.readableBytes() < VERB_LENGTH)
    +                            return;
    +                        messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
    +                        state = State.READ_PARAMETERS_SIZE;
    +                        // fall-through
    +                    case READ_PARAMETERS_SIZE:
    +                        long length = VIntCoding.readUnsignedVInt(in);
    +                        if (length < 0)
    +                            return;
    +                        messageHeader.parameterLength = Ints.checkedCast(length);
    +                        messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
    +                        state = State.READ_PARAMETERS_DATA;
    +                        // fall-through
    +                    case READ_PARAMETERS_DATA:
    +                        if (messageHeader.parameterLength > 0)
    +                        {
    +                            if (in.readableBytes() < messageHeader.parameterLength)
    +                                return;
    +                            readParameters(inputPlus, messageHeader.parameterLength, messageHeader.parameters);
    +                        }
    +                        state = State.READ_PAYLOAD_SIZE;
    +                        // fall-through
    +                    case READ_PAYLOAD_SIZE:
    +                        length = VIntCoding.readUnsignedVInt(in);
    +                        if (length < 0)
    +                            return;
    +                        messageHeader.payloadSize = (int) length;
    +                        state = State.READ_PAYLOAD;
    +                        // fall-through
    +                    case READ_PAYLOAD:
    +                        if (in.readableBytes() < messageHeader.payloadSize)
    +                            return;
    +
    +                        MessageIn<Object> messageIn = MessageIn.read(inputPlus, messagingVersion,
    +                                                                     messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
    +                                                                     messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
    +
    +                        if (messageIn != null)
    +                            messageConsumer.accept(messageIn, messageHeader.messageId);
    +
    +                        state = State.READ_PREFIX;
    +                        messageHeader = null;
    +                        break;
    +                    default:
    +                        throw new IllegalStateException("unknown/unhandled state: " + state);
    +                }
    +            }
    +        }
    +
    +        private void readParameters(DataInputPlus inputPlus, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
    +        {
    +            TrackedDataInputPlus inputTracker = new TrackedDataInputPlus(inputPlus);
    +
    +            while (inputTracker.getBytesRead() < parameterLength)
    +            {
    +                String key = DataInputStream.readUTF(inputTracker);
    +                ParameterType parameterType = ParameterType.byName.get(key);
    +                long valueLength = VIntCoding.readUnsignedVInt(inputTracker);
    --- End diff --
    
    Unused variable.
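The `length < 0` check in the `READ_PAYLOAD_SIZE` state above implies that the `ByteBuf` overload of `VIntCoding.readUnsignedVInt` signals "not enough bytes yet" with a negative return value, so the state machine can return and resume later. Below is a minimal sketch of that contract. It uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf` and assumes Cassandra's vint layout, where the number of leading 1-bits in the first byte gives the count of extra bytes. `NonBlockingVInt` is a hypothetical helper, not the patch's code.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not Cassandra's VIntCoding): decodes an unsigned vint
// without blocking. Returns -1 and leaves the position untouched when the
// buffer does not yet contain every byte of the encoded value.
final class NonBlockingVInt
{
    private NonBlockingVInt() {}

    static long readUnsignedVInt(ByteBuffer in)
    {
        if (!in.hasRemaining())
            return -1;

        int start = in.position();
        int firstByte = in.get(start);            // absolute read: position does not move
        // Assumed layout: leading 1-bits in the first byte = number of extra bytes.
        int extraBytes = Integer.numberOfLeadingZeros(~firstByte & 0xFF) - 24;
        if (in.remaining() < 1 + extraBytes)
            return -1;                             // incomplete: caller waits for more data

        long value = firstByte & (0xFF >> extraBytes); // strip the length-marker bits
        for (int i = 1; i <= extraBytes; i++)
            value = (value << 8) | (in.get(start + i) & 0xFF);

        in.position(start + 1 + extraBytes);       // consume only once fully decoded
        return value;
    }
}
```

In a resumable state machine like the one in the diff, a `-1` just means "stay in the current state and return". The next `process` call retries from the same position once more bytes have arrived.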


---


[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r215735492
  
    --- Diff: src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java ---
    @@ -393,6 +393,18 @@ private Channel getOrCreateChannel()
                 }
             }
     
    +        private void onError(Throwable t)
    +        {
    +            try
    +            {
    +                session.onError(t).get(5, TimeUnit.MINUTES);
    --- End diff --
    
    Why do we have a 5-minute timeout? We should pull this out as a constant.
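One way to address this is to hoist the magic number into a named constant and handle each failure mode of the bounded `get` explicitly. The sketch below uses hypothetical names (`ErrorHandlingSketch`, `SESSION_ERROR_TIMEOUT_MINUTES`). It assumes `session.onError(t)` returns something that behaves like a `java.util.concurrent.Future`, which the `.get(5, TimeUnit.MINUTES)` call suggests.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ErrorHandlingSketch
{
    // Hypothetical constant: how long to wait for the session's error handling to finish.
    static final long SESSION_ERROR_TIMEOUT_MINUTES = 5;

    /** Returns true if error handling completed successfully within the timeout. */
    static boolean awaitErrorHandling(Future<?> onErrorFuture, long timeout, TimeUnit unit)
    {
        try
        {
            onErrorFuture.get(timeout, unit);
            return true;
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
        catch (ExecutionException | TimeoutException e)
        {
            return false; // the caller decides whether to log, retry, or close the channel
        }
    }

    static boolean awaitErrorHandling(Future<?> onErrorFuture)
    {
        return awaitErrorHandling(onErrorFuture, SESSION_ERROR_TIMEOUT_MINUTES, TimeUnit.MINUTES);
    }
}
```

The overload that takes an explicit timeout also keeps tests fast, since they do not have to wait out the production default.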


---


[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216157093
  
    --- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
    @@ -231,4 +241,437 @@ public String toString()
             sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb);
             return sbuf.toString();
         }
    +
    +    public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
    +    {
    +        return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
    +
    +    }
    +
    +    public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +    {
    +        return messagingVersion >= MessagingService.VERSION_40
    +               ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
    +               : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
    +
    +    }
    +
    +    /**
    +     * Implementations contain the mechanics and logic of parsing incoming messages. Allows for both non-blocking
    +     * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)}
    +     * methods, respectively.
    +     *
    +     * Does not contain the actual deserialization code for message fields nor payload. That is left to the
    +     * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
    +     */
    +    public static abstract class MessageInProcessor
    +    {
    +        /**
    +         * The current state of deserializing an incoming message. This enum is only used in the nonblocking versions.
    +         */
    +        public enum State
    +        {
    +            READ_PREFIX,
    +            READ_IP_ADDRESS,
    +            READ_VERB,
    +            READ_PARAMETERS_SIZE,
    +            READ_PARAMETERS_DATA,
    +            READ_PAYLOAD_SIZE,
    +            READ_PAYLOAD
    +        }
    +
    +        static final int VERB_LENGTH = Integer.BYTES;
    +
    +        /**
    +         * The default target for consuming deserialized {@link MessageIn}.
    +         */
    +        private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
    +
    +        final InetAddressAndPort peer;
    +        final int messagingVersion;
    +
    +        /**
    +         * Abstracts out depending directly on {@link MessagingService#receive(MessageIn, int)}; this makes tests more sane
    +         * as they don't require nor trigger the entire message processing circus.
    +         */
    +        final BiConsumer<MessageIn, Integer> messageConsumer;
    +
    +        /**
    +         * Captures the current {@link State} of processing a message. Primarily useful in the non-blocking use case.
    +         */
    +        State state = State.READ_PREFIX;
    +
    +        /**
    +         * Captures the current data we've parsed out of an incoming message. Primarily useful in the non-blocking use case.
    +         */
    +        MessageHeader messageHeader;
    +
    +        /**
    +         * Process the buffer in a non-blocking manner. Will try to read out as much of a message(s) as possible,
    +         * and send any fully deserialized messages to {@link #messageConsumer}.
    +         */
    +        public abstract void process(ByteBuf in) throws IOException;
    +
    +        /**
    +         * Process the buffer in a blocking manner. Will read as many messages as possible, blocking for more data,
    +         * and send any fully deserialized messages to {@link #messageConsumer}.
    +         */
    +        public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException;
    +
    +        MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +        {
    +            this.peer = peer;
    +            this.messagingVersion = messagingVersion;
    +            this.messageConsumer = messageConsumer;
    +        }
    +
    +        /**
    +         * Only applicable in the non-blocking use case, and should only be used for testing!!!
    +         */
    +        @VisibleForTesting
    +        public MessageHeader getMessageHeader()
    +        {
    +            return messageHeader;
    +        }
    +
    +        /**
    +         * A simple struct to hold the message header data as it is being built up.
    +         */
    +        public static class MessageHeader
    +        {
    +            public int messageId;
    +            long constructionTime;
    +            public InetAddressAndPort from;
    +            public MessagingService.Verb verb;
    +            int payloadSize;
    +
    +            Map<ParameterType, Object> parameters = Collections.emptyMap();
    +
    +            /**
    +             * Length of the parameter data. If the message's version is {@link MessagingService#VERSION_40} or higher,
    +             * this value is the total number of header bytes; else, for legacy messaging, this is the number of
    +             * key/value entries in the header.
    +             */
    +            int parameterLength;
    +        }
    +
    +        MessageHeader readPrefix(DataInputPlus in) throws IOException
    +        {
    +            MessagingService.validateMagic(in.readInt());
    +            MessageHeader messageHeader = new MessageHeader();
    +            messageHeader.messageId = in.readInt();
    +            int messageTimestamp = in.readInt(); // make sure to read the sent timestamp, even if DatabaseDescriptor.hasCrossNodeTimeout() is not enabled
    +            messageHeader.constructionTime = MessageIn.deriveConstructionTime(peer, messageTimestamp, ApproximateTime.currentTimeMillis());
    +
    +            return messageHeader;
    +        }
    +    }
    +
    +    /**
    +     * Reads the incoming stream of bytes in the 4.0 format.
    +     */
    +    static class MessageInProcessorAsOf40 extends MessageInProcessor
    +    {
    +        MessageInProcessorAsOf40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +        {
    +            super(peer, messagingVersion, messageConsumer);
    +            assert messagingVersion >= MessagingService.VERSION_40;
    +        }
    +
    +        @SuppressWarnings("resource")
    +        public void process(ByteBuf in) throws IOException
    +        {
    +            ByteBufDataInputPlus inputPlus = new ByteBufDataInputPlus(in);
    +            while (true)
    +            {
    +                switch (state)
    +                {
    +                    case READ_PREFIX:
    +                        if (in.readableBytes() < MessageOut.MESSAGE_PREFIX_SIZE)
    +                            return;
    +                        MessageHeader header = readPrefix(inputPlus);
    +                        if (header == null)
    +                            return;
    +                        header.from = peer;
    +                        messageHeader = header;
    +                        state = State.READ_VERB;
    +                        // fall-through
    +                    case READ_VERB:
    +                        if (in.readableBytes() < VERB_LENGTH)
    +                            return;
    +                        messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
    +                        state = State.READ_PARAMETERS_SIZE;
    +                        // fall-through
    +                    case READ_PARAMETERS_SIZE:
    +                        long length = VIntCoding.readUnsignedVInt(in);
    +                        if (length < 0)
    +                            return;
    +                        messageHeader.parameterLength = Ints.checkedCast(length);
    +                        messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
    +                        state = State.READ_PARAMETERS_DATA;
    +                        // fall-through
    +                    case READ_PARAMETERS_DATA:
    +                        if (messageHeader.parameterLength > 0)
    +                        {
    +                            if (in.readableBytes() < messageHeader.parameterLength)
    +                                return;
    +                            readParameters(inputPlus, messageHeader.parameterLength, messageHeader.parameters);
    +                        }
    +                        state = State.READ_PAYLOAD_SIZE;
    +                        // fall-through
    +                    case READ_PAYLOAD_SIZE:
    +                        length = VIntCoding.readUnsignedVInt(in);
    +                        if (length < 0)
    +                            return;
    +                        messageHeader.payloadSize = (int) length;
    +                        state = State.READ_PAYLOAD;
    +                        // fall-through
    +                    case READ_PAYLOAD:
    +                        if (in.readableBytes() < messageHeader.payloadSize)
    +                            return;
    +
    +                        MessageIn<Object> messageIn = MessageIn.read(inputPlus, messagingVersion,
    +                                                                     messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
    +                                                                     messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
    +
    +                        if (messageIn != null)
    +                            messageConsumer.accept(messageIn, messageHeader.messageId);
    +
    +                        state = State.READ_PREFIX;
    +                        messageHeader = null;
    +                        break;
    +                    default:
    +                        throw new IllegalStateException("unknown/unhandled state: " + state);
    +                }
    +            }
    +        }
    +
    +        private void readParameters(DataInputPlus inputPlus, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
    +        {
    +            TrackedDataInputPlus inputTracker = new TrackedDataInputPlus(inputPlus);
    +
    +            while (inputTracker.getBytesRead() < parameterLength)
    +            {
    +                String key = DataInputStream.readUTF(inputTracker);
    +                ParameterType parameterType = ParameterType.byName.get(key);
    +                long valueLength = VIntCoding.readUnsignedVInt(inputTracker);
    +                parameters.put(parameterType, parameterType.serializer.deserialize(inputTracker, messagingVersion));
    +            }
    +        }
    +
    +        public void process(RebufferingByteBufDataInputPlus in) throws IOException
    +        {
    +            while (in.isOpen() && !in.isEmpty())
    +            {
    +                messageHeader = readPrefix(in);
    +                messageHeader.from = peer;
    +                messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
    +                messageHeader.parameterLength = Ints.checkedCast(VIntCoding.readUnsignedVInt(in));
    +                messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
    +                if (messageHeader.parameterLength > 0)
    +                    readParameters(in, messageHeader.parameterLength, messageHeader.parameters);
    +
    +                messageHeader.payloadSize = Ints.checkedCast(VIntCoding.readUnsignedVInt(in));
    +                MessageIn<Object> messageIn = MessageIn.read(in, messagingVersion,
    +                                                             messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
    +                                                             messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
    +                if (messageIn != null)
    +                    messageConsumer.accept(messageIn, messageHeader.messageId);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Reads the incoming stream of bytes in the pre-4.0 format.
    +     */
    +    static class MessageInProcessorPre40 extends MessageInProcessor
    +    {
    +        private static final int PARAMETERS_SIZE_LENGTH = Integer.BYTES;
    +        private static final int PARAMETERS_VALUE_SIZE_LENGTH = Integer.BYTES;
    +        private static final int PAYLOAD_SIZE_LENGTH = Integer.BYTES;
    +
    +        MessageInProcessorPre40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +        {
    +            super(peer, messagingVersion, messageConsumer);
    +            assert messagingVersion < MessagingService.VERSION_40;
    +        }
    +
    +        public void process(ByteBuf in) throws IOException
    +        {
    +            ByteBufDataInputPlus inputPlus = new ByteBufDataInputPlus(in);
    +            while (true)
    +            {
    +                switch (state)
    +                {
    +                    case READ_PREFIX:
    +                        if (in.readableBytes() < MessageOut.MESSAGE_PREFIX_SIZE)
    +                            return;
    +                        MessageHeader header = readPrefix(inputPlus);
    +                        if (header == null)
    +                            return;
    +                        messageHeader = header;
    +                        state = State.READ_IP_ADDRESS;
    +                        // fall-through
    +                    case READ_IP_ADDRESS:
    +                        // unfortunately, this assumes knowledge of how CompactEndpointSerializationHelper serializes data (the first byte is the size).
    +                        // first, check that we can actually read the size byte, then check if we can read that number of bytes.
    +                        // the "+ 1" is to make sure we have the size byte in addition to the serialized IP addr count of bytes in the buffer.
    +                        int readableBytes = in.readableBytes();
    +                        if (readableBytes < 1 || readableBytes < in.getByte(in.readerIndex()) + 1)
    +                            return;
    +                        messageHeader.from = CompactEndpointSerializationHelper.instance.deserialize(inputPlus, messagingVersion);
    +                        state = State.READ_VERB;
    +                        // fall-through
    +                    case READ_VERB:
    +                        if (in.readableBytes() < VERB_LENGTH)
    +                            return;
    +                        messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
    +                        state = State.READ_PARAMETERS_SIZE;
    +                        // fall-through
    +                    case READ_PARAMETERS_SIZE:
    +                        if (in.readableBytes() < PARAMETERS_SIZE_LENGTH)
    +                            return;
    +                        messageHeader.parameterLength = in.readInt();
    +                        messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
    +                        state = State.READ_PARAMETERS_DATA;
    +                        // fall-through
    +                    case READ_PARAMETERS_DATA:
    +                        if (messageHeader.parameterLength > 0)
    +                        {
    +                            if (!readParameters(in, inputPlus, messageHeader.parameterLength, messageHeader.parameters))
    +                                return;
    +                        }
    +                        state = State.READ_PAYLOAD_SIZE;
    +                        // fall-through
    +                    case READ_PAYLOAD_SIZE:
    +                        if (in.readableBytes() < PAYLOAD_SIZE_LENGTH)
    +                            return;
    +                        messageHeader.payloadSize = in.readInt();
    +                        state = State.READ_PAYLOAD;
    +                        // fall-through
    +                    case READ_PAYLOAD:
    +                        if (in.readableBytes() < messageHeader.payloadSize)
    +                            return;
    +
    +                        MessageIn<Object> messageIn = MessageIn.read(inputPlus, messagingVersion,
    +                                                                     messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
    +                                                                     messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
    +
    +                        if (messageIn != null)
    +                            messageConsumer.accept(messageIn, messageHeader.messageId);
    +
    +                        state = State.READ_PREFIX;
    +                        messageHeader = null;
    +                        break;
    +                    default:
    +                        throw new IllegalStateException("unknown/unhandled state: " + state);
    +                }
    +            }
    +        }
    +
    +        /**
    +         * @return <code>true</code> if all the parameters have been read from the {@link ByteBuf}; else, <code>false</code>.
    +         */
    +        private boolean readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, int parameterCount, Map<ParameterType, Object> parameters) throws IOException
    +        {
    +            // makes the assumption that map.size() is a constant time function (HashMap.size() is)
    +            while (parameters.size() < parameterCount)
    +            {
    +                if (!canReadNextParam(in))
    +                    return false;
    +
    +                String key = DataInputStream.readUTF(inputPlus);
    +                ParameterType parameterType = ParameterType.byName.get(key);
    +                byte[] value = new byte[in.readInt()];
    +                in.readBytes(value);
    +                try (DataInputBuffer buffer = new DataInputBuffer(value))
    +                {
    +                    parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion));
    +                }
    +            }
    +
    +            return true;
    +        }
    +
    +        /**
    +         * Determine if we can read the next parameter from the {@link ByteBuf}. This method will *always* set the {@code in}
    +         * readIndex back to where it was when this method was invoked.
    +         * <p>
    +         * NOTE: this function would be sooo much simpler if we included a parameters length int in the messaging format,
    +         * instead of checking the remaining readable bytes for each field as we're parsing it. c'est la vie ...
    +         */
    +        @VisibleForTesting
    +        boolean canReadNextParam(ByteBuf in)
    +        {
    +            in.markReaderIndex();
    +            // capture the readableBytes value here to avoid all the virtual function calls.
    +            // subtract 6 as we know we'll be reading a short and an int (for the utf and value lengths).
    +            final int minimumBytesRequired = 6;
    +            int readableBytes = in.readableBytes() - minimumBytesRequired;
    +            if (readableBytes < 0)
    +                return false;
    +
    +            // this is a tad invasive, but since we know the UTF string is prefaced with a 2-byte length,
    +            // read that to make sure we have enough bytes to read the string itself.
    +            short strLen = in.readShort();
    +            // check if we can read that many bytes for the UTF
    +            if (strLen > readableBytes)
    +            {
    +                in.resetReaderIndex();
    +                return false;
    +            }
    +            in.skipBytes(strLen);
    +            readableBytes -= strLen;
    +
    +            // check if we can read the value length
    +            if (readableBytes < PARAMETERS_VALUE_SIZE_LENGTH)
    +            {
    +                in.resetReaderIndex();
    +                return false;
    +            }
    +            int valueLength = in.readInt();
    +            // check if we can read that many bytes for the value
    +            if (valueLength > readableBytes)
    +            {
    +                in.resetReaderIndex();
    +                return false;
    +            }
    +
    +            in.resetReaderIndex();
    +            return true;
    +        }
    +
    +        public void process(RebufferingByteBufDataInputPlus in) throws IOException
    +        {
    +            while (in.isOpen() && !in.isEmpty())
    +            {
    +                messageHeader = readPrefix(in);
    +                messageHeader.from = CompactEndpointSerializationHelper.instance.deserialize(in, messagingVersion);
    +                messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
    +                messageHeader.parameterLength = in.readInt();
    +                messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
    +                if (messageHeader.parameterLength > 0)
    +                    readParameters(in, messageHeader.parameterLength, messageHeader.parameters);
    +
    +                messageHeader.payloadSize = in.readInt();
    +                MessageIn<Object> messageIn = MessageIn.read(in, messagingVersion,
    +                                                             messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
    +                                                             messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
    +                if (messageIn != null)
    +                    messageConsumer.accept(messageIn, messageHeader.messageId);
    +            }
    +        }
    +
    +        private void readParameters(RebufferingByteBufDataInputPlus in, int parameterCount, Map<ParameterType, Object> parameters) throws IOException
    +        {
    +            // makes the assumption that map.size() is a constant time function (HashMap.size() is)
    +            while (parameters.size() < parameterCount)
    +            {
    +                String key = DataInputStream.readUTF(in);
    +                ParameterType parameterType = ParameterType.byName.get(key);
    +                int valueLength = in.readInt();
    --- End diff --
    
    removed
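The `canReadNextParam` check in the diff above peeks at a pre-4.0 parameter entry to see whether it is fully buffered. The entry is laid out as a 2-byte UTF length, the UTF bytes, a 4-byte value length, and the value bytes. Below is a minimal sketch of the same idea using absolute `java.nio.ByteBuffer` reads, which never move the position and so need no mark/reset. `ParamPeek` is a hypothetical name. The key length is read as unsigned, as `DataInputStream.readUTF` does.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: reports whether one complete parameter entry
// ([u16 key length][key bytes][i32 value length][value bytes]) is buffered,
// using absolute reads only, so the buffer's position is never changed.
final class ParamPeek
{
    private ParamPeek() {}

    static boolean canReadNextParam(ByteBuffer in)
    {
        int pos = in.position();
        int limit = in.limit();

        if (limit - pos < Short.BYTES)
            return false;
        int keyLength = Short.toUnsignedInt(in.getShort(pos)); // readUTF treats this as unsigned
        pos += Short.BYTES + keyLength;

        if (limit - pos < Integer.BYTES)
            return false;                  // key or value-length field not fully buffered
        int valueLength = in.getInt(pos);
        pos += Integer.BYTES;

        return valueLength >= 0 && limit - pos >= valueLength;
    }
}
```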


---


[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216047177
  
    --- Diff: src/java/org/apache/cassandra/net/async/MessageOutHandler.java ---
    @@ -140,8 +125,17 @@ public void write(ChannelHandlerContext ctx, Object o, ChannelPromise promise)
     
                 out = ctx.alloc().ioBuffer((int)currentFrameSize);
     
    -            captureTracingInfo(msg);
    -            serializeMessage(msg, out);
    +            @SuppressWarnings("resource")
    +            ByteBufDataOutputPlus outputPlus = new ByteBufDataOutputPlus(out);
    +            msg.message.serialize(outputPlus, targetMessagingVersion, connectionId, msg.id, msg.timestampNanos);
    +
    +            // next few lines are for debugging ... massively helpful!!
    +            // if we allocated too much buffer for this message, we'll log here.
    +            // if we allocated too little buffer space, we would have hit an exception when trying to write more bytes to it
    +            if (out.isWritable())
    +                errorLogger.error("{} reported message size {}, actual message size {}, msg {}",
    --- End diff --
    
    It's fine. We can leave it in.
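The debugging check under discussion relies on the output buffer being allocated at exactly the size the message reports up front. Leftover writable space means the size was over-reported; too little space means a write fails. Below is a minimal sketch of the same check with a fixed-capacity `java.nio.ByteBuffer`, which throws `BufferOverflowException` on overflow. `FrameSizeCheck` and its message layout are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: allocate exactly the size a message reports, serialize into it,
// and surface any mismatch between the reported and the actual size.
final class FrameSizeCheck
{
    private FrameSizeCheck() {}

    // Hypothetical message layout: [i32 id][u16 body length][UTF-8 body bytes]
    static int serializedSize(String body)
    {
        return Integer.BYTES + Short.BYTES + body.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Returns the number of unused bytes; throws BufferOverflowException if reportedSize is too small. */
    static int serialize(int id, String body, int reportedSize)
    {
        ByteBuffer out = ByteBuffer.allocate(reportedSize);
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        out.putInt(id);
        out.putShort((short) bytes.length);
        out.put(bytes);
        return out.remaining(); // > 0 means the reported size was too large
    }
}
```

In the diff, the leftover-space case is logged rather than thrown. A mis-reported size stays visible without failing the connection.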


---


[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216157094
  
    --- Diff: src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java ---
    @@ -196,10 +220,12 @@ protected void doFlush(int count) throws IOException
                 int byteCount = buffer.position();
                 currentBuf.writerIndex(byteCount);
     
    -            if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES))
    +            if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, rateLimiterBlockTime, rateLimiterBlockTimeUnit))
    --- End diff --
    
    Good catch. For the record, the only place where this went from 2 up to 5 minutes was in `NettyStreamingMessageSender.FIleSendTask`, where I [defaulted to 5 minutes](https://github.com/apache/cassandra/pull/253/files#diff-6d6dd6c9e52acdc8462538a0d3695c0aR327). I've switched that call site to default to 2 minutes.
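Passing the block time in as a parameter, as the diff does with `rateLimiterBlockTime`/`rateLimiterBlockTimeUnit`, lets each call site choose its own bound while sharing a 2-minute default. The sketch below uses the JDK's `Semaphore.tryAcquire(int, long, TimeUnit)` in place of Guava's `Uninterruptibles.tryAcquireUninterruptibly`, so unlike the original it responds to interrupts. `RateLimitedFlusher` and its fields are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a flusher whose wait for rate-limiter permits is bounded
// by a caller-chosen block time instead of a hard-coded value.
final class RateLimitedFlusher
{
    static final long DEFAULT_BLOCK_TIME = 2;
    static final TimeUnit DEFAULT_BLOCK_TIME_UNIT = TimeUnit.MINUTES;

    private final Semaphore channelRateLimiter;
    private final long blockTime;
    private final TimeUnit blockTimeUnit;

    RateLimitedFlusher(Semaphore channelRateLimiter, long blockTime, TimeUnit blockTimeUnit)
    {
        this.channelRateLimiter = channelRateLimiter;
        this.blockTime = blockTime;
        this.blockTimeUnit = blockTimeUnit;
    }

    RateLimitedFlusher(Semaphore channelRateLimiter)
    {
        this(channelRateLimiter, DEFAULT_BLOCK_TIME, DEFAULT_BLOCK_TIME_UNIT);
    }

    /** Returns true if byteCount permits were acquired within the configured block time. */
    boolean tryAcquireForFlush(int byteCount)
    {
        try
        {
            return channelRateLimiter.tryAcquire(byteCount, blockTime, blockTimeUnit);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```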


---


[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216157099
  
    --- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
    @@ -180,6 +199,73 @@ public String toString()
             return sbuf.toString();
         }
     
    +    /**
    +     * The main entry point for sending an internode message to a peer node in the cluster.
    +     */
    +    public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
    +    {
    +        captureTracingInfo(destinationId);
    +
    +        out.writeInt(MessagingService.PROTOCOL_MAGIC);
    +        out.writeInt(id);
    +
    +        // int cast cuts off the high-order half of the timestamp, which we can assume remains
    +        // the same between now and when the recipient reconstructs it.
    +        out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
    +        serialize(out, messagingVersion);
    +    }
    +
    +    /**
    +     * Record any tracing data, if enabled on this message.
    +     */
    +    @VisibleForTesting
    +    void captureTracingInfo(OutboundConnectionIdentifier destinationId)
    +    {
    +        try
    +        {
    +            UUID sessionId =  (UUID)getParameter(ParameterType.TRACE_SESSION);
    --- End diff --
    
    fixed
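The `serialize` method in the diff sends only the low 32 bits of the millisecond timestamp. The receiver then supplies the high bits; in this patch that happens in `MessageIn.deriveConstructionTime`, whose body is not shown here. Below is a minimal sketch of one way to do the reconstruction. It also covers the case the diff comment assumes away, where the low half wraps between send and receive, by choosing whichever candidate lies closest to the receiver's clock. `TruncatedTimestamp` is hypothetical and is not Cassandra's implementation.

```java
// Hypothetical sketch: truncate a millisecond timestamp to its low 32 bits on send,
// and rebuild it on receipt using the receiver's clock to supply the high bits.
final class TruncatedTimestamp
{
    private TruncatedTimestamp() {}

    static int truncate(long millis)
    {
        return (int) millis; // keeps only the low-order 32 bits
    }

    static long reconstruct(int sentLowBits, long receiverNowMillis)
    {
        long low = sentLowBits & 0xFFFFFFFFL;              // treat the sent bits as unsigned
        long base = (receiverNowMillis & 0xFFFFFFFF00000000L) | low;
        long best = base;
        // The low half may have wrapped between send and receive, so also
        // consider the adjacent high-bit values and keep the closest one.
        for (long candidate : new long[]{ base - (1L << 32), base + (1L << 32) })
        {
            if (Math.abs(candidate - receiverNowMillis) < Math.abs(best - receiverNowMillis))
                best = candidate;
        }
        return best;
    }
}
```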


---


[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r215955652
  
    --- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
    @@ -183,6 +195,11 @@ public int available() throws EOFException
             return availableBytes;
         }
     
    +    public boolean isEmpty() throws EOFException
    +    {
    +        return available() == 0;
    --- End diff --
    
    So, looking at this again, I needed to figure out why `available()` does the autoRead check. For posterity, here's the reasoning behind it:
    
    There is a case where we enqueue a buffer and disable autoRead, then read the buffer exactly to its end, so no more bytes are available to read in the instance. To avoid blocking indefinitely (so that I could check whether `StreamingInboundHandler.close` is true), I called `available()`, which did the autoRead check, instead of attempting to read a value from the inputPlus. Reading would call `reBuffer()`, which would either block indefinitely or, had I added a timeout, throw an exception. I wanted neither the indefinite block nor the exception, so I bolted the autoRead check onto `available()`.
    
    I think it still makes sense to move this into a separate method call.
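Moving the autoRead side effect out of `available()` points toward splitting two concerns: a side-effect-free emptiness check that never blocks, and the read path. Below is a minimal sketch of the non-blocking half. A `java.util.concurrent.BlockingQueue<ByteBuffer>` stands in for the queue of Netty buffers, and the autoRead toggling is left out. `QueuedInput`, `isEmpty`, and `tryRead` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an input over a queue of buffers filled by another thread.
// isEmpty() never blocks and has no side effects, so a consumer loop can check it
// and a close flag instead of parking inside a blocking read.
final class QueuedInput
{
    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>();
    private ByteBuffer current = ByteBuffer.allocate(0);
    private volatile boolean closed;

    void append(ByteBuffer buf) { queue.add(buf); }
    void close() { closed = true; }
    boolean isClosed() { return closed; }

    /** True if no bytes are readable right now; never blocks and never dequeues. */
    boolean isEmpty()
    {
        if (current.hasRemaining())
            return false;
        for (ByteBuffer queued : queue)        // weakly consistent iteration, no removal
            if (queued.hasRemaining())
                return false;
        return true;
    }

    /** Non-blocking single-byte read; returns -1 if nothing is buffered. */
    int tryRead()
    {
        while (!current.hasRemaining())
        {
            ByteBuffer next = queue.poll();    // poll() without a timeout returns immediately
            if (next == null)
                return -1;
            current = next;
        }
        return current.get() & 0xFF;
    }
}
```

A consumer can then loop on `!input.isClosed()`, checking `isEmpty()` before each read, so it never commits to a read that could block indefinitely.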


---


[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216103840
  
    --- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
    @@ -36,6 +37,11 @@
     
     public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
     {
    +    /**
    +     * Default to a very large value.
    +     */
    +    private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2);
    --- End diff --
    
    I've lowered it to 3 minutes.
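With the default lowered to 3 minutes, a stalled peer surfaces as an error on the blocking path instead of tying up a thread for days. Below is a minimal sketch of a bounded rebuffer wait using `BlockingQueue.poll(long, TimeUnit)`, with `java.nio.ByteBuffer`s in place of Netty buffers. `BoundedRebuffer` is hypothetical, and throwing `IOException` on timeout is an assumption about how a caller would want to observe it.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: wait for the next buffer, but only up to a bounded time.
final class BoundedRebuffer
{
    static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.MINUTES.toMillis(3);

    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>();
    private final long rebufferBlockInMillis;

    BoundedRebuffer(long rebufferBlockInMillis) { this.rebufferBlockInMillis = rebufferBlockInMillis; }
    BoundedRebuffer() { this(DEFAULT_REBUFFER_BLOCK_IN_MILLIS); }

    void append(ByteBuffer buf) { queue.add(buf); }

    /** Returns the next buffer, or throws if none arrives within the block time. */
    ByteBuffer reBuffer() throws IOException
    {
        try
        {
            ByteBuffer next = queue.poll(rebufferBlockInMillis, TimeUnit.MILLISECONDS);
            if (next == null)
                throw new IOException("timed out after " + rebufferBlockInMillis + " ms waiting for data");
            return next;
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IOException("interrupted while waiting for data", e);
        }
    }
}
```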


---


[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r215733891
  
    --- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
    @@ -36,6 +37,11 @@
     
     public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
     {
    +    /**
    +     * Default to a very large value.
    +     */
    +    private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2);
    --- End diff --
    
    It's risky to default to a very large value. Why do we want to default to such a large value?


---


[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216106193
  
    --- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
    @@ -180,6 +199,73 @@ public String toString()
             return sbuf.toString();
         }
     
    +    /**
    +     * The main entry point for sending an internode message to a peer node in the cluster.
    +     */
    +    public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
    +    {
    +        captureTracingInfo(destinationId);
    +
    +        out.writeInt(MessagingService.PROTOCOL_MAGIC);
    +        out.writeInt(id);
    +
    +        // int cast cuts off the high-order half of the timestamp, which we can assume remains
    +        // the same between now and when the recipient reconstructs it.
    +        out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
    +        serialize(out, messagingVersion);
    +    }
    +
    +    /**
    +     * Record any tracing data, if enabled on this message.
    +     */
    +    @VisibleForTesting
    +    void captureTracingInfo(OutboundConnectionIdentifier destinationId)
    +    {
    +        try
    +        {
    +            UUID sessionId =  (UUID)getParameter(ParameterType.TRACE_SESSION);
    --- End diff --
    
    Nit: Spacing
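    The `serialize` method in the diff above writes only the low 32 bits of the millisecond timestamp. The receiver must then splice back the high half from its own clock. Below is a minimal sketch of that arithmetic. `TimestampSplice` is a hypothetical helper and is not necessarily what `MessageIn.deriveConstructionTime` does. The sketch relies on the same assumption as the code comment: the high 32 bits are identical on both nodes. That assumption can fail if the low half wraps between send and receive, which happens every 2^32 ms (about 49.7 days).

```java
final class TimestampSplice
{
    private TimestampSplice() {}

    /** Sender side: the int cast keeps only the low 32 bits of the timestamp. */
    static int truncate(long epochMillis)
    {
        return (int) epochMillis; // narrowing primitive conversion drops the high 32 bits
    }

    /** Receiver side: combine the receiver's high 32 bits with the sender's low 32 bits. */
    static long reconstruct(int lowBits, long receiverNowMillis)
    {
        long highHalf = receiverNowMillis & 0xFFFFFFFF00000000L;
        long lowHalf = lowBits & 0xFFFFFFFFL; // undo the sign extension of a negative int
        return highHalf | lowHalf;
    }
}
```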


---


[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216118578
  
    --- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
    @@ -180,6 +199,73 @@ public String toString()
             return sbuf.toString();
         }
     
    +    /**
    +     * The main entry point for sending an internode message to a peer node in the cluster.
    +     */
    +    public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
    +    {
    +        captureTracingInfo(destinationId);
    +
    +        out.writeInt(MessagingService.PROTOCOL_MAGIC);
    +        out.writeInt(id);
    +
    +        // int cast cuts off the high-order half of the timestamp, which we can assume remains
    +        // the same between now and when the recipient reconstructs it.
    +        out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
    +        serialize(out, messagingVersion);
    +    }
    +
    +    /**
    +     * Record any tracing data, if enabled on this message.
    +     */
    +    @VisibleForTesting
    +    void captureTracingInfo(OutboundConnectionIdentifier destinationId)
    +    {
    +        try
    +        {
    +            UUID sessionId =  (UUID)getParameter(ParameterType.TRACE_SESSION);
    +            if (sessionId != null)
    --- End diff --
    
    Yes. Further, this method is basically taken verbatim from 3.11's [OutboundTcpConnection.writeConnected()](https://github.com/apache/cassandra/blob/cassandra-3.11/src/java/org/apache/cassandra/net/OutboundTcpConnection.java#L312) method.


---


[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r215939675
  
    --- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
    @@ -36,6 +37,11 @@
     
     public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
     {
    +    /**
    +     * Default to a very large value.
    +     */
    +    private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2);
    --- End diff --
    
    The corresponding pre-4.0 code just does blocking IO on a socket, with no timeouts. Thus it blocks forever. 2 days seems shorter than forever, but I'm game to change this to any time-bounded value.
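    A time-bounded wait can be sketched with `BlockingQueue.poll(timeout, unit)` in place of an unbounded `take()`. This is a hypothetical, simplified reader and not the real `RebufferingByteBufDataInputPlus`. The class and constant names are illustrative, and the 3-minute default mirrors the value the thread settled on. On timeout it throws an `IOException`, so a stuck peer surfaces as an error instead of a parked thread.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: wait at most blockMillis for the next chunk instead of forever. */
class BoundedRebuffer
{
    // Mirrors the 3 minute value mentioned in the thread; the constant name is hypothetical.
    static final long DEFAULT_REBUFFER_BLOCK_MILLIS = TimeUnit.MINUTES.toMillis(3);

    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final long blockMillis;

    BoundedRebuffer()
    {
        this(DEFAULT_REBUFFER_BLOCK_MILLIS);
    }

    BoundedRebuffer(long blockMillis)
    {
        this.blockMillis = blockMillis;
    }

    /** Called by the producer (e.g. the event loop) when a new chunk arrives. */
    void enqueue(byte[] chunk)
    {
        queue.add(chunk);
    }

    /** Returns the next chunk, or throws once the bounded wait expires. */
    byte[] reBuffer() throws IOException
    {
        byte[] next;
        try
        {
            next = queue.poll(blockMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new InterruptedIOException("interrupted while waiting for data");
        }
        if (next == null)
            throw new IOException("no data arrived within " + blockMillis + " ms");
        return next;
    }
}
```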


---


[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216105848
  
    --- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
    @@ -180,6 +199,73 @@ public String toString()
             return sbuf.toString();
         }
     
    +    /**
    +     * The main entry point for sending an internode message to a peer node in the cluster.
    +     */
    +    public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
    +    {
    +        captureTracingInfo(destinationId);
    +
    +        out.writeInt(MessagingService.PROTOCOL_MAGIC);
    +        out.writeInt(id);
    +
    +        // int cast cuts off the high-order half of the timestamp, which we can assume remains
    +        // the same between now and when the recipient reconstructs it.
    +        out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
    +        serialize(out, messagingVersion);
    +    }
    +
    +    /**
    +     * Record any tracing data, if enabled on this message.
    +     */
    +    @VisibleForTesting
    +    void captureTracingInfo(OutboundConnectionIdentifier destinationId)
    +    {
    +        try
    +        {
    +            UUID sessionId =  (UUID)getParameter(ParameterType.TRACE_SESSION);
    +            if (sessionId != null)
    +            {
    +                TraceState state = Tracing.instance.get(sessionId);
    +                String logMessage = String.format("Sending %s message to %s", verb, destinationId.connectionAddress());
    +                // session may have already finished; see CASSANDRA-5668
    +                if (state == null)
    +                {
    +                    Tracing.TraceType traceType = (Tracing.TraceType)getParameter(ParameterType.TRACE_TYPE);
    +                    traceType = traceType == null ? Tracing.TraceType.QUERY : traceType;
    +                    Tracing.instance.trace(ByteBuffer.wrap(UUIDGen.decompose(sessionId)), logMessage, traceType.getTTL());
    +                }
    +                else
    +                {
    +                    state.trace(logMessage);
    +                    if (verb == MessagingService.Verb.REQUEST_RESPONSE)
    +                        Tracing.instance.doneWithNonLocalSession(state);
    +                }
    +            }
    +        }
    +        catch (Exception e)
    +        {
    +            logger.warn("failed to capture the tracing info for an outbound message to {}, ignoring", destinationId, e);
    +        }
    +    }
    +
    +    private Object getParameter(ParameterType type)
    +    {
    +        for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE)
    +        {
    +            if (((ParameterType)parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)).equals(type))
    --- End diff --
    
    Don't need the typecast to `ParameterType`
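    The cast is unnecessary because `equals(Object)` accepts any reference. A sketch of the same lookup over a flat `[type, value, type, value, ...]` list follows. It assumes `ParameterType` is an enum. The constant values and the stand-in enum below are hypothetical, and only the stride/offset idea comes from the diff.

```java
import java.util.List;

class FlatParams
{
    // Hypothetical constant values; only the stride/offset idea comes from the diff.
    static final int PARAMETER_TUPLE_SIZE = 2;
    static final int PARAMETER_TUPLE_TYPE_OFFSET = 0;
    static final int PARAMETER_TUPLE_PARAMETER_OFFSET = 1;

    /** Hypothetical stand-in for ParameterType. */
    enum ParameterType { TRACE_SESSION, TRACE_TYPE }

    private final List<Object> parameters; // [type0, value0, type1, value1, ...]

    FlatParams(List<Object> parameters)
    {
        this.parameters = parameters;
    }

    Object getParameter(ParameterType type)
    {
        for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE)
        {
            // equals(Object) takes any reference, so no cast to ParameterType is needed;
            // calling it on `type` also avoids an NPE if a slot happens to be null.
            if (type.equals(parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)))
                return parameters.get(ii + PARAMETER_TUPLE_PARAMETER_OFFSET);
        }
        return null;
    }
}
```

    Since enum constants are singletons, `==` would work equally well here.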


---


[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216102238
  
    --- Diff: test/unit/org/apache/cassandra/net/MessageInProcessorPre40Test.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.net;
    +
    +import java.io.IOException;
    +import java.net.UnknownHostException;
    +
    +import org.junit.After;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import io.netty.buffer.ByteBuf;
    +import io.netty.buffer.Unpooled;
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.locator.InetAddressAndPort;
    +import org.apache.cassandra.net.MessageIn.MessageInProcessorPre40;
    +import org.apache.cassandra.net.async.ByteBufDataOutputPlus;
    +
    +public class MessageInProcessorPre40Test
    +{
    +    private static InetAddressAndPort addr;
    +
    +    private MessageInProcessorPre40 processor;
    +    private ByteBuf buf;
    +
    +    @BeforeClass
    +    public static void before() throws UnknownHostException
    +    {
    +        DatabaseDescriptor.daemonInitialization();
    +        addr = InetAddressAndPort.getByName("127.0.0.1");
    +    }
    +
    +    @Before
    +    public void setup()
    +    {
    +        processor = new MessageInProcessorPre40(addr, MessagingService.VERSION_30, (messageIn, integer) -> {});
    +    }
    +
    +    @After
    +    public void tearDown()
    +    {
    +        if (buf != null && buf.refCnt() > 0)
    +            buf.release();
    +    }
    +
    +    @Test
    +    public void canReadNextParam_HappyPath() throws IOException
    +    {
    +        buildParamBufPre40(13);
    +        Assert.assertTrue(processor.canReadNextParam(buf));
    +    }
    +
    +    @Test
    +    public void canReadNextParam_OnlyFirstByte() throws IOException
    +    {
    +        buildParamBufPre40(13);
    +        buf.writerIndex(1);
    +        Assert.assertFalse(processor.canReadNextParam(buf));
    +    }
    +
    +    @Test
    +    public void canReadNextParam_PartialUTF() throws IOException
    +    {
    +        buildParamBufPre40(13);
    +        buf.writerIndex(5);
    +        Assert.assertFalse(processor.canReadNextParam(buf));
    +    }
    +
    +    @Test
    +    public void canReadNextParam_TruncatedValueLength() throws IOException
    +    {
    +        buildParamBufPre40(13);
    +        buf.writerIndex(buf.writerIndex() - 13 - 2);
    +        Assert.assertFalse(processor.canReadNextParam(buf));
    +    }
    +
    +    @Test
    +    public void canReadNextParam_MissingLastBytes() throws IOException
    +    {
    +        buildParamBufPre40(13);
    +        buf.writerIndex(buf.writerIndex() - 2);
    +        Assert.assertFalse(processor.canReadNextParam(buf));
    +    }
    +
    +    private void buildParamBufPre40(int valueLength) throws IOException
    +    {
    +        buf = Unpooled.buffer(1024, 1024); // 1k should be enough for everybody!
    --- End diff --
    
    🤣
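    The tests above exercise a non-blocking check of whether one complete pre-4.0 header parameter is buffered. Below is a sketch of such a check over `java.nio.ByteBuffer`. It assumes the parameter layout is a `writeUTF` key (unsigned 2-byte length plus bytes), then a 4-byte value length, then the value bytes. That layout is inferred from the test offsets, not confirmed from the patch. `PreFortyParams` and its helper are hypothetical. All reads are absolute, so the buffer position is never advanced.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class PreFortyParams
{
    private PreFortyParams() {}

    /** True only if a full [u16 keyLen][key][int valueLen][value] entry starts at position(). */
    static boolean canReadNextParam(ByteBuffer buf)
    {
        int start = buf.position();
        int remaining = buf.remaining();

        // 1. the unsigned-short key length written by writeUTF
        if (remaining < Short.BYTES)
            return false;
        int keyLength = buf.getShort(start) & 0xFFFF;
        int afterKey = Short.BYTES + keyLength;

        // 2. the key bytes plus the 4-byte value length
        if (remaining < afterKey + Integer.BYTES)
            return false;
        int valueLength = buf.getInt(start + afterKey);
        if (valueLength < 0)
            throw new IllegalStateException("negative value length: " + valueLength);

        // 3. the value bytes; long arithmetic avoids int overflow on huge lengths
        return (long) remaining >= (long) afterKey + Integer.BYTES + valueLength;
    }

    /** Test helper: encodes one entry in the assumed layout (ASCII keys only). */
    static ByteBuffer encode(String key, int valueLength)
    {
        byte[] keyBytes = key.getBytes(StandardCharsets.US_ASCII);
        ByteBuffer buf = ByteBuffer.allocate(Short.BYTES + keyBytes.length + Integer.BYTES + valueLength);
        buf.putShort((short) keyBytes.length).put(keyBytes).putInt(valueLength).put(new byte[valueLength]);
        buf.flip();
        return buf;
    }
}
```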


---


[GitHub] cassandra pull request #253: CASSANDRA-13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216185533
  
    --- Diff: test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java ---
    @@ -70,32 +86,71 @@ public static void before()
         @Before
         public void setup()
         {
    -        OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
    -                                                                             InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
    +        if (type == ChannelWriterType.LARGE_MESSAGE)
    +        {
    +            id = OutboundConnectionIdentifier.large(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
    +                                                    InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
    +        }
    +        else
    +        {
    +            id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
    +                                                    InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
    +        }
    +
             channel = new EmbeddedChannel();
             omc = new NonSendingOutboundMessagingConnection(id, null, Optional.empty());
    -        channelWriter = ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty());
    -        channel.pipeline().addFirst(new MessageOutHandler(id, MessagingService.current_version, channelWriter, () -> null));
    -        coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
    +        wrapper = new MessageOutWrapper();
    +        OutboundConnectionParams.Builder builder = OutboundConnectionParams.builder()
    +                                                                           .messageResultConsumer(this::handleMessageResult)
    +                                                                           .coalescingStrategy(Optional.empty())
    +                                                                           .protocolVersion(MessagingService.current_version)
    +                                                                           .connectionId(id);
    +
    +        if (type == ChannelWriterType.COALESCING)
    +            coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
    +        else
    +            coalescingStrategy = Optional.empty();
    --- End diff --
    
    Wait for CASSANDRA-14503 :)


---


[GitHub] cassandra pull request #253: CASSANDRA-13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216180830
  
    --- Diff: test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java ---
    @@ -70,32 +86,71 @@ public static void before()
         @Before
         public void setup()
         {
    -        OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
    -                                                                             InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
    +        if (type == ChannelWriterType.LARGE_MESSAGE)
    +        {
    +            id = OutboundConnectionIdentifier.large(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
    +                                                    InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
    +        }
    +        else
    +        {
    +            id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
    +                                                    InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
    +        }
    +
             channel = new EmbeddedChannel();
             omc = new NonSendingOutboundMessagingConnection(id, null, Optional.empty());
    -        channelWriter = ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty());
    -        channel.pipeline().addFirst(new MessageOutHandler(id, MessagingService.current_version, channelWriter, () -> null));
    -        coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
    +        wrapper = new MessageOutWrapper();
    +        OutboundConnectionParams.Builder builder = OutboundConnectionParams.builder()
    +                                                                           .messageResultConsumer(this::handleMessageResult)
    +                                                                           .coalescingStrategy(Optional.empty())
    +                                                                           .protocolVersion(MessagingService.current_version)
    +                                                                           .connectionId(id);
    +
    +        if (type == ChannelWriterType.COALESCING)
    +            coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
    +        else
    +            coalescingStrategy = Optional.empty();
    --- End diff --
    
    This is beyond the scope of this PR, however, we should refactor `OutboundConnectionParams` to not store an `Optional`.
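    One common shape for that refactor keeps the field nullable and exposes `Optional` only from the accessor. Neither the builder nor its callers then need to pass `Optional.empty()`. Below is a sketch with a hypothetical, trimmed-down class. It is not the real `OutboundConnectionParams`, and `CoalescingStrategy` here is a stand-in interface.

```java
import java.util.Optional;

final class ConnectionParams
{
    /** Hypothetical stand-in for the real coalescing strategy type. */
    interface CoalescingStrategy
    {
        String name();
    }

    private final CoalescingStrategy coalescingStrategy; // nullable; deliberately not Optional<...>

    private ConnectionParams(Builder builder)
    {
        this.coalescingStrategy = builder.coalescingStrategy;
    }

    /** Optional appears only at the API boundary, as a return type. */
    Optional<CoalescingStrategy> coalescingStrategy()
    {
        return Optional.ofNullable(coalescingStrategy);
    }

    static Builder builder()
    {
        return new Builder();
    }

    static final class Builder
    {
        private CoalescingStrategy coalescingStrategy;

        /** Takes a plain (possibly null) value instead of Optional.empty() / Optional.of(...). */
        Builder coalescingStrategy(CoalescingStrategy strategy)
        {
            this.coalescingStrategy = strategy;
            return this;
        }

        ConnectionParams build()
        {
            return new ConnectionParams(this);
        }
    }
}
```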


---


[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216157096
  
    --- Diff: src/java/org/apache/cassandra/net/async/MessageInHandler.java ---
    @@ -18,143 +18,296 @@
     
     package org.apache.cassandra.net.async;
     
    -import java.io.DataInputStream;
    +import java.io.EOFException;
     import java.io.IOException;
    -import java.util.Collections;
    -import java.util.EnumMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.function.BiConsumer;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.RejectedExecutionHandler;
    +import java.util.concurrent.ThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
     
    -import com.google.common.primitives.Ints;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import io.netty.buffer.ByteBuf;
     import io.netty.channel.ChannelHandlerContext;
    -import org.apache.cassandra.io.util.DataInputBuffer;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +import io.netty.handler.codec.ByteToMessageDecoder;
    +import io.netty.util.ReferenceCountUtil;
    +import org.apache.cassandra.concurrent.NamedThreadFactory;
    +import org.apache.cassandra.exceptions.UnknownTableException;
     import org.apache.cassandra.locator.InetAddressAndPort;
    -import org.apache.cassandra.net.MessageIn;
    -import org.apache.cassandra.net.MessagingService;
    -import org.apache.cassandra.net.ParameterType;
    -import org.apache.cassandra.utils.vint.VIntCoding;
    +import org.apache.cassandra.net.MessageIn.MessageInProcessor;
     
     /**
      * Parses incoming messages as per the 4.0 internode messaging protocol.
      */
    -public class MessageInHandler extends BaseMessageInHandler
    +public class MessageInHandler extends ChannelInboundHandlerAdapter
     {
         public static final Logger logger = LoggerFactory.getLogger(MessageInHandler.class);
     
    -    private MessageHeader messageHeader;
    +    private final InetAddressAndPort peer;
     
    -    MessageInHandler(InetAddressAndPort peer, int messagingVersion)
    +    private final BufferHandler bufferHandler;
    +    private volatile boolean closed;
    +
    +    public MessageInHandler(InetAddressAndPort peer, MessageInProcessor messageProcessor, boolean handlesLargeMessages)
    +    {
    +        this.peer = peer;
    +
    +        bufferHandler = handlesLargeMessages
    +                        ? new BlockingBufferHandler(messageProcessor)
    +                        : new NonblockingBufferHandler(messageProcessor);
    +    }
    +
    +    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException
    +    {
    +        if (!closed)
    +        {
    +            bufferHandler.channelRead(ctx, (ByteBuf) msg);
    +        }
    +        else
    +        {
    +            ReferenceCountUtil.release(msg);
    +            ctx.close();
    +        }
    +    }
    +
    +    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    +    {
    +        if (cause instanceof EOFException)
    +            logger.trace("eof reading from socket; closing", cause);
    +        else if (cause instanceof UnknownTableException)
    +            logger.warn("Got message from unknown table while reading from socket; closing", cause);
    +        else if (cause instanceof IOException)
    +            logger.trace("IOException reading from socket; closing", cause);
    +        else
    +            logger.warn("Unexpected exception caught in inbound channel pipeline from " + ctx.channel().remoteAddress(), cause);
    +
    +        close();
    +        ctx.close();
    +    }
    +
    +    public void channelInactive(ChannelHandlerContext ctx)
    +    {
    +        logger.trace("received channel closed message for peer {} on local addr {}", ctx.channel().remoteAddress(), ctx.channel().localAddress());
    +        close();
    +        ctx.fireChannelInactive();
    +    }
    +
    +    void close()
         {
    -        this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER);
    +        closed = true;
    +        bufferHandler.close();
         }
     
    -    public MessageInHandler(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +    boolean isClosed()
         {
    -        super(peer, messagingVersion, messageConsumer);
    +        return closed;
    +    }
     
    -        assert messagingVersion >= MessagingService.VERSION_40 : String.format("wrong messaging version for this handler: got %d, but expect %d or higher",
    -                                                                              messagingVersion, MessagingService.VERSION_40);
    -        state = State.READ_FIRST_CHUNK;
    +    /**
    +     * An abstraction around how incoming buffers are handled: either in a non-blocking manner ({@link NonblockingBufferHandler})
    +     * or in a blocking manner ({@link BlockingBufferHandler}).
    +     *
    +     * The methods declared here will only be invoked on the netty event loop.
    +     */
    +    interface BufferHandler
    +    {
    +        void channelRead(ChannelHandlerContext ctx, ByteBuf in) throws IOException;
    +
    +        void close();
         }
     
         /**
    -     * For each new message coming in, builds up a {@link MessageHeader} instance incrementally. This method
    -     * attempts to deserialize as much header information as it can out of the incoming {@link ByteBuf}, and
    -     * maintains a trivial state machine to remember progress across invocations.
    +     * Processes incoming buffers on the netty event loop, in a non-blocking manner. If a buffer is not completely consumed,
    +     * it is stashed in {@link #retainedInlineBuffer}, and the next incoming buffer is combined with it.
          */
    -    @SuppressWarnings("resource")
    -    public void handleDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
    +    class NonblockingBufferHandler implements BufferHandler
    --- End diff --
    
    done
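    The `NonblockingBufferHandler` Javadoc in the diff describes stashing an unconsumed buffer and combining it with the next one. Below is a minimal sketch of that cumulation idea using `java.nio.ByteBuffer` instead of netty's `ByteBuf`. It assumes a simplified `[int length][payload]` framing, which is much simpler than the real 4.0 message format. `CumulatingFrameReader` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: keep the unconsumed tail and prepend it to the next incoming buffer. */
class CumulatingFrameReader
{
    private ByteBuffer retained; // leftover bytes from the previous call, or null

    /** Consumes as many complete frames as possible; keeps any partial frame for next time. */
    List<byte[]> onRead(ByteBuffer in)
    {
        ByteBuffer buf = combine(in);
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= Integer.BYTES)
        {
            int length = buf.getInt(buf.position()); // peek at the length without consuming it
            if (length < 0)
                throw new IllegalStateException("negative frame length: " + length);
            if (buf.remaining() - Integer.BYTES < length)
                break; // incomplete frame: wait for more bytes
            buf.position(buf.position() + Integer.BYTES);
            byte[] payload = new byte[length];
            buf.get(payload);
            frames.add(payload);
        }
        // copy the tail so we never hold on to the caller's buffer
        retained = buf.hasRemaining() ? copyRemaining(buf) : null;
        return frames;
    }

    private ByteBuffer combine(ByteBuffer in)
    {
        if (retained == null)
            return in;
        ByteBuffer merged = ByteBuffer.allocate(retained.remaining() + in.remaining());
        merged.put(retained).put(in);
        merged.flip();
        return merged;
    }

    private static ByteBuffer copyRemaining(ByteBuffer buf)
    {
        ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
        copy.put(buf);
        copy.flip();
        return copy;
    }
}
```

    Copying the tail is the simplest correct option. A netty implementation would more likely retain or composite the `ByteBuf` to avoid the copy, at the cost of careful reference counting.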


---


[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216118489
  
    --- Diff: src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java ---
    @@ -196,10 +220,12 @@ protected void doFlush(int count) throws IOException
                 int byteCount = buffer.position();
                 currentBuf.writerIndex(byteCount);
     
    -            if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES))
    +            if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, rateLimiterBlockTime, rateLimiterBlockTimeUnit))
    --- End diff --
    
    Yes, I intentionally set it to 2 minutes.
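    The diff replaces the hard-coded `2, TimeUnit.MINUTES` with `rateLimiterBlockTime, rateLimiterBlockTimeUnit`. Below is a sketch of the same bounded, uninterruptible wait. It assumes `channelRateLimiter` is a `java.util.concurrent.Semaphore` counting bytes in flight. That assumption rests on Guava's `Uninterruptibles.tryAcquireUninterruptibly` having a `Semaphore` overload, and is not confirmed here. `BoundedByteLimiter` is hypothetical. The retry loop tracks a deadline, so repeated interrupts cannot stretch the total wait past the configured block time.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: byte-count limiter whose maximum wait is configurable. */
class BoundedByteLimiter
{
    private final Semaphore permits;
    private final long blockTime;
    private final TimeUnit blockTimeUnit;

    BoundedByteLimiter(int maxBytesInFlight, long blockTime, TimeUnit blockTimeUnit)
    {
        this.permits = new Semaphore(maxBytesInFlight);
        this.blockTime = blockTime;
        this.blockTimeUnit = blockTimeUnit;
    }

    /** Returns false if byteCount permits could not be acquired within the configured window. */
    boolean tryReserve(int byteCount)
    {
        long remainingNanos = blockTimeUnit.toNanos(blockTime);
        long deadline = System.nanoTime() + remainingNanos;
        boolean interrupted = false;
        try
        {
            while (true)
            {
                try
                {
                    return permits.tryAcquire(byteCount, remainingNanos, TimeUnit.NANOSECONDS);
                }
                catch (InterruptedException e)
                {
                    // "uninterruptibly": remember the interrupt and keep waiting for the time left
                    interrupted = true;
                    remainingNanos = deadline - System.nanoTime();
                }
            }
        }
        finally
        {
            if (interrupted)
                Thread.currentThread().interrupt();
        }
    }

    /** Called once the bytes have been flushed to the channel. */
    void release(int byteCount)
    {
        permits.release(byteCount);
    }
}
```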


---


[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216087977
  
    --- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
    @@ -231,4 +241,437 @@ public String toString()
             sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb);
             return sbuf.toString();
         }
    +
    +    public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
    +    {
    +        return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
    +
    +    }
    +
    +    public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +    {
    +        return messagingVersion >= MessagingService.VERSION_40
    +               ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
    +               : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
    +
    +    }
    +
    +    /**
    +     * Implementations contain the mechanics and logic of parsing incoming messages. Allows for both non-blocking
    +     * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)}
    +     * methods, respectively.
    +     *
    +     * Does not contain the actual deserialization code for message fields nor payload. That is left to the
    +     * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
    +     */
    +    public static abstract class MessageInProcessor
    +    {
    +        /**
    +         * The current state of deserializing an incoming message. This enum is only used in the nonblocking versions.
    +         */
    +        public enum State
    +        {
    +            READ_PREFIX,
    +            READ_IP_ADDRESS,
    +            READ_VERB,
    +            READ_PARAMETERS_SIZE,
    +            READ_PARAMETERS_DATA,
    +            READ_PAYLOAD_SIZE,
    +            READ_PAYLOAD
    +        }
    +
    +        static final int VERB_LENGTH = Integer.BYTES;
    +
    +        /**
    +         * The default target for consuming deserialized {@link MessageIn}.
    +         */
    +        private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
    +
    +        final InetAddressAndPort peer;
    +        final int messagingVersion;
    +
    +        /**
    +         * Abstracts out depending directly on {@link MessagingService#receive(MessageIn, int)}; this makes tests more sane
    +         * as they don't require nor trigger the entire message processing circus.
    +         */
    +        final BiConsumer<MessageIn, Integer> messageConsumer;
    +
    +        /**
    +         * Captures the current {@link State} of processing a message. Primarily useful in the non-blocking use case.
    +         */
    +        State state = State.READ_PREFIX;
    +
    +        /**
    +         * Captures the current data we've parsed out of an incoming message. Primarily useful in the non-blocking use case.
    +         */
    +        MessageHeader messageHeader;
    +
    +        /**
    +         * Process the buffer in a non-blocking manner. Will try to read out as much of a message(s) as possible,
    +         * and send any fully deserialized messages to {@link #messageConsumer}.
    +         */
    +        public abstract void process(ByteBuf in) throws IOException;
    +
    +        /**
    +         * Process the buffer in a blocking manner. Will read as many messages as possible, blocking for more data,
    +         * and send any fully deserialized messages to {@link #messageConsumer}.
    +         */
    +        public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException;
    +
    +        MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +        {
    +            this.peer = peer;
    +            this.messagingVersion = messagingVersion;
    +            this.messageConsumer = messageConsumer;
    +        }
    +
    +        /**
    +         * Only applicable in the non-blocking use case, and should only be used for testing!!!
    +         */
    +        @VisibleForTesting
    +        public MessageHeader getMessageHeader()
    +        {
    +            return messageHeader;
    +        }
    +
    +        /**
    +         * A simple struct to hold the message header data as it is being built up.
    +         */
    +        public static class MessageHeader
    +        {
    +            public int messageId;
    +            long constructionTime;
    +            public InetAddressAndPort from;
    +            public MessagingService.Verb verb;
    +            int payloadSize;
    +
    +            Map<ParameterType, Object> parameters = Collections.emptyMap();
    +
    +            /**
    +             * Length of the parameter data. If the message's version is {@link MessagingService#VERSION_40} or higher,
    +             * this value is the total number of header bytes; else, for legacy messaging, this is the number of
    +             * key/value entries in the header.
    +             */
    +            int parameterLength;
    +        }
    +
    +        MessageHeader readPrefix(DataInputPlus in) throws IOException
    +        {
    +            MessagingService.validateMagic(in.readInt());
    +            MessageHeader messageHeader = new MessageHeader();
    +            messageHeader.messageId = in.readInt();
    +            int messageTimestamp = in.readInt(); // make sure to read the sent timestamp, even if DatabaseDescriptor.hasCrossNodeTimeout() is not enabled
    +            messageHeader.constructionTime = MessageIn.deriveConstructionTime(peer, messageTimestamp, ApproximateTime.currentTimeMillis());
    +
    +            return messageHeader;
    +        }
    +    }
    +
    +    /**
    +     * Reads the incoming stream of bytes in the 4.0 format.
    +     */
    +    static class MessageInProcessorAsOf40 extends MessageInProcessor
    +    {
    +        MessageInProcessorAsOf40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +        {
    +            super(peer, messagingVersion, messageConsumer);
    +            assert messagingVersion >= MessagingService.VERSION_40;
    +        }
    +
    +        @SuppressWarnings("resource")
    +        public void process(ByteBuf in) throws IOException
    +        {
    +            ByteBufDataInputPlus inputPlus = new ByteBufDataInputPlus(in);
    +            while (true)
    +            {
    +                switch (state)
    +                {
    +                    case READ_PREFIX:
    +                        if (in.readableBytes() < MessageOut.MESSAGE_PREFIX_SIZE)
    +                            return;
    +                        MessageHeader header = readPrefix(inputPlus);
    +                        if (header == null)
    +                            return;
    +                        header.from = peer;
    +                        messageHeader = header;
    +                        state = State.READ_VERB;
    +                        // fall-through
    +                    case READ_VERB:
    +                        if (in.readableBytes() < VERB_LENGTH)
    +                            return;
    +                        messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
    +                        state = State.READ_PARAMETERS_SIZE;
    +                        // fall-through
    +                    case READ_PARAMETERS_SIZE:
    +                        long length = VIntCoding.readUnsignedVInt(in);
    +                        if (length < 0)
    +                            return;
    +                        messageHeader.parameterLength = Ints.checkedCast(length);
    +                        messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
    +                        state = State.READ_PARAMETERS_DATA;
    +                        // fall-through
    +                    case READ_PARAMETERS_DATA:
    +                        if (messageHeader.parameterLength > 0)
    +                        {
    +                            if (in.readableBytes() < messageHeader.parameterLength)
    +                                return;
    +                            readParameters(inputPlus, messageHeader.parameterLength, messageHeader.parameters);
    +                        }
    +                        state = State.READ_PAYLOAD_SIZE;
    +                        // fall-through
    +                    case READ_PAYLOAD_SIZE:
    +                        length = VIntCoding.readUnsignedVInt(in);
    +                        if (length < 0)
    +                            return;
    +                        messageHeader.payloadSize = (int) length;
    +                        state = State.READ_PAYLOAD;
    +                        // fall-through
    +                    case READ_PAYLOAD:
    +                        if (in.readableBytes() < messageHeader.payloadSize)
    +                            return;
    +
    +                        MessageIn<Object> messageIn = MessageIn.read(inputPlus, messagingVersion,
    +                                                                     messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
    +                                                                     messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
    +
    +                        if (messageIn != null)
    +                            messageConsumer.accept(messageIn, messageHeader.messageId);
    +
    +                        state = State.READ_PREFIX;
    +                        messageHeader = null;
    +                        break;
    +                    default:
    +                        throw new IllegalStateException("unknown/unhandled state: " + state);
    +                }
    +            }
    +        }
    +
    +        private void readParameters(DataInputPlus inputPlus, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
    +        {
    +            TrackedDataInputPlus inputTracker = new TrackedDataInputPlus(inputPlus);
    +
    +            while (inputTracker.getBytesRead() < parameterLength)
    +            {
    +                String key = DataInputStream.readUTF(inputTracker);
    +                ParameterType parameterType = ParameterType.byName.get(key);
    +                long valueLength = VIntCoding.readUnsignedVInt(inputTracker);
    +                parameters.put(parameterType, parameterType.serializer.deserialize(inputTracker, messagingVersion));
    +            }
    +        }
    +
    +        public void process(RebufferingByteBufDataInputPlus in) throws IOException
    +        {
    +            while (in.isOpen() && !in.isEmpty())
    +            {
    +                messageHeader = readPrefix(in);
    +                messageHeader.from = peer;
    +                messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
    +                messageHeader.parameterLength = Ints.checkedCast(VIntCoding.readUnsignedVInt(in));
    +                messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
    +                if (messageHeader.parameterLength > 0)
    +                    readParameters(in, messageHeader.parameterLength, messageHeader.parameters);
    +
    +                messageHeader.payloadSize = Ints.checkedCast(VIntCoding.readUnsignedVInt(in));
    +                MessageIn<Object> messageIn = MessageIn.read(in, messagingVersion,
    +                                                             messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
    +                                                             messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
    +                if (messageIn != null)
    +                    messageConsumer.accept(messageIn, messageHeader.messageId);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Reads the incoming stream of bytes in the pre-4.0 format.
    +     */
    +    static class MessageInProcessorPre40 extends MessageInProcessor
    +    {
    +        private static final int PARAMETERS_SIZE_LENGTH = Integer.BYTES;
    +        private static final int PARAMETERS_VALUE_SIZE_LENGTH = Integer.BYTES;
    +        private static final int PAYLOAD_SIZE_LENGTH = Integer.BYTES;
    +
    +        MessageInProcessorPre40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +        {
    +            super(peer, messagingVersion, messageConsumer);
    +            assert messagingVersion < MessagingService.VERSION_40;
    +        }
    +
    +        public void process(ByteBuf in) throws IOException
    +        {
    +            ByteBufDataInputPlus inputPlus = new ByteBufDataInputPlus(in);
    +            while (true)
    +            {
    +                switch (state)
    +                {
    +                    case READ_PREFIX:
    +                        if (in.readableBytes() < MessageOut.MESSAGE_PREFIX_SIZE)
    +                            return;
    +                        MessageHeader header = readPrefix(inputPlus);
    +                        if (header == null)
    +                            return;
    +                        messageHeader = header;
    +                        state = State.READ_IP_ADDRESS;
    +                        // fall-through
    +                    case READ_IP_ADDRESS:
    +                        // unfortunately, this assumes knowledge of how CompactEndpointSerializationHelper serializes data (the first byte is the size).
    +                        // first, check that we can actually read the size byte, then check if we can read that number of bytes.
    +                        // the "+ 1" accounts for the size byte itself, in addition to the bytes of the serialized IP address.
    +                        int readableBytes = in.readableBytes();
    +                        if (readableBytes < 1 || readableBytes < in.getByte(in.readerIndex()) + 1)
    +                            return;
    +                        messageHeader.from = CompactEndpointSerializationHelper.instance.deserialize(inputPlus, messagingVersion);
    +                        state = State.READ_VERB;
    +                        // fall-through
    +                    case READ_VERB:
    +                        if (in.readableBytes() < VERB_LENGTH)
    +                            return;
    +                        messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
    +                        state = State.READ_PARAMETERS_SIZE;
    +                        // fall-through
    +                    case READ_PARAMETERS_SIZE:
    +                        if (in.readableBytes() < PARAMETERS_SIZE_LENGTH)
    +                            return;
    +                        messageHeader.parameterLength = in.readInt();
    +                        messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
    +                        state = State.READ_PARAMETERS_DATA;
    +                        // fall-through
    +                    case READ_PARAMETERS_DATA:
    +                        if (messageHeader.parameterLength > 0)
    +                        {
    +                            if (!readParameters(in, inputPlus, messageHeader.parameterLength, messageHeader.parameters))
    +                                return;
    +                        }
    +                        state = State.READ_PAYLOAD_SIZE;
    +                        // fall-through
    +                    case READ_PAYLOAD_SIZE:
    +                        if (in.readableBytes() < PAYLOAD_SIZE_LENGTH)
    +                            return;
    +                        messageHeader.payloadSize = in.readInt();
    +                        state = State.READ_PAYLOAD;
    +                        // fall-through
    +                    case READ_PAYLOAD:
    +                        if (in.readableBytes() < messageHeader.payloadSize)
    +                            return;
    +
    +                        MessageIn<Object> messageIn = MessageIn.read(inputPlus, messagingVersion,
    +                                                                     messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
    +                                                                     messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
    +
    +                        if (messageIn != null)
    +                            messageConsumer.accept(messageIn, messageHeader.messageId);
    +
    +                        state = State.READ_PREFIX;
    +                        messageHeader = null;
    +                        break;
    +                    default:
    +                        throw new IllegalStateException("unknown/unhandled state: " + state);
    +                }
    +            }
    +        }
    +
    +        /**
    +         * @return <code>true</code> if all the parameters have been read from the {@link ByteBuf}; else, <code>false</code>.
    +         */
    +        private boolean readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, int parameterCount, Map<ParameterType, Object> parameters) throws IOException
    +        {
    +            // makes the assumption that map.size() is a constant-time function (EnumMap.size() is)
    +            while (parameters.size() < parameterCount)
    +            {
    +                if (!canReadNextParam(in))
    +                    return false;
    +
    +                String key = DataInputStream.readUTF(inputPlus);
    +                ParameterType parameterType = ParameterType.byName.get(key);
    +                byte[] value = new byte[in.readInt()];
    +                in.readBytes(value);
    +                try (DataInputBuffer buffer = new DataInputBuffer(value))
    +                {
    +                    parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion));
    +                }
    +            }
    +
    +            return true;
    +        }
    +
    +        /**
    +         * Determine if we can read the next parameter from the {@link ByteBuf}. This method will *always* set the {@code in}
    +         * reader index back to where it was when this method was invoked.
    +         * <p>
    +         * NOTE: this function would be sooo much simpler if we included a parameters length int in the messaging format,
    +         * instead of checking the remaining readable bytes for each field as we're parsing it. c'est la vie ...
    +         */
    +        @VisibleForTesting
    +        boolean canReadNextParam(ByteBuf in)
    +        {
    +            in.markReaderIndex();
    +            // capture the readableBytes value here to avoid all the virtual function calls.
    +            // subtract 6 as we know we'll be reading a short and an int (for the utf and value lengths).
    +            final int minimumBytesRequired = 6;
    +            int readableBytes = in.readableBytes() - minimumBytesRequired;
    +            if (readableBytes < 0)
    +                return false;
    +
    +            // this is a tad invasive, but since we know the UTF string is prefaced with a 2-byte length,
    +            // read that to make sure we have enough bytes to read the string itself.
    +            short strLen = in.readShort();
    +            // check if we can read that many bytes for the UTF
    +            if (strLen > readableBytes)
    +            {
    +                in.resetReaderIndex();
    +                return false;
    +            }
    +            in.skipBytes(strLen);
    +            readableBytes -= strLen;
    +
    +            // check if we can read the value length
    +            if (readableBytes < PARAMETERS_VALUE_SIZE_LENGTH)
    +            {
    +                in.resetReaderIndex();
    +                return false;
    +            }
    +            int valueLength = in.readInt();
    +            // check if we can read that many bytes for the value
    +            if (valueLength > readableBytes)
    +            {
    +                in.resetReaderIndex();
    +                return false;
    +            }
    +
    +            in.resetReaderIndex();
    +            return true;
    +        }
    +
    +        public void process(RebufferingByteBufDataInputPlus in) throws IOException
    +        {
    +            while (in.isOpen() && !in.isEmpty())
    +            {
    +                messageHeader = readPrefix(in);
    +                messageHeader.from = CompactEndpointSerializationHelper.instance.deserialize(in, messagingVersion);
    +                messageHeader.verb = MessagingService.Verb.fromId(in.readInt());
    +                messageHeader.parameterLength = in.readInt();
    +                messageHeader.parameters = messageHeader.parameterLength == 0 ? Collections.emptyMap() : new EnumMap<>(ParameterType.class);
    +                if (messageHeader.parameterLength > 0)
    +                    readParameters(in, messageHeader.parameterLength, messageHeader.parameters);
    +
    +                messageHeader.payloadSize = in.readInt();
    +                MessageIn<Object> messageIn = MessageIn.read(in, messagingVersion,
    +                                                             messageHeader.messageId, messageHeader.constructionTime, messageHeader.from,
    +                                                             messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters);
    +                if (messageIn != null)
    +                    messageConsumer.accept(messageIn, messageHeader.messageId);
    +            }
    +        }
    +
    +        private void readParameters(RebufferingByteBufDataInputPlus in, int parameterCount, Map<ParameterType, Object> parameters) throws IOException
    +        {
    +            // makes the assumption that map.size() is a constant-time function (EnumMap.size() is)
    +            while (parameters.size() < parameterCount)
    +            {
    +                String key = DataInputStream.readUTF(in);
    +                ParameterType parameterType = ParameterType.byName.get(key);
    +                int valueLength = in.readInt();
    --- End diff --
    
    Unused variable.
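The non-blocking `process(ByteBuf)` methods in the diff above use a resumable state machine: each state checks that its field is fully readable, returns early if it is not, and otherwise consumes the field and falls through to the next state, so a message split across network reads picks up where it left off on the next call. A minimal sketch of that pattern, assuming `java.nio.ByteBuffer` in place of Netty's `ByteBuf` and a hypothetical frame layout (magic, id, payload size, payload) rather than the real internode format; `ResumableFrameParser` is an illustrative name, not Cassandra code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical frame layout, for illustration only (not Cassandra's wire format):
 * [int magic][int id][int payloadSize][payloadSize bytes of UTF-8]
 */
final class ResumableFrameParser
{
    static final int MAGIC = 0x1234ABCD; // arbitrary value chosen for this sketch

    enum State { READ_PREFIX, READ_PAYLOAD_SIZE, READ_PAYLOAD }

    // Parser state survives between calls, so a frame split across buffers resumes where it stopped.
    private State state = State.READ_PREFIX;
    private int id;
    private int payloadSize;

    final List<String> received = new ArrayList<>();

    /** Consumes as many complete frames as possible; bytes of an incomplete field stay in {@code in}. */
    void process(ByteBuffer in)
    {
        while (true)
        {
            switch (state)
            {
                case READ_PREFIX:
                    if (in.remaining() < 2 * Integer.BYTES)
                        return; // not enough bytes yet; wait for the next buffer
                    int magic = in.getInt();
                    if (magic != MAGIC)
                        throw new IllegalStateException("bad magic: 0x" + Integer.toHexString(magic));
                    id = in.getInt();
                    state = State.READ_PAYLOAD_SIZE;
                    // fall-through
                case READ_PAYLOAD_SIZE:
                    if (in.remaining() < Integer.BYTES)
                        return;
                    payloadSize = in.getInt();
                    state = State.READ_PAYLOAD;
                    // fall-through
                case READ_PAYLOAD:
                    if (in.remaining() < payloadSize)
                        return;
                    byte[] payload = new byte[payloadSize];
                    in.get(payload);
                    received.add(id + ":" + new String(payload, StandardCharsets.UTF_8));
                    state = State.READ_PREFIX;
                    break; // exits the switch; the while loop then tries the next frame
                default:
                    throw new IllegalStateException("unknown state: " + state);
            }
        }
    }
}
```

In use, the caller appends newly arrived bytes, flips the buffer, calls `process`, then calls `compact()` so that any unread bytes of a partial field are kept for the next call.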


---



[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216098891
  
    --- Diff: src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java ---
    @@ -196,10 +220,12 @@ protected void doFlush(int count) throws IOException
                 int byteCount = buffer.position();
                 currentBuf.writerIndex(byteCount);
     
    -            if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES))
    +            if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, rateLimiterBlockTime, rateLimiterBlockTimeUnit))
    --- End diff --
    
    This is going up from 2 minutes to potentially 5 minutes. I think this should be OK, but just making sure.
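The concern here is how long `doFlush` may block waiting for rate-limiter permits. A minimal sketch of a flush-side acquire with a configurable bound, assuming `channelRateLimiter` is a `java.util.concurrent.Semaphore` of byte permits (the diff doesn't show its type). Unlike Guava's `Uninterruptibles.tryAcquireUninterruptibly`, this sketch gives up on interrupt rather than retrying; `RateLimitedFlusher` and its methods are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of a bounded, configurable wait on a byte-permit rate limiter. */
final class RateLimitedFlusher
{
    private final Semaphore channelRateLimiter;
    private final long blockTime;
    private final TimeUnit blockTimeUnit;

    RateLimitedFlusher(int permits, long blockTime, TimeUnit blockTimeUnit)
    {
        this.channelRateLimiter = new Semaphore(permits);
        this.blockTime = blockTime;
        this.blockTimeUnit = blockTimeUnit;
    }

    /** Blocks up to the configured time for byteCount permits, then fails instead of hanging. */
    void acquireForFlush(int byteCount) throws IOException
    {
        boolean acquired;
        try
        {
            acquired = channelRateLimiter.tryAcquire(byteCount, blockTime, blockTimeUnit);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            throw new IOException("interrupted while waiting for rate limiter", e);
        }
        if (!acquired)
            throw new IOException(String.format("timed out after %d %s waiting for %d permits",
                                                blockTime, blockTimeUnit, byteCount));
    }

    /** Returns permits once the flushed bytes have been written to the channel. */
    void release(int byteCount)
    {
        channelRateLimiter.release(byteCount);
    }
}
```

Whatever bound is chosen, a timeout surfaces as an `IOException` the caller can act on, rather than as an indefinitely parked thread.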


---



[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216086521
  
    --- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
    @@ -36,6 +37,11 @@
     
     public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
     {
    +    /**
    +     * Default to a very large value.
    +     */
    +    private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2);
    --- End diff --
    
    I think it doesn't make sense to wait for more than 5 minutes to rebuffer. Given that buffer sizes are on the order of a few megabytes at most, 5 minutes is an eternity. It's best to keep it short so that failures are exposed sooner rather than later, instead of threads being stuck waiting on a call that doesn't time out in a reasonable amount of time.
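The suggestion amounts to polling the queue of incoming buffers with a short, configurable timeout and turning a timeout into an error. A minimal sketch, assuming a `BlockingQueue` fed by the network thread; `TimedRebufferer`, `append`, and `nextBuffer` are hypothetical names, not the `RebufferingByteBufDataInputPlus` API.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical consumer that waits a bounded time for the next buffer of incoming data. */
final class TimedRebufferer
{
    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<>();
    private final long rebufferTimeoutMillis;

    TimedRebufferer(long rebufferTimeoutMillis)
    {
        this.rebufferTimeoutMillis = rebufferTimeoutMillis;
    }

    /** Called by the producer (for example, a network event loop) when new bytes arrive. */
    void append(ByteBuffer buf)
    {
        queue.add(buf);
    }

    /** Waits at most rebufferTimeoutMillis for the next buffer, then fails loudly. */
    ByteBuffer nextBuffer() throws IOException
    {
        try
        {
            ByteBuffer next = queue.poll(rebufferTimeoutMillis, TimeUnit.MILLISECONDS);
            if (next == null)
                throw new IOException("timed out after " + rebufferTimeoutMillis + " ms waiting for more data");
            return next;
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while waiting for more data", e);
        }
    }
}
```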


---



[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r215944830
  
    --- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
    @@ -183,6 +195,11 @@ public int available() throws EOFException
             return availableBytes;
         }
     
    +    public boolean isEmpty() throws EOFException
    +    {
    +        return available() == 0;
    --- End diff --
    
    Nice find. I added that side effect to 
    [StreamingInboundHandler to help deserialization](https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java#L167). Clearly, I could move that into a separate method, and have `StreamingInboundHandler` invoke that after the `available()` call. wdyt?
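The proposal is to keep `available()` (and therefore `isEmpty()`) free of side effects and move the side effect into a separately invoked method. A sketch under stated assumptions: the side effect is modeled as resuming reads when buffered bytes drop below a low-water mark, which is a hypothetical stand-in since the comment doesn't describe it, and `BufferedInput` and `maybeResumeReads` are illustrative names.

```java
/**
 * Hypothetical buffered input, for illustration only. The side effect (resuming reads when buffered
 * bytes fall below a low-water mark) is an assumed stand-in for whatever available() currently does.
 */
final class BufferedInput
{
    private final int lowWaterMark;
    private int bufferedBytes;
    private boolean autoRead;

    BufferedInput(int lowWaterMark, int initialBytes)
    {
        this.lowWaterMark = lowWaterMark;
        this.bufferedBytes = initialBytes;
    }

    /** Pure query: safe to call from isEmpty(), asserts, or logging without changing state. */
    int available()
    {
        return bufferedBytes;
    }

    boolean isEmpty()
    {
        return available() == 0;
    }

    /** The side effect, split out of available(); a caller such as a streaming handler invokes it explicitly. */
    void maybeResumeReads()
    {
        if (!autoRead && available() < lowWaterMark)
            autoRead = true;
    }

    boolean isAutoRead()
    {
        return autoRead;
    }
}
```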


---



[GitHub] cassandra pull request #253: 13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216157097
  
    --- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
    @@ -180,6 +199,73 @@ public String toString()
             return sbuf.toString();
         }
     
    +    /**
    +     * The main entry point for sending an internode message to a peer node in the cluster.
    +     */
    +    public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
    +    {
    +        captureTracingInfo(destinationId);
    +
    +        out.writeInt(MessagingService.PROTOCOL_MAGIC);
    +        out.writeInt(id);
    +
    +        // int cast cuts off the high-order half of the timestamp, which we can assume remains
    +        // the same between now and when the recipient reconstructs it.
    +        out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
    +        serialize(out, messagingVersion);
    +    }
    +
    +    /**
    +     * Record any tracing data, if enabled on this message.
    +     */
    +    @VisibleForTesting
    +    void captureTracingInfo(OutboundConnectionIdentifier destinationId)
    +    {
    +        try
    +        {
    +            UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION);
    +            if (sessionId != null)
    +            {
    +                TraceState state = Tracing.instance.get(sessionId);
    +                String logMessage = String.format("Sending %s message to %s", verb, destinationId.connectionAddress());
    +                // session may have already finished; see CASSANDRA-5668
    +                if (state == null)
    +                {
    +                    Tracing.TraceType traceType = (Tracing.TraceType)getParameter(ParameterType.TRACE_TYPE);
    +                    traceType = traceType == null ? Tracing.TraceType.QUERY : traceType;
    +                    Tracing.instance.trace(ByteBuffer.wrap(UUIDGen.decompose(sessionId)), logMessage, traceType.getTTL());
    +                }
    +                else
    +                {
    +                    state.trace(logMessage);
    +                    if (verb == MessagingService.Verb.REQUEST_RESPONSE)
    +                        Tracing.instance.doneWithNonLocalSession(state);
    +                }
    +            }
    +        }
    +        catch (Exception e)
    +        {
    +            logger.warn("failed to capture the tracing info for an outbound message to {}, ignoring", destinationId, e);
    +        }
    +    }
    +
    +    private Object getParameter(ParameterType type)
    +    {
    +        for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE)
    +        {
    +            if (((ParameterType)parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)).equals(type))
    --- End diff --
    
    done
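The comment in `serialize` relies on the receiver filling in the high-order 32 bits from its own clock. A minimal sketch of both halves, assuming epoch-millisecond timestamps and that no 2^32 ms boundary falls between the sender's timestamp and the receiver's clock (such boundaries occur roughly every 49.7 days); `TruncatedTimestamp` is hypothetical and is not `MessageIn.deriveConstructionTime`.

```java
/** Hypothetical helper showing how a timestamp truncated to 32 bits can be rebuilt by the receiver. */
final class TruncatedTimestamp
{
    /** Sender side: keep only the low-order 32 bits of the epoch-millis timestamp. */
    static int truncate(long epochMillis)
    {
        return (int) epochMillis;
    }

    /** Receiver side: splice the sender's low 32 bits onto the receiver's high 32 bits. */
    static long reconstruct(int sentLow32, long receiverNowMillis)
    {
        long high = receiverNowMillis & 0xFFFF_FFFF_0000_0000L;
        long low = sentLow32 & 0xFFFF_FFFFL; // mask to undo sign extension of the int
        return high | low;
    }
}
```

The mask on the receiving side matters: the `int` cast can yield a negative value, and without `& 0xFFFF_FFFFL` sign extension would set every high-order bit.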


---



[GitHub] cassandra issue #253: CASSANDRA-13630

Posted by jasobrown <gi...@git.apache.org>.
Github user jasobrown commented on the issue:

    https://github.com/apache/cassandra/pull/253
  
    Wait for CASSANDRA-14503 :)
    
    On Sun, Sep 9, 2018 at 18:37 Dinesh <no...@github.com> wrote:
    
    > *@dineshjoshi* commented on this pull request.
    > ------------------------------
    >
    > In test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java
    > <https://github.com/apache/cassandra/pull/253#discussion_r216180830>:
    >
    > >          channel = new EmbeddedChannel();
    >          omc = new NonSendingOutboundMessagingConnection(id, null, Optional.empty());
    > -        channelWriter = ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty());
    > -        channel.pipeline().addFirst(new MessageOutHandler(id, MessagingService.current_version, channelWriter, () -> null));
    > -        coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
    > +        wrapper = new MessageOutWrapper();
    > +        OutboundConnectionParams.Builder builder = OutboundConnectionParams.builder()
    > +                                                                           .messageResultConsumer(this::handleMessageResult)
    > +                                                                           .coalescingStrategy(Optional.empty())
    > +                                                                           .protocolVersion(MessagingService.current_version)
    > +                                                                           .connectionId(id);
    > +
    > +        if (type == ChannelWriterType.COALESCING)
    > +            coalescingStrategy = CoalescingStrategies.newCoalescingStrategy(CoalescingStrategies.Strategy.FIXED.name(), COALESCE_WINDOW_MS, null, "test");
    > +        else
    > +            coalescingStrategy = Optional.empty();
    >
    > This is beyond the scope of this PR, however, we should refactor
    > OutboundConnectionParams to not store an Optional.
    >
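One way to act on that suggestion: store a plain nullable field in the builder and params object, and expose `Optional` only from the accessor. A minimal sketch; `Params`, `Strategy`, and the method names are hypothetical stand-ins for `OutboundConnectionParams` and its builder, not their actual API.

```java
import java.util.Optional;

/** Hypothetical params object that stores a nullable field and exposes Optional only at the accessor. */
final class Params
{
    /** Stand-in for a coalescing strategy type. */
    interface Strategy { String name(); }

    private final Strategy coalescingStrategy; // null means "no coalescing"
    private final int protocolVersion;

    private Params(Builder b)
    {
        this.coalescingStrategy = b.coalescingStrategy;
        this.protocolVersion = b.protocolVersion;
    }

    Optional<Strategy> coalescingStrategy()
    {
        return Optional.ofNullable(coalescingStrategy);
    }

    int protocolVersion()
    {
        return protocolVersion;
    }

    static Builder builder()
    {
        return new Builder();
    }

    static final class Builder
    {
        private Strategy coalescingStrategy;
        private int protocolVersion;

        /** Takes a plain value; callers with no strategy simply don't call this. */
        Builder coalescingStrategy(Strategy s)
        {
            this.coalescingStrategy = s;
            return this;
        }

        Builder protocolVersion(int v)
        {
            this.protocolVersion = v;
            return this;
        }

        Params build()
        {
            return new Params(this);
        }
    }
}
```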



---



[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r215739220
  
    --- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
    @@ -231,4 +241,437 @@ public String toString()
             sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb);
             return sbuf.toString();
         }
    +
    +    public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
    +    {
    +        return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
    +
    +    }
    +
    +    public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +    {
    +        return messagingVersion >= MessagingService.VERSION_40
    +               ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
    +               : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
    +
    +    }
    +
    +    /**
    +     * Implementations contain the mechanics and logic of parsing incoming messages. Allows for both non-blocking
    +     * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)}
    +     * methods, respectively.
    +     *
    +     * Does not contain the actual deserialization code for message fields or payload. That is left to the
    +     * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
    +     */
    +    public static abstract class MessageInProcessor
    +    {
    +        /**
    +         * The current state of deserializing an incoming message. This enum is only used in the non-blocking versions.
    +         */
    +        public enum State
    +        {
    +            READ_PREFIX,
    +            READ_IP_ADDRESS,
    +            READ_VERB,
    +            READ_PARAMETERS_SIZE,
    +            READ_PARAMETERS_DATA,
    +            READ_PAYLOAD_SIZE,
    +            READ_PAYLOAD
    +        }
    +
    +        static final int VERB_LENGTH = Integer.BYTES;
    +
    +        /**
    +         * The default target for consuming deserialized {@link MessageIn}.
    +         */
    +        private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
    +
    +        final InetAddressAndPort peer;
    +        final int messagingVersion;
    +
    +        /**
    +         * Abstracts away the direct dependency on {@link MessagingService#receive(MessageIn, int)}; this makes tests saner,
    +         * as they neither require nor trigger the entire message processing circus.
    +         */
    +        final BiConsumer<MessageIn, Integer> messageConsumer;
    +
    +        /**
    +         * Captures the current {@link State} of processing a message. Primarily useful in the non-blocking use case.
    +         */
    +        State state = State.READ_PREFIX;
    +
    +        /**
    +         * Captures the current data we've parsed out of an incoming message. Primarily useful in the non-blocking use case.
    +         */
    +        MessageHeader messageHeader;
    +
    +        /**
    +         * Process the buffer in a non-blocking manner. Will try to read out as much of the message(s) as possible,
    +         * and send any fully deserialized messages to {@link #messageConsumer}.
    +         */
    +        public abstract void process(ByteBuf in) throws IOException;
    +
    +        /**
    +         * Process the buffer in a blocking manner. Will read as many messages as possible, blocking for more data,
    +         * and send any fully deserialized messages to {@link #messageConsumer}.
    +         */
    +        public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException;
    +
    +        MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +        {
    +            this.peer = peer;
    +            this.messagingVersion = messagingVersion;
    +            this.messageConsumer = messageConsumer;
    +        }
    +
    +        /**
    +         * Only applicable in the non-blocking use case, and should only be used for testing!!!
    +         */
    +        @VisibleForTesting
    +        public MessageHeader getMessageHeader()
    +        {
    +            return messageHeader;
    +        }
    +
    +        /**
    +         * A simple struct to hold the message header data as it is being built up.
    +         */
    +        public static class MessageHeader
    +        {
    +            public int messageId;
    +            long constructionTime;
    +            public InetAddressAndPort from;
    +            public MessagingService.Verb verb;
    +            int payloadSize;
    +
    +            Map<ParameterType, Object> parameters = Collections.emptyMap();
    +
    +            /**
    +             * Length of the parameter data. If the message's version is {@link MessagingService#VERSION_40} or higher,
    +             * this value is the total number of header bytes; else, for legacy messaging, this is the number of
    +             * key/value entries in the header.
    +             */
    +            int parameterLength;
    +        }
    +
    +        MessageHeader readPrefix(DataInputPlus in) throws IOException
    +        {
    +            MessagingService.validateMagic(in.readInt());
    +            MessageHeader messageHeader = new MessageHeader();
    +            messageHeader.messageId = in.readInt();
    +            int messageTimestamp = in.readInt(); // make sure to read the sent timestamp, even if DatabaseDescriptor.hasCrossNodeTimeout() is not enabled
    +            messageHeader.constructionTime = MessageIn.deriveConstructionTime(peer, messageTimestamp, ApproximateTime.currentTimeMillis());
    +
    +            return messageHeader;
    +        }
    +    }
    +
    +    /**
    +     * Reads the incoming stream of bytes in the 4.0 format.
    +     */
    +    static class MessageInProcessorAsOf40 extends MessageInProcessor
    +    {
    +        MessageInProcessorAsOf40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
    +        {
    +            super(peer, messagingVersion, messageConsumer);
    +            assert messagingVersion >= MessagingService.VERSION_40;
    +        }
    +
    +        @SuppressWarnings("resource")
    +        public void process(ByteBuf in) throws IOException
    --- End diff --
    
    This method is a bit hard to follow. Could you refactor it? Netty's `RedisDecoder` is a nice example of this pattern: https://github.com/netty/netty/blob/4.1/codec-redis/src/main/java/io/netty/handler/codec/redis/RedisDecoder.java
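One possible shape for the refactor the comment asks for, loosely following the pattern in Netty's `RedisDecoder`: one small method per `State`, each of which either consumes a whole field and advances the state, or consumes nothing and returns `false` so the loop stops and waits for more bytes. This is a hypothetical sketch, not code from the patch: it uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf` so it runs without Netty, collapses the frame to a prefix, a payload size and a payload, and uses a placeholder magic number. The class and method names are illustrative.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical RedisDecoder-style state machine. The frame layout is simplified to
 * [magic:int][messageId:int][payloadSize:int][payload bytes]; the real 4.0 format also carries the
 * sender address, verb and parameters, each of which would become one more state and one more method.
 */
public class StagedDecoderSketch
{
    enum State { READ_PREFIX, READ_PAYLOAD_SIZE, READ_PAYLOAD }

    /** A fully decoded (simplified) message. */
    static final class Decoded
    {
        final int messageId;
        final byte[] payload;

        Decoded(int messageId, byte[] payload)
        {
            this.messageId = messageId;
            this.payload = payload;
        }
    }

    static final int MAGIC = 0xCAFE0001; // placeholder magic value for this sketch only
    private static final int PREFIX_LENGTH = 2 * Integer.BYTES; // magic + messageId

    private State state = State.READ_PREFIX;
    private int messageId;
    private int payloadSize;
    final List<Decoded> decoded = new ArrayList<>();

    /**
     * Decodes as many complete messages as the buffer holds. Bytes belonging to an incomplete field
     * are left unread, so the caller must keep them and present them again with the next chunk.
     */
    public void process(ByteBuffer in)
    {
        while (true)
        {
            boolean progressed;
            switch (state)
            {
                case READ_PREFIX:       progressed = readPrefix(in); break;
                case READ_PAYLOAD_SIZE: progressed = readPayloadSize(in); break;
                case READ_PAYLOAD:      progressed = readPayload(in); break;
                default: throw new IllegalStateException("unexpected state " + state);
            }
            if (!progressed)
                return; // not enough bytes for the current state; wait for more data
        }
    }

    // Each step either consumes its whole field and advances the state, or consumes nothing and returns false.
    private boolean readPrefix(ByteBuffer in)
    {
        if (in.remaining() < PREFIX_LENGTH)
            return false;
        int magic = in.getInt();
        if (magic != MAGIC)
            throw new IllegalStateException("invalid magic: 0x" + Integer.toHexString(magic));
        messageId = in.getInt();
        state = State.READ_PAYLOAD_SIZE;
        return true;
    }

    private boolean readPayloadSize(ByteBuffer in)
    {
        if (in.remaining() < Integer.BYTES)
            return false;
        payloadSize = in.getInt();
        if (payloadSize < 0)
            throw new IllegalStateException("negative payload size: " + payloadSize);
        state = State.READ_PAYLOAD;
        return true;
    }

    private boolean readPayload(ByteBuffer in)
    {
        if (in.remaining() < payloadSize)
            return false;
        byte[] payload = new byte[payloadSize];
        in.get(payload);
        decoded.add(new Decoded(messageId, payload));
        state = State.READ_PREFIX; // ready for the next message
        return true;
    }
}
```

Because the state and partially parsed fields live on the instance, a message split across two reads resumes exactly where it stopped. With Netty's `ByteToMessageDecoder`, the unread bytes would be retained in its cumulation buffer between calls; in this sketch the caller is responsible for that.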




[GitHub] cassandra pull request #253: 13630

Posted by dineshjoshi <gi...@git.apache.org>.
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r215733539
  
    --- Diff: src/java/org/apache/cassandra/net/async/MessageOutHandler.java ---
    @@ -140,8 +125,17 @@ public void write(ChannelHandlerContext ctx, Object o, ChannelPromise promise)
     
                 out = ctx.alloc().ioBuffer((int)currentFrameSize);
     
    -            captureTracingInfo(msg);
    -            serializeMessage(msg, out);
    +            @SuppressWarnings("resource")
    +            ByteBufDataOutputPlus outputPlus = new ByteBufDataOutputPlus(out);
    +            msg.message.serialize(outputPlus, targetMessagingVersion, connectionId, msg.id, msg.timestampNanos);
    +
    +            // next few lines are for debugging ... massively helpful!!
    +            // if we allocated too much buffer for this message, we'll log here.
    +            // if we allocated too little buffer space, we would have hit an exception when trying to write more bytes to it
    +            if (out.isWritable())
    +                errorLogger.error("{} reported message size {}, actual message size {}, msg {}",
    --- End diff --
    
    How likely are we to cause log spam?
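For context on the question: the `out.isWritable()` check fires for every message whose reported size exceeds what it actually serialized, so a serializer that consistently over-reports could log at the full message rate. Cassandra already has a `NoSpamLogger` utility in `org.apache.cassandra.utils` for this kind of throttling; whether `errorLogger` in this hunk is already one is not visible from the diff. Below is a minimal, self-contained sketch of the same idea (the class name is hypothetical): allow one log line per interval and report how many were suppressed in between. The clock is injectable so the behaviour can be tested deterministically.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical throttle: permits one log line per interval and counts the calls it suppressed. */
public class ThrottledLogSketch
{
    private final long intervalNanos;
    private final LongSupplier nanoClock;
    private final AtomicLong nextAllowedNanos;
    private final AtomicLong suppressed = new AtomicLong();

    ThrottledLogSketch(long interval, TimeUnit unit, LongSupplier nanoClock)
    {
        this.intervalNanos = unit.toNanos(interval);
        this.nanoClock = nanoClock;
        this.nextAllowedNanos = new AtomicLong(nanoClock.getAsLong()); // first call is allowed immediately
    }

    /** Returns the line to log, or null if this call falls inside the current throttle window. */
    String maybeLog(String message)
    {
        long now = nanoClock.getAsLong();
        long next = nextAllowedNanos.get();
        // compare via subtraction so the check survives System.nanoTime() wrap-around;
        // the CAS ensures only one concurrent caller wins each window
        if (now - next >= 0 && nextAllowedNanos.compareAndSet(next, now + intervalNanos))
        {
            long dropped = suppressed.getAndSet(0);
            return dropped == 0 ? message : message + " (" + dropped + " similar messages suppressed)";
        }
        suppressed.incrementAndGet();
        return null;
    }
}
```

In production the clock would typically be `System::nanoTime`, e.g. `new ThrottledLogSketch(1, TimeUnit.SECONDS, System::nanoTime)`, and a non-null result would be handed to the real logger.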

