You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/11/13 23:20:10 UTC
[6/6] git commit: change stream session ID from (host, counter) to
TimeUUID; patch by yukim reviewed by Michael Kjellman for CASSANDRA-4813
change stream session ID from (host, counter) to TimeUUID; patch by yukim reviewed by Michael Kjellman for CASSANDRA-4813
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/901a54a6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/901a54a6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/901a54a6
Branch: refs/heads/cassandra-1.2
Commit: 901a54a6d6c7b6f7276088e67b45f95dcc7b57f7
Parents: 228d1cf
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 13 16:18:07 2012 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 13 16:18:07 2012 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/net/OutboundTcpConnectionPool.java | 2 +-
.../cassandra/streaming/AbstractStreamSession.java | 20 ++---
.../apache/cassandra/streaming/FileStreamTask.java | 4 +-
.../cassandra/streaming/IncomingStreamReader.java | 9 +-
.../apache/cassandra/streaming/StreamHeader.java | 54 +++--------
.../cassandra/streaming/StreamInSession.java | 70 +++++----------
.../cassandra/streaming/StreamOutSession.java | 53 ++++-------
.../apache/cassandra/streaming/StreamReply.java | 10 ++-
.../streaming/StreamReplyVerbHandler.java | 2 +-
.../apache/cassandra/streaming/StreamRequest.java | 21 +++--
.../serialization/1.2/streaming.StreamHeader.bin | Bin 175902 -> 175917 bytes
.../serialization/1.2/streaming.StreamReply.bin | Bin 73 -> 89 bytes
.../1.2/streaming.StreamRequestMessage.bin | Bin 7167 -> 7215 bytes
.../cassandra/streaming/SerializationsTest.java | 27 +++---
.../cassandra/streaming/StreamingTransferTest.java | 6 +-
16 files changed, 106 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 518d3ec..4f54598 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
* Exclude gcable tombstones from merkle-tree computation (CASSANDRA-4905)
* Better printing of AbstractBounds for tracing (CASSANDRA-4931)
* Optimize mostRecentTomstone check in CC.collectAllData (CASSANDRA-4883)
+ * Change stream session ID to UUID to avoid collision from same node (CASSANDRA-4813)
Merged from 1.1:
* reset getRangeSlice filter after finishing a row for get_paged_slice
(CASSANDRA-4919)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 4d9ce63..237363d 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -124,7 +124,7 @@ public class OutboundTcpConnectionPool
else
{
Socket socket = SocketChannel.open(new InetSocketAddress(endPoint(), DatabaseDescriptor.getStoragePort())).socket();
- if (Config.getOutboundBindAny())
+ if (Config.getOutboundBindAny() && !socket.isBound())
socket.bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
return socket;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
index d190506..dd7d922 100644
--- a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.streaming;
import java.net.InetAddress;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.LoggerFactory;
@@ -32,33 +33,30 @@ public abstract class AbstractStreamSession implements IEndpointStateChangeSubsc
{
private static final Logger logger = LoggerFactory.getLogger(AbstractStreamSession.class);
+ protected final InetAddress host;
+ protected final UUID sessionId;
protected String table;
- protected Pair<InetAddress, Long> context;
protected final IStreamCallback callback;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
- protected AbstractStreamSession(String table, Pair<InetAddress, Long> context, IStreamCallback callback)
+ protected AbstractStreamSession(String table, InetAddress host, UUID sessionId, IStreamCallback callback)
{
+ this.host = host;
+ this.sessionId = sessionId;
this.table = table;
- this.context = context;
this.callback = callback;
Gossiper.instance.register(this);
FailureDetector.instance.registerFailureDetectionEventListener(this);
}
- public int getSourceFlag()
+ public UUID getSessionId()
{
- return (int)(context.right >> 32);
- }
-
- public long getSessionId()
- {
- return context.right;
+ return sessionId;
}
public InetAddress getHost()
{
- return context.left;
+ return host;
}
public void close(boolean success)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index f4162de..67d5c35 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -90,7 +90,7 @@ public class FileStreamTask extends WrappedRunnable
// (at this point, if we fail, it is the receiver's job to re-request)
stream();
- StreamOutSession session = StreamOutSession.get(to, header.sessionId);
+ StreamOutSession session = StreamOutSession.get(header.sessionId);
if (session == null)
{
logger.info("Found no stream out session at end of file stream task - this is expected if the receiver went down");
@@ -104,7 +104,7 @@ public class FileStreamTask extends WrappedRunnable
}
catch (IOException e)
{
- StreamOutSession session = StreamOutSession.get(to, header.sessionId);
+ StreamOutSession session = StreamOutSession.get(header.sessionId);
if (session != null)
session.close(false);
throw e;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index b4bea58..656a99d 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -60,16 +60,15 @@ public class IncomingStreamReader
public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
{
socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
- InetAddress host = header.broadcastAddress != null ? header.broadcastAddress
- : ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
+ InetAddress host = ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
if (header.pendingFiles.isEmpty() && header.file != null)
{
// StreamInSession should be created already when receiving 2nd and after files
- if (!StreamInSession.hasSession(host, header.sessionId))
+ if (!StreamInSession.hasSession(header.sessionId))
{
StreamReply reply = new StreamReply("", header.sessionId, StreamReply.Status.SESSION_FAILURE);
OutboundTcpConnection.write(reply.createMessage(),
- Long.toString(header.sessionId),
+ header.sessionId.toString(),
System.currentTimeMillis(),
new DataOutputStream(socket.getOutputStream()),
MessagingService.instance().getVersion(host));
@@ -98,7 +97,7 @@ public class IncomingStreamReader
{
underliningStream = null;
}
- metrics = StreamingMetrics.get(socket.getInetAddress());
+ metrics = StreamingMetrics.get(host);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamHeader.java b/src/java/org/apache/cassandra/streaming/StreamHeader.java
index 00a9a43..7f3e654 100644
--- a/src/java/org/apache/cassandra/streaming/StreamHeader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamHeader.java
@@ -17,59 +17,42 @@
*/
package org.apache.cassandra.streaming;
-import java.io.*;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDSerializer;
public class StreamHeader
{
public static final IVersionedSerializer<StreamHeader> serializer = new StreamHeaderSerializer();
- // Streaming sessionId flags, used to avoid duplicate session id's between nodes.
- // See StreamInSession and StreamOutSession
- public static final int STREAM_IN_SOURCE_FLAG = 0;
- public static final int STREAM_OUT_SOURCE_FLAG = 1;
-
public final String table;
/** file being sent on initial stream */
public final PendingFile file;
- /** session is tuple of (host, sessionid) */
- public final long sessionId;
+ /** session ID */
+ public final UUID sessionId;
/** files to add to the session */
public final Collection<PendingFile> pendingFiles;
- /** Address of the sender **/
- public final InetAddress broadcastAddress;
-
- public StreamHeader(String table, long sessionId, PendingFile file)
+ public StreamHeader(String table, UUID sessionId, PendingFile file)
{
this(table, sessionId, file, Collections.<PendingFile>emptyList());
}
- public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile> pendingFiles)
- {
- this(table, sessionId, first, pendingFiles, FBUtilities.getBroadcastAddress());
- }
-
- public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile> pendingFiles, InetAddress broadcastAddress)
+ public StreamHeader(String table, UUID sessionId, PendingFile first, Collection<PendingFile> pendingFiles)
{
this.table = table;
this.sessionId = sessionId;
this.file = first;
this.pendingFiles = pendingFiles;
- this.broadcastAddress = broadcastAddress;
}
private static class StreamHeaderSerializer implements IVersionedSerializer<StreamHeader>
@@ -77,32 +60,24 @@ public class StreamHeader
public void serialize(StreamHeader sh, DataOutput dos, int version) throws IOException
{
dos.writeUTF(sh.table);
- dos.writeLong(sh.sessionId);
+ UUIDSerializer.serializer.serialize(sh.sessionId, dos, MessagingService.current_version);
PendingFile.serializer.serialize(sh.file, dos, version);
dos.writeInt(sh.pendingFiles.size());
- for(PendingFile file : sh.pendingFiles)
- {
+ for (PendingFile file : sh.pendingFiles)
PendingFile.serializer.serialize(file, dos, version);
- }
- CompactEndpointSerializationHelper.serialize(sh.broadcastAddress, dos);
}
public StreamHeader deserialize(DataInput dis, int version) throws IOException
{
String table = dis.readUTF();
- long sessionId = dis.readLong();
+ UUID sessionId = UUIDSerializer.serializer.deserialize(dis, MessagingService.current_version);
PendingFile file = PendingFile.serializer.deserialize(dis, version);
int size = dis.readInt();
List<PendingFile> pendingFiles = new ArrayList<PendingFile>(size);
for (int i = 0; i < size; i++)
- {
pendingFiles.add(PendingFile.serializer.deserialize(dis, version));
- }
- InetAddress bca = null;
- if (version > MessagingService.VERSION_10)
- bca = CompactEndpointSerializationHelper.deserialize(dis);
- return new StreamHeader(table, sessionId, file, pendingFiles, bca);
+ return new StreamHeader(table, sessionId, file, pendingFiles);
}
public long serializedSize(StreamHeader sh, int version)
@@ -111,9 +86,8 @@ public class StreamHeader
size += TypeSizes.NATIVE.sizeof(sh.sessionId);
size += PendingFile.serializer.serializedSize(sh.file, version);
size += TypeSizes.NATIVE.sizeof(sh.pendingFiles.size());
- for(PendingFile file : sh.pendingFiles)
+ for (PendingFile file : sh.pendingFiles)
size += PendingFile.serializer.serializedSize(file, version);
- size += CompactEndpointSerializationHelper.serializedSize(sh.broadcastAddress);
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index f8bde91..a61ceb7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -23,88 +23,62 @@ import java.net.InetAddress;
import java.net.Socket;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.MessagingService;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
-import org.apache.cassandra.gms.*;
+import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.OutboundTcpConnection;
-import org.apache.cassandra.utils.Pair;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.apache.cassandra.utils.UUIDGen;
/** each context gets its own StreamInSession. So there may be >1 Session per host */
public class StreamInSession extends AbstractStreamSession
{
private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);
- private static final ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>();
+ private static final ConcurrentMap<UUID, StreamInSession> sessions = new NonBlockingHashMap<UUID, StreamInSession>();
private final Set<PendingFile> files = new NonBlockingHashSet<PendingFile>();
private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
private PendingFile current;
private Socket socket;
private volatile int retries;
- private final static AtomicInteger sessionIdCounter = new AtomicInteger(0);
-
- /**
- * The next session id is a combination of a local integer counter and a flag used to avoid collisions
- * between session id's generated on different machines. Nodes can may have StreamOutSessions with the
- * following contexts:
- *
- * <1.1.1.1, (stream_in_flag, 6)>
- * <1.1.1.1, (stream_out_flag, 6)>
- *
- * The first is an out stream created in response to a request from node 1.1.1.1. The id (6) was created by
- * the requesting node. The second is an out stream created by this node to push to 1.1.1.1. The id (6) was
- * created by this node.
- *
- * Note: The StreamInSession results in a StreamOutSession on the target that uses the StreamInSession sessionId.
- *
- * @return next StreamInSession sessionId
- */
- private static long nextSessionId()
- {
- return (((long)StreamHeader.STREAM_IN_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet());
- }
- private StreamInSession(Pair<InetAddress, Long> context, IStreamCallback callback)
+ private StreamInSession(InetAddress host, UUID sessionId, IStreamCallback callback)
{
- super(null, context, callback);
+ super(null, host, sessionId, callback);
}
public static StreamInSession create(InetAddress host, IStreamCallback callback)
{
- Pair<InetAddress, Long> context = Pair.create(host, nextSessionId());
- StreamInSession session = new StreamInSession(context, callback);
- sessions.put(context, session);
+ StreamInSession session = new StreamInSession(host, UUIDGen.makeType1UUIDFromHost(host), callback);
+ sessions.put(session.getSessionId(), session);
return session;
}
- public static StreamInSession get(InetAddress host, long sessionId)
+ public static StreamInSession get(InetAddress host, UUID sessionId)
{
- Pair<InetAddress, Long> context = Pair.create(host, sessionId);
- StreamInSession session = sessions.get(context);
+ StreamInSession session = sessions.get(sessionId);
if (session == null)
{
- StreamInSession possibleNew = new StreamInSession(context, null);
- if ((session = sessions.putIfAbsent(context, possibleNew)) == null)
+ StreamInSession possibleNew = new StreamInSession(host, sessionId, null);
+ if ((session = sessions.putIfAbsent(sessionId, possibleNew)) == null)
session = possibleNew;
}
return session;
}
- public static boolean hasSession(InetAddress host, long sessionId)
+ public static boolean hasSession(UUID sessionId)
{
- Pair<InetAddress, Long> context = Pair.create(host, sessionId);
- return sessions.get(context) != null;
+ return sessions.get(sessionId) != null;
}
public void setCurrentFile(PendingFile file)
@@ -227,7 +201,7 @@ public class StreamInSession extends AbstractStreamSession
{
if (socket != null)
OutboundTcpConnection.write(reply.createMessage(),
- context.right.toString(),
+ sessionId.toString(),
System.currentTimeMillis(),
new DataOutputStream(socket.getOutputStream()),
MessagingService.instance().getVersion(getHost()));
@@ -246,7 +220,7 @@ public class StreamInSession extends AbstractStreamSession
protected void closeInternal(boolean success)
{
- sessions.remove(context);
+ sessions.remove(sessionId);
if (!success && FailureDetector.instance.isAlive(getHost()))
{
StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FAILURE);
@@ -269,11 +243,11 @@ public class StreamInSession extends AbstractStreamSession
public static Set<PendingFile> getIncomingFiles(InetAddress host)
{
Set<PendingFile> set = new HashSet<PendingFile>();
- for (Map.Entry<Pair<InetAddress, Long>, StreamInSession> entry : sessions.entrySet())
+ for (Map.Entry<UUID, StreamInSession> entry : sessions.entrySet())
{
- if (entry.getKey().left.equals(host))
+ StreamInSession session = entry.getValue();
+ if (session.getHost().equals(host))
{
- StreamInSession session = entry.getValue();
if (session.current != null)
set.add(session.current);
set.addAll(session.files);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamOutSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
index e1f42dc..ad312ff 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+
import org.cliffc.high_scale_lib.NonBlockingHashMap;
/**
@@ -38,57 +40,37 @@ public class StreamOutSession extends AbstractStreamSession
private static final Logger logger = LoggerFactory.getLogger(StreamOutSession.class);
// one host may have multiple stream sessions.
- private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession> streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>();
- private final static AtomicInteger sessionIdCounter = new AtomicInteger(0);
-
- /**
- * The next session id is a combination of a local integer counter and a flag used to avoid collisions
- * between session id's generated on different machines. Nodes can may have StreamOutSessions with the
- * following contexts:
- *
- * <1.1.1.1, (stream_in_flag, 6)>
- * <1.1.1.1, (stream_out_flag, 6)>
- *
- * The first is an out stream created in response to a request from node 1.1.1.1. The id (6) was created by
- * the requesting node. The second is an out stream created by this node to push to 1.1.1.1. The id (6) was
- * created by this node.
- * @return next StreamOutSession sessionId
- */
- private static long nextSessionId()
- {
- return (((long)StreamHeader.STREAM_OUT_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet());
- }
+ private static final ConcurrentMap<UUID, StreamOutSession> streams = new NonBlockingHashMap<UUID, StreamOutSession>();
public static StreamOutSession create(String table, InetAddress host, IStreamCallback callback)
{
- return create(table, host, nextSessionId(), callback);
+ return create(table, host, UUIDGen.makeType1UUIDFromHost(host), callback);
}
- public static StreamOutSession create(String table, InetAddress host, long sessionId)
+ public static StreamOutSession create(String table, InetAddress host, UUID sessionId)
{
return create(table, host, sessionId, null);
}
- public static StreamOutSession create(String table, InetAddress host, long sessionId, IStreamCallback callback)
+ public static StreamOutSession create(String table, InetAddress host, UUID sessionId, IStreamCallback callback)
{
- Pair<InetAddress, Long> context = Pair.create(host, sessionId);
- StreamOutSession session = new StreamOutSession(table, context, callback);
- streams.put(context, session);
+ StreamOutSession session = new StreamOutSession(table, host, sessionId, callback);
+ streams.put(sessionId, session);
return session;
}
- public static StreamOutSession get(InetAddress host, long sessionId)
+ public static StreamOutSession get(UUID sessionId)
{
- return streams.get(Pair.create(host, sessionId));
+ return streams.get(sessionId);
}
private final Map<String, PendingFile> files = new NonBlockingHashMap<String, PendingFile>();
private volatile String currentFile;
- private StreamOutSession(String table, Pair<InetAddress, Long> context, IStreamCallback callback)
+ private StreamOutSession(String table, InetAddress host, UUID sessionId, IStreamCallback callback)
{
- super(table, context, callback);
+ super(table, host, sessionId, callback);
}
public void addFilesToStream(List<PendingFile> pendingFiles)
@@ -129,13 +111,13 @@ public class StreamOutSession extends AbstractStreamSession
// Release reference on last file (or any uncompleted ones)
for (PendingFile file : files.values())
file.sstable.releaseReference();
- streams.remove(context);
+ streams.remove(sessionId);
}
/** convenience method for use when testing */
void await() throws InterruptedException
{
- while (streams.containsKey(context))
+ while (streams.containsKey(sessionId))
Thread.sleep(10);
}
@@ -157,10 +139,11 @@ public class StreamOutSession extends AbstractStreamSession
public static List<PendingFile> getOutgoingFiles(InetAddress host)
{
List<PendingFile> list = new ArrayList<PendingFile>();
- for (Map.Entry<Pair<InetAddress, Long>, StreamOutSession> entry : streams.entrySet())
+ for (Map.Entry<UUID, StreamOutSession> entry : streams.entrySet())
{
- if (entry.getKey().left.equals(host))
- list.addAll(entry.getValue().getFiles());
+ StreamOutSession session = entry.getValue();
+ if (session.getHost().equals(host))
+ list.addAll(session.getFiles());
}
return list;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReply.java b/src/java/org/apache/cassandra/streaming/StreamReply.java
index bfb65e3..eee8e37 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReply.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReply.java
@@ -20,11 +20,13 @@ package org.apache.cassandra.streaming;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.UUID;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
public class StreamReply
{
@@ -38,11 +40,11 @@ public class StreamReply
public static final IVersionedSerializer<StreamReply> serializer = new FileStatusSerializer();
- public final long sessionId;
+ public final UUID sessionId;
public final String file;
public final Status action;
- public StreamReply(String file, long sessionId, Status action)
+ public StreamReply(String file, UUID sessionId, Status action)
{
this.file = file;
this.action = action;
@@ -68,14 +70,14 @@ public class StreamReply
{
public void serialize(StreamReply reply, DataOutput dos, int version) throws IOException
{
- dos.writeLong(reply.sessionId);
+ UUIDSerializer.serializer.serialize(reply.sessionId, dos, MessagingService.current_version);
dos.writeUTF(reply.file);
dos.writeInt(reply.action.ordinal());
}
public StreamReply deserialize(DataInput dis, int version) throws IOException
{
- long sessionId = dis.readLong();
+ UUID sessionId = UUIDSerializer.serializer.deserialize(dis, MessagingService.current_version);
String targetFile = dis.readUTF();
Status action = Status.values()[dis.readInt()];
return new StreamReply(targetFile, sessionId, action);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
index 714f76a..ebcee8a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
@@ -31,7 +31,7 @@ public class StreamReplyVerbHandler implements IVerbHandler<StreamReply>
{
StreamReply reply = message.payload;
logger.debug("Received StreamReply {}", reply);
- StreamOutSession session = StreamOutSession.get(message.from, reply.sessionId);
+ StreamOutSession session = StreamOutSession.get(reply.sessionId);
if (session == null)
{
logger.debug("Received stream action " + reply.action + " for an unknown session from " + message.from);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index a8de4a6..b49fb36 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -24,6 +24,7 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.UUID;
import com.google.common.collect.Iterables;
@@ -38,6 +39,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
/**
* This class encapsulates the message that needs to be sent to nodes
@@ -50,19 +52,19 @@ public class StreamRequest
{
public static final IVersionedSerializer<StreamRequest> serializer = new StreamRequestSerializer();
- protected final long sessionId;
+ protected final UUID sessionId;
protected final InetAddress target;
// if this is specified, ranges and table should not be.
protected final PendingFile file;
- // if these are specified, file shoud not be.
+ // if these are specified, file should not be.
protected final Collection<Range<Token>> ranges;
protected final String table;
protected final Iterable<ColumnFamilyStore> columnFamilies;
protected final OperationType type;
- StreamRequest(InetAddress target, Collection<Range<Token>> ranges, String table, Iterable<ColumnFamilyStore> columnFamilies, long sessionId, OperationType type)
+ StreamRequest(InetAddress target, Collection<Range<Token>> ranges, String table, Iterable<ColumnFamilyStore> columnFamilies, UUID sessionId, OperationType type)
{
this.target = target;
this.ranges = ranges;
@@ -73,7 +75,7 @@ public class StreamRequest
file = null;
}
- StreamRequest(InetAddress target, PendingFile file, long sessionId)
+ StreamRequest(InetAddress target, PendingFile file, UUID sessionId)
{
this.target = target;
this.file = file;
@@ -100,7 +102,7 @@ public class StreamRequest
sb.append("@");
sb.append(target);
sb.append("------->");
- for ( Range<Token> range : ranges )
+ for (Range<Token> range : ranges)
{
sb.append(range);
sb.append(" ");
@@ -118,7 +120,7 @@ public class StreamRequest
{
public void serialize(StreamRequest srm, DataOutput dos, int version) throws IOException
{
- dos.writeLong(srm.sessionId);
+ UUIDSerializer.serializer.serialize(srm.sessionId, dos, MessagingService.current_version);
CompactEndpointSerializationHelper.serialize(srm.target, dos);
if (srm.file != null)
{
@@ -143,7 +145,7 @@ public class StreamRequest
public StreamRequest deserialize(DataInput dis, int version) throws IOException
{
- long sessionId = dis.readLong();
+ UUID sessionId = UUIDSerializer.serializer.deserialize(dis, MessagingService.current_version);
InetAddress target = CompactEndpointSerializationHelper.deserialize(dis);
boolean singleFile = dis.readBoolean();
if (singleFile)
@@ -156,10 +158,9 @@ public class StreamRequest
String table = dis.readUTF();
int size = dis.readInt();
List<Range<Token>> ranges = (size == 0) ? null : new ArrayList<Range<Token>>(size);
- for( int i = 0; i < size; ++i )
+ for (int i = 0; i < size; ++i)
ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(dis, version).toTokenBounds());
- OperationType type = OperationType.RESTORE_REPLICA_COUNT;
- type = OperationType.valueOf(dis.readUTF());
+ OperationType type = OperationType.valueOf(dis.readUTF());
List<ColumnFamilyStore> stores = new ArrayList<ColumnFamilyStore>();
int cfsSize = dis.readInt();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/data/serialization/1.2/streaming.StreamHeader.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/streaming.StreamHeader.bin b/test/data/serialization/1.2/streaming.StreamHeader.bin
index a9f1d39..ac5b7ac 100644
Binary files a/test/data/serialization/1.2/streaming.StreamHeader.bin and b/test/data/serialization/1.2/streaming.StreamHeader.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/data/serialization/1.2/streaming.StreamReply.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/streaming.StreamReply.bin b/test/data/serialization/1.2/streaming.StreamReply.bin
index 4b74058..6933316 100644
Binary files a/test/data/serialization/1.2/streaming.StreamReply.bin and b/test/data/serialization/1.2/streaming.StreamReply.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/data/serialization/1.2/streaming.StreamRequestMessage.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/1.2/streaming.StreamRequestMessage.bin b/test/data/serialization/1.2/streaming.StreamRequestMessage.bin
index 75af388..fd53579 100644
Binary files a/test/data/serialization/1.2/streaming.StreamRequestMessage.bin and b/test/data/serialization/1.2/streaming.StreamRequestMessage.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
index 95a7d8b..47990ab 100644
--- a/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SerializationsTest.java
@@ -22,10 +22,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
import org.junit.Test;
@@ -44,6 +41,7 @@ import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
public class SerializationsTest extends AbstractSerializationsTester
{
@@ -84,14 +82,15 @@ public class SerializationsTest extends AbstractSerializationsTester
private void testStreamHeaderWrite() throws IOException
{
- StreamHeader sh0 = new StreamHeader("Keyspace1", 123L, makePendingFile(true, 100, OperationType.BOOTSTRAP));
- StreamHeader sh1 = new StreamHeader("Keyspace1", 124L, makePendingFile(false, 100, OperationType.BOOTSTRAP));
+ UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
+ StreamHeader sh0 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP));
+ StreamHeader sh1 = new StreamHeader("Keyspace1", sessionId, makePendingFile(false, 100, OperationType.BOOTSTRAP));
Collection<PendingFile> files = new ArrayList<PendingFile>();
for (int i = 0; i < 50; i++)
files.add(makePendingFile(i % 2 == 0, 100, OperationType.BOOTSTRAP));
- StreamHeader sh2 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, 100, OperationType.BOOTSTRAP), files);
- StreamHeader sh3 = new StreamHeader("Keyspace1", 125L, null, files);
- StreamHeader sh4 = new StreamHeader("Keyspace1", 125L, makePendingFile(true, 100, OperationType.BOOTSTRAP), new ArrayList<PendingFile>());
+ StreamHeader sh2 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP), files);
+ StreamHeader sh3 = new StreamHeader("Keyspace1", sessionId, null, files);
+ StreamHeader sh4 = new StreamHeader("Keyspace1", sessionId, makePendingFile(true, 100, OperationType.BOOTSTRAP), new ArrayList<PendingFile>());
DataOutputStream out = getOutput("streaming.StreamHeader.bin");
StreamHeader.serializer.serialize(sh0, out, getVersion());
@@ -126,7 +125,8 @@ public class SerializationsTest extends AbstractSerializationsTester
private void testStreamReplyWrite() throws IOException
{
- StreamReply rep = new StreamReply("this is a file", 123L, StreamReply.Status.FILE_FINISHED);
+ UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
+ StreamReply rep = new StreamReply("this is a file", sessionId, StreamReply.Status.FILE_FINISHED);
DataOutputStream out = getOutput("streaming.StreamReply.bin");
StreamReply.serializer.serialize(rep, out, getVersion());
rep.createMessage().serialize(out, getVersion());
@@ -159,13 +159,14 @@ public class SerializationsTest extends AbstractSerializationsTester
private void testStreamRequestMessageWrite() throws IOException
{
+ UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getLocalAddress());
Collection<Range<Token>> ranges = new ArrayList<Range<Token>>();
for (int i = 0; i < 5; i++)
ranges.add(new Range<Token>(new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i))), new BytesToken(ByteBufferUtil.bytes(Integer.toString(10*i+5)))));
List<ColumnFamilyStore> stores = Collections.singletonList(Table.open("Keyspace1").getColumnFamilyStore("Standard1"));
- StreamRequest msg0 = new StreamRequest(FBUtilities.getBroadcastAddress(), ranges, "Keyspace1", stores, 123L, OperationType.RESTORE_REPLICA_COUNT);
- StreamRequest msg1 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), 124L);
- StreamRequest msg2 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), 124L);
+ StreamRequest msg0 = new StreamRequest(FBUtilities.getBroadcastAddress(), ranges, "Keyspace1", stores, sessionId, OperationType.RESTORE_REPLICA_COUNT);
+ StreamRequest msg1 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(true, 100, OperationType.BOOTSTRAP), sessionId);
+ StreamRequest msg2 = new StreamRequest(FBUtilities.getBroadcastAddress(), makePendingFile(false, 100, OperationType.BOOTSTRAP), sessionId);
DataOutputStream out = getOutput("streaming.StreamRequestMessage.bin");
StreamRequest.serializer.serialize(msg0, out, getVersion());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/901a54a6/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 54d32569..2d96030 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -118,7 +118,7 @@ public class StreamingTransferTest extends SchemaLoader
List<Range<Token>> ranges = new ArrayList<Range<Token>>();
ranges.add(new Range<Token>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
- StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null);
+ StreamOutSession session = StreamOutSession.create(table.name, LOCAL, (IStreamCallback)null);
StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
session.await();
}
@@ -260,7 +260,7 @@ public class StreamingTransferTest extends SchemaLoader
// Acquiring references, transferSSTables needs it
sstable.acquireReference();
sstable2.acquireReference();
- StreamOutSession session = StreamOutSession.create(tablename, LOCAL, null);
+ StreamOutSession session = StreamOutSession.create(tablename, LOCAL, (IStreamCallback) null);
StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), ranges, OperationType.BOOTSTRAP);
session.await();
@@ -316,7 +316,7 @@ public class StreamingTransferTest extends SchemaLoader
if (!SSTableReader.acquireReferences(ssTableReaders))
throw new AssertionError();
- StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, null);
+ StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, (IStreamCallback)null);
StreamOut.transferSSTables(session, ssTableReaders, ranges, OperationType.BOOTSTRAP);
session.await();