You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/06/19 15:28:07 UTC

[cassandra] branch trunk updated: Differentiate follower/initator in StreamMessageHeader

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ebba613  Differentiate follower/initator in StreamMessageHeader
ebba613 is described below

commit ebba613b0b34ea338eed508a3ba6cbb235986fd9
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Wed Apr 1 15:12:38 2020 +0800

    Differentiate follower/initator in StreamMessageHeader
    
    patch by ZhaoYang; reviewed by Benjamin Lerer for CASSANDRA-15665
---
 CHANGES.txt                                        |  1 +
 .../db/streaming/CassandraIncomingFile.java        |  6 +++
 .../apache/cassandra/streaming/IncomingStream.java |  5 ++
 .../apache/cassandra/streaming/StreamManager.java  | 13 ++---
 .../cassandra/streaming/StreamResultFuture.java    |  2 +-
 .../apache/cassandra/streaming/StreamSession.java  |  5 ++
 .../streaming/async/StreamingInboundHandler.java   | 55 +++-------------------
 .../streaming/messages/IncomingStreamMessage.java  | 20 +++++---
 .../streaming/messages/OutgoingStreamMessage.java  |  3 +-
 .../streaming/messages/StreamInitMessage.java      | 10 ++++
 .../streaming/messages/StreamMessage.java          | 11 +++++
 .../streaming/messages/StreamMessageHeader.java    | 24 +++++++---
 .../microbench/ZeroCopyStreamingBenchmark.java     |  4 +-
 .../CassandraEntireSSTableStreamWriterTest.java    |  3 +-
 .../async/StreamingInboundHandlerTest.java         | 49 +++++++++++--------
 15 files changed, 113 insertions(+), 98 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index aa62351..2d66d05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * Differentiate follower/initator in StreamMessageHeader (CASSANDRA-15665)
  * Add a startup check to detect if LZ4 uses java rather than native implementation (CASSANDRA-15884)
  * Fix missing topology events when running multiple nodes on the same network interface (CASSANDRA-15677)
  * Create config.yml.MIDRES (CASSANDRA-15712)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
index 807d935..11a18a0 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -58,6 +58,12 @@ public class CassandraIncomingFile implements IncomingStream
     }
 
     @Override
+    public StreamSession session()
+    {
+        return session;
+    }
+
+    @Override
     public synchronized void read(DataInputPlus in, int version) throws IOException
     {
         CassandraStreamHeader streamHeader = CassandraStreamHeader.serializer.deserialize(in, version);
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java b/src/java/org/apache/cassandra/streaming/IncomingStream.java
index 55fbd4f..0733249 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStream.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStream.java
@@ -43,4 +43,9 @@ public interface IncomingStream
     long getSize();
     int getNumFiles();
     TableId getTableId();
+
+    /**
+     * @return stream session used to receive given file
+     */
+    StreamSession session();
 }
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index da77ad2..fa157a8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -176,17 +176,10 @@ public class StreamManager implements StreamManagerMBean
         return notifier.getNotificationInfo();
     }
 
-    public StreamSession findSession(InetAddressAndPort peer, UUID planId, int sessionIndex)
+    public StreamSession findSession(InetAddressAndPort peer, UUID planId, int sessionIndex, boolean searchInitiatorSessions)
     {
-        // Search follower session first, because in some tests, eg. StreamingTransferTest, both initiator session
-        // and follower session are listening to local host.
-        // TODO CASSANDRA-15665 it's more robust to add "isFollower" flag into {@link  StreamMessageHeader} to distinguish
-        // initiator session and follower session.
-        StreamSession session = findSession(followerStreams, peer, planId, sessionIndex);
-        if (session !=  null)
-            return session;
-
-        return findSession(initiatorStreams, peer, planId, sessionIndex);
+        Map<UUID, StreamResultFuture> streams = searchInitiatorSessions ? initiatorStreams : followerStreams;
+        return findSession(streams, peer, planId, sessionIndex);
     }
 
     private StreamSession findSession(Map<UUID, StreamResultFuture> streams, InetAddressAndPort peer, UUID planId, int sessionIndex)
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 76a33a0..89a6cf1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -227,7 +227,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         }
     }
 
