You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/11/17 23:17:34 UTC
svn commit: r1203394 - in
/cassandra/trunk/src/java/org/apache/cassandra/streaming:
IncomingStreamReader.java StreamHeader.java
Author: brandonwilliams
Date: Thu Nov 17 22:17:34 2011
New Revision: 1203394
URL: http://svn.apache.org/viewvc?rev=1203394&view=rev
Log:
Streaming uses BroadcastAddress instead of the remote socket.
Patch by Vijay, reviewed by brandonwilliams for CASSANDRA-3503
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1203394&r1=1203393&r2=1203394&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Thu Nov 17 22:17:34 2011
@@ -19,6 +19,7 @@
package org.apache.cassandra.streaming;
import java.io.*;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
@@ -56,8 +57,9 @@ public class IncomingStreamReader
public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
{
this.socket = socket;
- InetSocketAddress remoteAddress = (InetSocketAddress)socket.getRemoteSocketAddress();
- session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId);
+ InetAddress host = header.broadcastAddress != null ? header.broadcastAddress
+ : ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
+ session = StreamInSession.get(host, header.sessionId);
session.addFiles(header.pendingFiles);
// set the current file we are streaming so progress shows up in jmx
session.setCurrentFile(header.file);
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java?rev=1203394&r1=1203393&r2=1203394&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java Thu Nov 17 22:17:34 2011
@@ -22,12 +22,16 @@ package org.apache.cassandra.streaming;
*/
import java.io.*;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
public class StreamHeader
{
@@ -54,6 +58,9 @@ public class StreamHeader
/** files to add to the session */
public final Collection<PendingFile> pendingFiles;
+ /** Address of the sender **/
+ public final InetAddress broadcastAddress;
+
public StreamHeader(String table, long sessionId, PendingFile file)
{
this(table, sessionId, file, Collections.<PendingFile>emptyList());
@@ -61,10 +68,16 @@ public class StreamHeader
public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile> pendingFiles)
{
+ this(table, sessionId, first, pendingFiles, FBUtilities.getBroadcastAddress());
+ }
+
+ public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile> pendingFiles, InetAddress broadcastAddress)
+ {
this.table = table;
this.sessionId = sessionId;
this.file = first;
this.pendingFiles = pendingFiles;
+ this.broadcastAddress = broadcastAddress;
}
private static class StreamHeaderSerializer implements IVersionedSerializer<StreamHeader>
@@ -79,6 +92,7 @@ public class StreamHeader
{
PendingFile.serializer().serialize(file, dos, version);
}
+ CompactEndpointSerializationHelper.serialize(sh.broadcastAddress, dos);
}
public StreamHeader deserialize(DataInput dis, int version) throws IOException
@@ -93,8 +107,10 @@ public class StreamHeader
{
pendingFiles.add(PendingFile.serializer().deserialize(dis, version));
}
-
- return new StreamHeader(table, sessionId, file, pendingFiles);
+ InetAddress bca = null;
+ if (version > MessagingService.VERSION_10)
+ bca = CompactEndpointSerializationHelper.deserialize(dis);
+ return new StreamHeader(table, sessionId, file, pendingFiles, bca);
}
public long serializedSize(StreamHeader streamHeader, int version)