You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/06/19 15:28:07 UTC
[cassandra] branch trunk updated: Differentiate follower/initator
in StreamMessageHeader
This is an automated email from the ASF dual-hosted git repository.
blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new ebba613 Differentiate follower/initator in StreamMessageHeader
ebba613 is described below
commit ebba613b0b34ea338eed508a3ba6cbb235986fd9
Author: Zhao Yang <zh...@gmail.com>
AuthorDate: Wed Apr 1 15:12:38 2020 +0800
Differentiate follower/initator in StreamMessageHeader
patch by ZhaoYang; reviewed by Benjamin Lerer for CASSANDRA-15665
---
CHANGES.txt | 1 +
.../db/streaming/CassandraIncomingFile.java | 6 +++
.../apache/cassandra/streaming/IncomingStream.java | 5 ++
.../apache/cassandra/streaming/StreamManager.java | 13 ++---
.../cassandra/streaming/StreamResultFuture.java | 2 +-
.../apache/cassandra/streaming/StreamSession.java | 5 ++
.../streaming/async/StreamingInboundHandler.java | 55 +++-------------------
.../streaming/messages/IncomingStreamMessage.java | 20 +++++---
.../streaming/messages/OutgoingStreamMessage.java | 3 +-
.../streaming/messages/StreamInitMessage.java | 10 ++++
.../streaming/messages/StreamMessage.java | 11 +++++
.../streaming/messages/StreamMessageHeader.java | 24 +++++++---
.../microbench/ZeroCopyStreamingBenchmark.java | 4 +-
.../CassandraEntireSSTableStreamWriterTest.java | 3 +-
.../async/StreamingInboundHandlerTest.java | 49 +++++++++++--------
15 files changed, 113 insertions(+), 98 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index aa62351..2d66d05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha5
+ * Differentiate follower/initator in StreamMessageHeader (CASSANDRA-15665)
* Add a startup check to detect if LZ4 uses java rather than native implementation (CASSANDRA-15884)
* Fix missing topology events when running multiple nodes on the same network interface (CASSANDRA-15677)
* Create config.yml.MIDRES (CASSANDRA-15712)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
index 807d935..11a18a0 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -58,6 +58,12 @@ public class CassandraIncomingFile implements IncomingStream
}
@Override
+ public StreamSession session()
+ {
+ return session;
+ }
+
+ @Override
public synchronized void read(DataInputPlus in, int version) throws IOException
{
CassandraStreamHeader streamHeader = CassandraStreamHeader.serializer.deserialize(in, version);
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java b/src/java/org/apache/cassandra/streaming/IncomingStream.java
index 55fbd4f..0733249 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStream.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStream.java
@@ -43,4 +43,9 @@ public interface IncomingStream
long getSize();
int getNumFiles();
TableId getTableId();
+
+ /**
+ * @return stream session used to receive given file
+ */
+ StreamSession session();
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index da77ad2..fa157a8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -176,17 +176,10 @@ public class StreamManager implements StreamManagerMBean
return notifier.getNotificationInfo();
}
- public StreamSession findSession(InetAddressAndPort peer, UUID planId, int sessionIndex)
+ public StreamSession findSession(InetAddressAndPort peer, UUID planId, int sessionIndex, boolean searchInitiatorSessions)
{
- // Search follower session first, because in some tests, eg. StreamingTransferTest, both initiator session
- // and follower session are listening to local host.
- // TODO CASSANDRA-15665 it's more robust to add "isFollower" flag into {@link StreamMessageHeader} to distinguish
- // initiator session and follower session.
- StreamSession session = findSession(followerStreams, peer, planId, sessionIndex);
- if (session != null)
- return session;
-
- return findSession(initiatorStreams, peer, planId, sessionIndex);
+ Map<UUID, StreamResultFuture> streams = searchInitiatorSessions ? initiatorStreams : followerStreams;
+ return findSession(streams, peer, planId, sessionIndex);
}
private StreamSession findSession(Map<UUID, StreamResultFuture> streams, InetAddressAndPort peer, UUID planId, int sessionIndex)
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 76a33a0..89a6cf1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -227,7 +227,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
}
}
- StreamSession getSession(InetAddressAndPort peer, int sessionIndex)
+ public StreamSession getSession(InetAddressAndPort peer, int sessionIndex)
{
return coordinator.getSessionById(peer, sessionIndex);
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index c6ef5f0..f59eaa5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -241,6 +241,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
logger.debug("Creating stream session to {} as {}", template, isFollower ? "follower" : "initiator");
}
+ public boolean isFollower()
+ {
+ return isFollower;
+ }
+
public UUID planId()
{
return streamResult == null ? null : streamResult.planId;
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
index edc74e3..3b9c172 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@ -21,10 +21,8 @@ package org.apache.cassandra.streaming.async;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
@@ -40,15 +38,11 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.AsyncStreamingInputPlus;
-import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamReceiveException;
-import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
import org.apache.cassandra.streaming.messages.KeepAliveMessage;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
-import org.apache.cassandra.streaming.messages.StreamMessageHeader;
import org.apache.cassandra.utils.JVMStabilityInspector;
import static org.apache.cassandra.streaming.async.NettyStreamingMessageSender.createLogTag;
@@ -61,7 +55,6 @@ import static org.apache.cassandra.streaming.async.NettyStreamingMessageSender.c
public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
{
private static final Logger logger = LoggerFactory.getLogger(StreamingInboundHandler.class);
- private static final Function<SessionIdentifier, StreamSession> DEFAULT_SESSION_PROVIDER = sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex);
private static volatile boolean trackInboundHandlers = false;
private static Collection<StreamingInboundHandler> inboundHandlers;
private final InetAddressAndPort remoteAddress;
@@ -95,7 +88,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
public void handlerAdded(ChannelHandlerContext ctx)
{
buffers = new AsyncStreamingInputPlus(ctx.channel());
- Thread blockingIOThread = new FastThreadLocalThread(new StreamDeserializingTask(DEFAULT_SESSION_PROVIDER, session, ctx.channel()),
+ Thread blockingIOThread = new FastThreadLocalThread(new StreamDeserializingTask(session, ctx.channel()),
String.format("Stream-Deserializer-%s-%s", remoteAddress.toString(), ctx.channel().id()));
blockingIOThread.setDaemon(true);
blockingIOThread.start();
@@ -146,15 +139,13 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
*/
class StreamDeserializingTask implements Runnable
{
- private final Function<SessionIdentifier, StreamSession> sessionProvider;
private final Channel channel;
@VisibleForTesting
StreamSession session;
- StreamDeserializingTask(Function<SessionIdentifier, StreamSession> sessionProvider, StreamSession session, Channel channel)
+ StreamDeserializingTask(StreamSession session, Channel channel)
{
- this.sessionProvider = sessionProvider;
this.session = session;
this.channel = channel;
}
@@ -232,26 +223,9 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
StreamSession deriveSession(StreamMessage message)
{
- StreamSession streamSession = null;
- // StreamInitMessage starts a new channel, and IncomingStreamMessage potentially, as well.
- // IncomingStreamMessage needs a session to be established a priori, though
- if (message instanceof StreamInitMessage)
- {
- assert session == null : "initiator of stream session received a StreamInitMessage";
- StreamInitMessage init = (StreamInitMessage) message;
- StreamResultFuture.createFollower(init.sessionIndex, init.planId, init.streamOperation, init.from, channel, init.pendingRepair, init.previewKind);
- streamSession = sessionProvider.apply(new SessionIdentifier(init.from, init.planId, init.sessionIndex));
- }
- else if (message instanceof IncomingStreamMessage)
- {
- // TODO: it'd be great to check if the session actually exists before slurping in the entire stream,
- // but that's a refactoring for another day
- StreamMessageHeader header = ((IncomingStreamMessage) message).header;
- streamSession = sessionProvider.apply(new SessionIdentifier(header.sender, header.planId, header.sessionIndex));
- }
-
- if (streamSession == null)
- throw new IllegalStateException(createLogTag(null, channel) + " no session found for message " + message);
+ // StreamInitMessage starts a new channel here, but IncomingStreamMessage needs a session
+ // to be established a priori
+ StreamSession streamSession = message.getOrCreateSession(channel);
// Attach this channel to the session: this only happens upon receiving the first init message as a follower;
// in all other cases, no new control channel will be added, as the proper control channel will be already attached.
@@ -260,23 +234,6 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
}
}
- /**
- * A simple struct to wrap the data points required to lookup a {@link StreamSession}
- */
- static class SessionIdentifier
- {
- final InetAddressAndPort from;
- final UUID planId;
- final int sessionIndex;
-
- SessionIdentifier(InetAddressAndPort from, UUID planId, int sessionIndex)
- {
- this.from = from;
- this.planId = planId;
- this.sessionIndex = sessionIndex;
- }
- }
-
/** Shutdown for in-JVM tests. For any other usage, tracking of active inbound streaming handlers
* should be revisted first and in-JVM shutdown refactored with it.
* This does not prevent new inbound handlers being added after shutdown, nor is not thread-safe
@@ -285,7 +242,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
@VisibleForTesting
public static void shutdown()
{
- assert trackInboundHandlers == true : "in-JVM tests required tracking of inbound streaming handlers";
+ assert trackInboundHandlers : "in-JVM tests required tracking of inbound streaming handlers";
inboundHandlers.forEach(StreamingInboundHandler::close);
inboundHandlers.clear();
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
index 5f69a90..e268747 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.streaming.messages;
import java.io.IOException;
import java.util.Objects;
+import io.netty.channel.Channel;
+
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -38,18 +40,18 @@ public class IncomingStreamMessage extends StreamMessage
public IncomingStreamMessage deserialize(DataInputPlus input, int version) throws IOException
{
StreamMessageHeader header = StreamMessageHeader.serializer.deserialize(input, version);
- StreamSession session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex);
+ StreamSession session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex, header.sendByFollower);
if (session == null)
throw new IllegalStateException(String.format("unknown stream session: %s - %d", header.planId, header.sessionIndex));
ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(header.tableId);
if (cfs == null)
throw new StreamReceiveException(session, "CF " + header.tableId + " was dropped during streaming");
- IncomingStream incomingData = cfs.getStreamManager().prepareIncomingStream(session, header);
- incomingData.read(input, version);
-
try
{
+ IncomingStream incomingData = cfs.getStreamManager().prepareIncomingStream(session, header);
+ incomingData.read(input, version);
+
return new IncomingStreamMessage(incomingData, header);
}
catch (Throwable t)
@@ -70,8 +72,8 @@ public class IncomingStreamMessage extends StreamMessage
}
};
- public StreamMessageHeader header;
- public IncomingStream stream;
+ public final StreamMessageHeader header;
+ public final IncomingStream stream;
public IncomingStreamMessage(IncomingStream stream, StreamMessageHeader header)
{
@@ -81,6 +83,12 @@ public class IncomingStreamMessage extends StreamMessage
}
@Override
+ public StreamSession getOrCreateSession(Channel channel)
+ {
+ return stream.session();
+ }
+
+ @Override
public String toString()
{
return "IncomingStreamMessage{" +
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
index 8406f80..702e806 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
@@ -58,7 +58,6 @@ public class OutgoingStreamMessage extends StreamMessage
};
public final StreamMessageHeader header;
- private final TableId tableId;
public final OutgoingStream stream;
private boolean completed = false;
private boolean transferring = false;
@@ -66,12 +65,12 @@ public class OutgoingStreamMessage extends StreamMessage
public OutgoingStreamMessage(TableId tableId, StreamSession session, OutgoingStream stream, int sequenceNumber)
{
super(Type.STREAM);
- this.tableId = tableId;
this.stream = stream;
this.header = new StreamMessageHeader(tableId,
FBUtilities.getBroadcastAddressAndPort(),
session.planId(),
+ session.isFollower(),
session.sessionIndex(),
sequenceNumber,
stream.getRepairedAt(),
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 953f2c4..0d6ef47 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.streaming.messages;
import java.io.IOException;
import java.util.UUID;
+import io.netty.channel.Channel;
+
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -27,6 +29,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -61,6 +64,13 @@ public class StreamInitMessage extends StreamMessage
}
@Override
+ public StreamSession getOrCreateSession(Channel channel)
+ {
+ return StreamResultFuture.createFollower(sessionIndex, planId, streamOperation, from, channel, pendingRepair, previewKind)
+ .getSession(from, sessionIndex);
+ }
+
+ @Override
public String toString()
{
StringBuilder sb = new StringBuilder(128);
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 2f42f1b..e2f08fd 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.streaming.messages;
import java.io.IOException;
+import io.netty.channel.Channel;
+
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;
@@ -137,4 +139,13 @@ public abstract class StreamMessage
{
return type.priority;
}
+
+ /**
+ * Get or create a {@link StreamSession} based on this stream message data: not all stream messages support this,
+ * so the default implementation just throws an exception.
+ */
+ public StreamSession getOrCreateSession(Channel channel)
+ {
+ throw new UnsupportedOperationException("Not supported by streaming messages of type: " + this.getClass());
+ }
}
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java
index e76777a..30afbb8 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.streaming.messages;
import java.io.IOException;
import java.util.UUID;
+import com.google.common.base.Objects;
+
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -32,7 +34,7 @@ import org.apache.cassandra.utils.UUIDSerializer;
import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
/**
- * StreamingFileHeader is appended before sending actual data to describe what it's sending.
+ * StreamMessageHeader is appended before sending actual data to describe what it's sending.
*/
public class StreamMessageHeader
{
@@ -40,6 +42,8 @@ public class StreamMessageHeader
public final TableId tableId;
public UUID planId;
+ // it tells us if the file was sent by a follower stream session
+ public final boolean sendByFollower;
public int sessionIndex;
public final int sequenceNumber;
public final long repairedAt;
@@ -49,6 +53,7 @@ public class StreamMessageHeader
public StreamMessageHeader(TableId tableId,
InetAddressAndPort sender,
UUID planId,
+ boolean sendByFollower,
int sessionIndex,
int sequenceNumber,
long repairedAt,
@@ -57,6 +62,7 @@ public class StreamMessageHeader
this.tableId = tableId;
this.sender = sender;
this.planId = planId;
+ this.sendByFollower = sendByFollower;
this.sessionIndex = sessionIndex;
this.sequenceNumber = sequenceNumber;
this.repairedAt = repairedAt;
@@ -71,6 +77,7 @@ public class StreamMessageHeader
sb.append(", #").append(sequenceNumber);
sb.append(", repairedAt: ").append(repairedAt);
sb.append(", pendingRepair: ").append(pendingRepair);
+ sb.append(", sendByFollower: ").append(sendByFollower);
sb.append(')');
return sb.toString();
}
@@ -81,15 +88,15 @@ public class StreamMessageHeader
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
StreamMessageHeader that = (StreamMessageHeader) o;
- return sequenceNumber == that.sequenceNumber && tableId.equals(that.tableId);
+ return sendByFollower == that.sendByFollower &&
+ sequenceNumber == that.sequenceNumber &&
+ Objects.equal(tableId, that.tableId);
}
@Override
public int hashCode()
{
- int result = tableId.hashCode();
- result = 31 * result + sequenceNumber;
- return result;
+ return Objects.hashCode(tableId, sendByFollower, sequenceNumber);
}
public void addSessionInfo(StreamSession session)
@@ -98,13 +105,14 @@ public class StreamMessageHeader
sessionIndex = session.sessionIndex();
}
- static class FileMessageHeaderSerializer
+ public static class FileMessageHeaderSerializer
{
public void serialize(StreamMessageHeader header, DataOutputPlus out, int version) throws IOException
{
header.tableId.serialize(out);
inetAddressAndPortSerializer.serialize(header.sender, out, version);
UUIDSerializer.serializer.serialize(header.planId, out, version);
+ out.writeBoolean(header.sendByFollower);
out.writeInt(header.sessionIndex);
out.writeInt(header.sequenceNumber);
out.writeLong(header.repairedAt);
@@ -120,12 +128,13 @@ public class StreamMessageHeader
TableId tableId = TableId.deserialize(in);
InetAddressAndPort sender = inetAddressAndPortSerializer.deserialize(in, version);
UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+ boolean sendByFollower = in.readBoolean();
int sessionIndex = in.readInt();
int sequenceNumber = in.readInt();
long repairedAt = in.readLong();
UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
- return new StreamMessageHeader(tableId, sender, planId, sessionIndex, sequenceNumber, repairedAt, pendingRepair);
+ return new StreamMessageHeader(tableId, sender, planId, sendByFollower, sessionIndex, sequenceNumber, repairedAt, pendingRepair);
}
public long serializedSize(StreamMessageHeader header, int version)
@@ -133,6 +142,7 @@ public class StreamMessageHeader
long size = header.tableId.serializedSize();
size += inetAddressAndPortSerializer.serializedSize(header.sender, version);
size += UUIDSerializer.serializer.serializedSize(header.planId, version);
+ size += TypeSizes.sizeof(header.sendByFollower);
size += TypeSizes.sizeof(header.sessionIndex);
size += TypeSizes.sizeof(header.sequenceNumber);
size += TypeSizes.sizeof(header.repairedAt);
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index 744750e..8ecf6cb 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -144,7 +144,7 @@ public class ZeroCopyStreamingBenchmark
.build();
blockStreamReader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id,
- peer, session.planId(),
+ peer, session.planId(), false,
0, 0, 0,
null), entireSSTableStreamHeader, session);
@@ -167,7 +167,7 @@ public class ZeroCopyStreamingBenchmark
.build();
partialStreamReader = new CassandraStreamReader(new StreamMessageHeader(sstable.metadata().id,
- peer, session.planId(),
+ peer, session.planId(), false,
0, 0, 0,
null),
partialSSTableStreamHeader, session);
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index b8115f4..00a48d1 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class CassandraEntireSSTableStreamWriterTest
{
@@ -153,7 +154,7 @@ public class CassandraEntireSSTableStreamWriterTest
.withTableId(sstable.metadata().id)
.build();
- CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), 0, 0, 0, null), header, session);
+ CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), false, 0, 0, 0, null), header, session);
SSTableMultiWriter sstableWriter = reader.read(new DataInputBuffer(serializedFile.nioBuffer(), false));
Collection<SSTableReader> newSstables = sstableWriter.finished();
diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
index 11f6757..d573a15 100644
--- a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
+++ b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.streaming.async;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.UUID;
import com.google.common.net.InetAddresses;
@@ -30,6 +32,10 @@ import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.AsyncStreamingInputPlus;
@@ -39,7 +45,6 @@ import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.async.StreamingInboundHandler.SessionIdentifier;
import org.apache.cassandra.streaming.messages.CompleteMessage;
import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
@@ -120,45 +125,49 @@ public class StreamingInboundHandlerTest
public void StreamDeserializingTask_deriveSession_StreamInitMessage()
{
StreamInitMessage msg = new StreamInitMessage(REMOTE_ADDR, 0, UUID.randomUUID(), StreamOperation.REPAIR, UUID.randomUUID(), PreviewKind.ALL);
- StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> createSession(sid), null, channel);
+ StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(null, channel);
StreamSession session = task.deriveSession(msg);
Assert.assertNotNull(session);
}
- private StreamSession createSession(SessionIdentifier sid)
- {
- return new StreamSession(StreamOperation.BOOTSTRAP, sid.from, (template, messagingVersion) -> null, true, sid.sessionIndex, UUID.randomUUID(), PreviewKind.ALL);
- }
-
- @Test (expected = IllegalStateException.class)
+ @Test (expected = UnsupportedOperationException.class)
public void StreamDeserializingTask_deriveSession_NoSession()
{
CompleteMessage msg = new CompleteMessage();
- StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> createSession(sid), null, channel);
+ StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(null, channel);
task.deriveSession(msg);
}
@Test (expected = IllegalStateException.class)
- public void StreamDeserializingTask_deriveSession_IFM_NoSession()
+ public void StreamDeserializingTask_deserialize_ISM_NoSession() throws IOException
{
- StreamMessageHeader header = new StreamMessageHeader(TableId.generate(), REMOTE_ADDR, UUID.randomUUID(),
+ StreamMessageHeader header = new StreamMessageHeader(TableId.generate(), REMOTE_ADDR, UUID.randomUUID(), true,
0, 0, 0, UUID.randomUUID());
- IncomingStreamMessage msg = new IncomingStreamMessage(null, header);
- StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex), null, channel);
- task.deriveSession(msg);
+
+ ByteBuffer temp = ByteBuffer.allocate(1024);
+ DataOutputPlus out = new DataOutputBuffer(temp);
+ StreamMessageHeader.serializer.serialize(header, out, MessagingService.current_version);
+
+ temp.flip();
+ DataInputPlus in = new DataInputBuffer(temp, false);
+ // session not found
+ IncomingStreamMessage.serializer.deserialize(in, MessagingService.current_version);
}
@Test
- public void StreamDeserializingTask_deriveSession_IFM_HasSession()
+ public void StreamDeserializingTask_deserialize_ISM_HasSession()
{
UUID planId = UUID.randomUUID();
StreamResultFuture future = StreamResultFuture.createFollower(0, planId, StreamOperation.REPAIR, REMOTE_ADDR, channel, UUID.randomUUID(), PreviewKind.ALL);
StreamManager.instance.registerFollower(future);
- StreamMessageHeader header = new StreamMessageHeader(TableId.generate(), REMOTE_ADDR, planId, 0,
- 0, 0, UUID.randomUUID());
- IncomingStreamMessage msg = new IncomingStreamMessage(null, header);
- StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex), null, channel);
- StreamSession session = task.deriveSession(msg);
+ StreamMessageHeader header = new StreamMessageHeader(TableId.generate(), REMOTE_ADDR, planId, false,
+ 0, 0, 0, UUID.randomUUID());
+
+ // IncomingStreamMessage.serializer.deserialize
+ StreamSession session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex, header.sendByFollower);
Assert.assertNotNull(session);
+
+ session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex, !header.sendByFollower);
+ Assert.assertNull(session);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org