-    StreamSession getSession(InetAddressAndPort peer, int sessionIndex)
+    public StreamSession getSession(InetAddressAndPort peer, int sessionIndex)
     {
         return coordinator.getSessionById(peer, sessionIndex);
     }
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index c6ef5f0..f59eaa5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -241,6 +241,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         logger.debug("Creating stream session to {} as {}", template, isFollower ? "follower" : "initiator");
     }
 
+    public boolean isFollower()
+    {
+        return isFollower;
+    }
+
     public UUID planId()
     {
         return streamResult == null ? null : streamResult.planId;
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
index edc74e3..3b9c172 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@ -21,10 +21,8 @@ package org.apache.cassandra.streaming.async;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -40,15 +38,11 @@ import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.AsyncStreamingInputPlus;
-import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamReceiveException;
-import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
 import org.apache.cassandra.streaming.messages.KeepAliveMessage;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
-import org.apache.cassandra.streaming.messages.StreamMessageHeader;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 import static org.apache.cassandra.streaming.async.NettyStreamingMessageSender.createLogTag;
@@ -61,7 +55,6 @@ import static org.apache.cassandra.streaming.async.NettyStreamingMessageSender.c
 public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamingInboundHandler.class);
-    private static final Function<SessionIdentifier, StreamSession> DEFAULT_SESSION_PROVIDER = sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex);
     private static volatile boolean trackInboundHandlers = false;
     private static Collection<StreamingInboundHandler> inboundHandlers;
     private final InetAddressAndPort remoteAddress;
@@ -95,7 +88,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
     public void handlerAdded(ChannelHandlerContext ctx)
     {
         buffers = new AsyncStreamingInputPlus(ctx.channel());
-        Thread blockingIOThread = new FastThreadLocalThread(new StreamDeserializingTask(DEFAULT_SESSION_PROVIDER, session, ctx.channel()),
+        Thread blockingIOThread = new FastThreadLocalThread(new StreamDeserializingTask(session, ctx.channel()),
                                                             String.format("Stream-Deserializer-%s-%s", remoteAddress.toString(), ctx.channel().id()));
         blockingIOThread.setDaemon(true);
         blockingIOThread.start();
@@ -146,15 +139,13 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
      */
     class StreamDeserializingTask implements Runnable
     {
-        private final Function<SessionIdentifier, StreamSession> sessionProvider;
         private final Channel channel;
 
         @VisibleForTesting
         StreamSession session;
 
-        StreamDeserializingTask(Function<SessionIdentifier, StreamSession> sessionProvider, StreamSession session, Channel channel)
+        StreamDeserializingTask(StreamSession session, Channel channel)
         {
-            this.sessionProvider = sessionProvider;
             this.session = session;
             this.channel = channel;
         }
@@ -232,26 +223,9 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
 
         StreamSession deriveSession(StreamMessage message)
         {
-            StreamSession streamSession = null;
-            // StreamInitMessage starts a new channel, and IncomingStreamMessage potentially, as well.
-            // IncomingStreamMessage needs a session to be established a priori, though
-            if (message instanceof StreamInitMessage)
-            {
-                assert session == null : "initiator of stream session received a StreamInitMessage";
-                StreamInitMessage init = (StreamInitMessage) message;
-                StreamResultFuture.createFollower(init.sessionIndex, init.planId, init.streamOperation, init.from, channel, init.pendingRepair, init.previewKind);
-                streamSession = sessionProvider.apply(new SessionIdentifier(init.from, init.planId, init.sessionIndex));
-            }
-            else if (message instanceof IncomingStreamMessage)
-            {
-                // TODO: it'd be great to check if the session actually exists before slurping in the entire stream,
-                // but that's a refactoring for another day
-                StreamMessageHeader header = ((IncomingStreamMessage) message).header;
-                streamSession = sessionProvider.apply(new SessionIdentifier(header.sender, header.planId, header.sessionIndex));
-            }
-
-            if (streamSession == null)
-                throw new IllegalStateException(createLogTag(null, channel) + " no session found for message " + message);
+            // StreamInitMessage starts a new channel here, but IncomingStreamMessage needs a session
+            // to be established a priori
+            StreamSession streamSession = message.getOrCreateSession(channel);
 
             // Attach this channel to the session: this only happens upon receiving the first init message as a follower;
             // in all other cases, no new control channel will be added, as the proper control channel will be already attached.
@@ -260,23 +234,6 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
         }
     }
 
-    /**
-     * A simple struct to wrap the data points required to lookup a {@link StreamSession}
-     */
-    static class SessionIdentifier
-    {
-        final InetAddressAndPort from;
-        final UUID planId;
-        final int sessionIndex;
-
-        SessionIdentifier(InetAddressAndPort from, UUID planId, int sessionIndex)
-        {
-            this.from = from;
-            this.planId = planId;
-            this.sessionIndex = sessionIndex;
-        }
-    }
-
     /** Shutdown for in-JVM tests. For any other usage, tracking of active inbound streaming handlers
      *  should be revisted first and in-JVM shutdown refactored with it.
      *  This does not prevent new inbound handlers being added after shutdown, nor is not thread-safe
@@ -285,7 +242,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
     @VisibleForTesting
     public static void shutdown()
     {
-        assert trackInboundHandlers == true : "in-JVM tests required tracking of inbound streaming handlers";
+        assert trackInboundHandlers : "in-JVM tests required tracking of inbound streaming handlers";
 
         inboundHandlers.forEach(StreamingInboundHandler::close);
         inboundHandlers.clear();
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
index 5f69a90..e268747 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.streaming.messages;
 import java.io.IOException;
 import java.util.Objects;
 
+import io.netty.channel.Channel;
+
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.util.DataInputPlus;
 
@@ -38,18 +40,18 @@ public class IncomingStreamMessage extends StreamMessage
         public IncomingStreamMessage deserialize(DataInputPlus input, int version) throws IOException
         {
             StreamMessageHeader header = StreamMessageHeader.serializer.deserialize(input, version);
-            StreamSession session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex);
+            StreamSession session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex, header.sendByFollower);
             if (session == null)
                 throw new IllegalStateException(String.format("unknown stream session: %s - %d", header.planId, header.sessionIndex));
             ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(header.tableId);
             if (cfs == null)
                 throw new StreamReceiveException(session, "CF " + header.tableId + " was dropped during streaming");
 
-            IncomingStream incomingData = cfs.getStreamManager().prepareIncomingStream(session, header);
-            incomingData.read(input, version);
-
             try
             {
+                IncomingStream incomingData = cfs.getStreamManager().prepareIncomingStream(session, header);
+                incomingData.read(input, version);
+
                 return new IncomingStreamMessage(incomingData, header);
             }
             catch (Throwable t)
@@ -70,8 +72,8 @@ public class IncomingStreamMessage extends StreamMessage
         }
     };
 
-    public StreamMessageHeader header;
-    public IncomingStream stream;
+    public final StreamMessageHeader header;
+    public final IncomingStream stream;
 
     public IncomingStreamMessage(IncomingStream stream, StreamMessageHeader header)
     {
@@ -81,6 +83,12 @@ public class IncomingStreamMessage extends StreamMessage
     }
 
     @Override
+    public StreamSession getOrCreateSession(Channel channel)
+    {
+        return stream.session();
+    }
+
+    @Override
     public String toString()
     {
         return "IncomingStreamMessage{" +
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
index 8406f80..702e806 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
@@ -58,7 +58,6 @@ public class OutgoingStreamMessage extends StreamMessage
     };
 
     public final StreamMessageHeader header;
-    private final TableId tableId;
     public final OutgoingStream stream;
     private boolean completed = false;
     private boolean transferring = false;
@@ -66,12 +65,12 @@ public class OutgoingStreamMessage extends StreamMessage
     public OutgoingStreamMessage(TableId tableId, StreamSession session, OutgoingStream stream, int sequenceNumber)
     {
         super(Type.STREAM);
-        this.tableId = tableId;
 
         this.stream = stream;
         this.header = new StreamMessageHeader(tableId,
                                               FBUtilities.getBroadcastAddressAndPort(),
                                               session.planId(),
+                                              session.isFollower(),
                                               session.sessionIndex(),
                                               sequenceNumber,
                                               stream.getRepairedAt(),
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 953f2c4..0d6ef47 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.streaming.messages;
 import java.io.IOException;
 import java.util.UUID;
 
+import io.netty.channel.Channel;
+
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -27,6 +29,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.UUIDSerializer;
 
@@ -61,6 +64,13 @@ public class StreamInitMessage extends StreamMessage
     }
 
     @Override
+    public StreamSession getOrCreateSession(Channel channel)
+    {
+        return StreamResultFuture.createFollower(sessionIndex, planId, streamOperation, from, channel, pendingRepair, previewKind)
+                                 .getSession(from, sessionIndex);
+    }
+
+    @Override
     public String toString()
     {
         StringBuilder sb = new StringBuilder(128);
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 2f42f1b..e2f08fd 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.streaming.messages;
 
 import java.io.IOException;
 
+import io.netty.channel.Channel;
+
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
@@ -137,4 +139,13 @@ public abstract class StreamMessage
     {
         return type.priority;
     }
+
+    /**
+     * Get or create a {@link StreamSession} based on this stream message data: not all stream messages support this,
+     * so the default implementation just throws an exception.
+     */
+    public StreamSession getOrCreateSession(Channel channel)
+    {
+        throw new UnsupportedOperationException("Not supported by streaming messages of type: " + this.getClass());
+    }
 }
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java
index e76777a..30afbb8 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.streaming.messages;
 import java.io.IOException;
 import java.util.UUID;
 
+import com.google.common.base.Objects;
+
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -32,7 +34,7 @@ import org.apache.cassandra.utils.UUIDSerializer;
 import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
 
 /**
- * StreamingFileHeader is appended before sending actual data to describe what it's sending.
+ * StreamMessageHeader is appended before sending actual data to describe what it's sending.
  */
 public class StreamMessageHeader
 {
@@ -40,6 +42,8 @@ public class StreamMessageHeader
 
     public final TableId tableId;
     public UUID planId;
+    // it tells us if the file was sent by a follower stream session
+    public final boolean sendByFollower;
     public int sessionIndex;
     public final int sequenceNumber;
     public final long repairedAt;
@@ -49,6 +53,7 @@ public class StreamMessageHeader
     public StreamMessageHeader(TableId tableId,
                                InetAddressAndPort sender,
                                UUID planId,
+                               boolean sendByFollower,
                                int sessionIndex,
                                int sequenceNumber,
                                long repairedAt,
@@ -57,6 +62,7 @@ public class StreamMessageHeader
         this.tableId = tableId;
         this.sender = sender;
         this.planId = planId;
+        this.sendByFollower = sendByFollower;
         this.sessionIndex = sessionIndex;
         this.sequenceNumber = sequenceNumber;
         this.repairedAt = repairedAt;
@@ -71,6 +77,7 @@ public class StreamMessageHeader
         sb.append(", #").append(sequenceNumber);
         sb.append(", repairedAt: ").append(repairedAt);
         sb.append(", pendingRepair: ").append(pendingRepair);
+        sb.append(", sendByFollower: ").append(sendByFollower);
         sb.append(')');
         return sb.toString();
     }
@@ -81,15 +88,15 @@ public class StreamMessageHeader
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         StreamMessageHeader that = (StreamMessageHeader) o;
-        return sequenceNumber == that.sequenceNumber && tableId.equals(that.tableId);
+        return sendByFollower == that.sendByFollower &&
+               sequenceNumber == that.sequenceNumber &&
+               Objects.equal(tableId, that.tableId);
     }
 
     @Override
     public int hashCode()
     {
-        int result = tableId.hashCode();
-        result = 31 * result + sequenceNumber;
-        return result;
+        return Objects.hashCode(tableId, sendByFollower, sequenceNumber);
     }
 
     public void addSessionInfo(StreamSession session)
@@ -98,13 +105,14 @@ public class StreamMessageHeader
         sessionIndex = session.sessionIndex();
     }
 
-    static class FileMessageHeaderSerializer
+    public static class FileMessageHeaderSerializer
     {
         public void serialize(StreamMessageHeader header, DataOutputPlus out, int version) throws IOException
         {
             header.tableId.serialize(out);
             inetAddressAndPortSerializer.serialize(header.sender, out, version);
             UUIDSerializer.serializer.serialize(header.planId, out, version);
+            out.writeBoolean(header.sendByFollower);
             out.writeInt(header.sessionIndex);
             out.writeInt(header.sequenceNumber);
             out.writeLong(header.repairedAt);
@@ -120,12 +128,13 @@ public class StreamMessageHeader
             TableId tableId = TableId.deserialize(in);
             InetAddressAndPort sender = inetAddressAndPortSerializer.deserialize(in, version);
             UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+            boolean sendByFollower = in.readBoolean();
             int sessionIndex = in.readInt();
             int sequenceNumber = in.readInt();
             long repairedAt = in.readLong();
             UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
 
-            return new StreamMessageHeader(tableId, sender, planId, sessionIndex, sequenceNumber, repairedAt, pendingRepair);
+            return new StreamMessageHeader(tableId, sender, planId, sendByFollower, sessionIndex, sequenceNumber, repairedAt, pendingRepair);
         }
 
         public long serializedSize(StreamMessageHeader header, int version)
@@ -133,6 +142,7 @@ public class StreamMessageHeader
             long size = header.tableId.serializedSize();
             size += inetAddressAndPortSerializer.serializedSize(header.sender, version);
             size += UUIDSerializer.serializer.serializedSize(header.planId, version);
+            size += TypeSizes.sizeof(header.sendByFollower);
             size += TypeSizes.sizeof(header.sessionIndex);
             size += TypeSizes.sizeof(header.sequenceNumber);
             size += TypeSizes.sizeof(header.repairedAt);
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index 744750e..8ecf6cb 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -144,7 +144,7 @@ public class ZeroCopyStreamingBenchmark
                                      .build();
 
             blockStreamReader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id,
-                                                                                               peer, session.planId(),
+                                                                                               peer, session.planId(), false,
                                                                                                0, 0, 0,
                                                                                                null), entireSSTableStreamHeader, session);
 
@@ -167,7 +167,7 @@ public class ZeroCopyStreamingBenchmark
                                      .build();
 
             partialStreamReader = new CassandraStreamReader(new StreamMessageHeader(sstable.metadata().id,
-                                                                                    peer, session.planId(),
+                                                                                    peer, session.planId(), false,
                                                                                     0, 0, 0,
                                                                                     null),
                                                             partialSSTableStreamHeader, session);
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index b8115f4..00a48d1 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class CassandraEntireSSTableStreamWriterTest
 {
@@ -153,7 +154,7 @@ public class CassandraEntireSSTableStreamWriterTest
                                  .withTableId(sstable.metadata().id)
                                  .build();
 
-        CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), 0, 0, 0, null), header, session);
+        CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), false, 0, 0, 0, null), header, session);
 
         SSTableMultiWriter sstableWriter = reader.read(new DataInputBuffer(serializedFile.nioBuffer(), false));
         Collection<SSTableReader> newSstables = sstableWriter.finished();
diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
index 11f6757..d573a15 100644
--- a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
+++ b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.streaming.async;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.UUID;
 
 import com.google.common.net.InetAddresses;
@@ -30,6 +32,10 @@ import org.junit.Test;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.AsyncStreamingInputPlus;
@@ -39,7 +45,6 @@ import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.async.StreamingInboundHandler.SessionIdentifier;
 import org.apache.cassandra.streaming.messages.CompleteMessage;
 import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
@@ -120,45 +125,49 @@ public class StreamingInboundHandlerTest
     public void StreamDeserializingTask_deriveSession_StreamInitMessage()
     {
         StreamInitMessage msg = new StreamInitMessage(REMOTE_ADDR, 0, UUID.randomUUID(), StreamOperation.REPAIR, UUID.randomUUID(), PreviewKind.ALL);
-        StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> createSession(sid), null, channel);
+        StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(null, channel);
         StreamSession session = task.deriveSession(msg);
         Assert.assertNotNull(session);
     }
 
-    private StreamSession createSession(SessionIdentifier sid)
-    {
-        return new StreamSession(StreamOperation.BOOTSTRAP, sid.from, (template, messagingVersion) -> null, true, sid.sessionIndex, UUID.randomUUID(), PreviewKind.ALL);
-    }
-
-    @Test (expected = IllegalStateException.class)
+    @Test (expected = UnsupportedOperationException.class)
     public void StreamDeserializingTask_deriveSession_NoSession()
     {
         CompleteMessage msg = new CompleteMessage();
-        StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> createSession(sid), null, channel);
+        StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(null, channel);
         task.deriveSession(msg);
     }
 
     @Test (expected = IllegalStateException.class)
-    public void StreamDeserializingTask_deriveSession_IFM_NoSession()
+    public void StreamDeserializingTask_deserialize_ISM_NoSession() throws IOException
     {
-        StreamMessageHeader header = new StreamMessageHeader(TableId.generate(), REMOTE_ADDR, UUID.randomUUID(),
+        StreamMessageHeader header = new StreamMessageHeader(TableId.generate(), REMOTE_ADDR, UUID.randomUUID(), true,
                                                              0, 0, 0, UUID.randomUUID());
-        IncomingStreamMessage msg = new IncomingStreamMessage(null, header);
-        StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex), null, channel);
-        task.deriveSession(msg);
+
+        ByteBuffer temp = ByteBuffer.allocate(1024);
+        DataOutputPlus out = new DataOutputBuffer(temp);
+        StreamMessageHeader.serializer.serialize(header, out, MessagingService.current_version);
+
+        temp.flip();
+        DataInputPlus in = new DataInputBuffer(temp, false);
+        // session not found
+        IncomingStreamMessage.serializer.deserialize(in, MessagingService.current_version);
     }
 
     @Test
-    public void StreamDeserializingTask_deriveSession_IFM_HasSession()
+    public void StreamDeserializingTask_deserialize_ISM_HasSession()
     {
         UUID planId = UUID.randomUUID();
         StreamResultFuture future = StreamResultFuture.createFollower(0, planId, StreamOperation.REPAIR, REMOTE_ADDR, channel, UUID.randomUUID(), PreviewKind.ALL);
         StreamManager.instance.registerFollower(future);
-        StreamMessageHeader header = new StreamMessageHeader(TableId.generate(), REMOTE_ADDR, planId, 0,
-                                                             0, 0, UUID.randomUUID());
-        IncomingStreamMessage msg = new IncomingStreamMessage(null, header);
-        StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex), null, channel);
-        StreamSession session = task.deriveSession(msg);
+        StreamMessageHeader header = new StreamMessageHeader(TableId.generate(), REMOTE_ADDR, planId, false,
+                                                             0, 0, 0, UUID.randomUUID());
+
+        // IncomingStreamMessage.serializer.deserialize
+        StreamSession session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex, header.sendByFollower);
         Assert.assertNotNull(session);
+
+        session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex, !header.sendByFollower);
+        Assert.assertNull(session);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org