You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/11/17 23:17:34 UTC

svn commit: r1203394 - in /cassandra/trunk/src/java/org/apache/cassandra/streaming: IncomingStreamReader.java StreamHeader.java

Author: brandonwilliams
Date: Thu Nov 17 22:17:34 2011
New Revision: 1203394

URL: http://svn.apache.org/viewvc?rev=1203394&view=rev
Log:
Streaming uses the sender's BroadcastAddress instead of the remote socket address.
Patch by Vijay, reviewed by brandonwilliams for CASSANDRA-3503

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1203394&r1=1203393&r2=1203394&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Thu Nov 17 22:17:34 2011
@@ -19,6 +19,7 @@
 package org.apache.cassandra.streaming;
 
 import java.io.*;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Collections;
@@ -56,8 +57,9 @@ public class IncomingStreamReader
     public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
     {
         this.socket = socket;
-        InetSocketAddress remoteAddress = (InetSocketAddress)socket.getRemoteSocketAddress();
-        session = StreamInSession.get(remoteAddress.getAddress(), header.sessionId);
+        InetAddress host = header.broadcastAddress != null ? header.broadcastAddress
+                           : ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
+        session = StreamInSession.get(host, header.sessionId);
         session.addFiles(header.pendingFiles);
         // set the current file we are streaming so progress shows up in jmx
         session.setCurrentFile(header.file);

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java?rev=1203394&r1=1203393&r2=1203394&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java Thu Nov 17 22:17:34 2011
@@ -22,12 +22,16 @@ package org.apache.cassandra.streaming;
  */
 
 import java.io.*;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class StreamHeader
 {
@@ -54,6 +58,9 @@ public class StreamHeader
     /** files to add to the session */
     public final Collection<PendingFile> pendingFiles;
 
+    /** Address of the sender **/
+    public final InetAddress broadcastAddress;
+
     public StreamHeader(String table, long sessionId, PendingFile file)
     {
         this(table, sessionId, file, Collections.<PendingFile>emptyList());
@@ -61,10 +68,16 @@ public class StreamHeader
 
     public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile> pendingFiles)
     {
+        this(table, sessionId, first, pendingFiles, FBUtilities.getBroadcastAddress());
+    }
+    
+    public StreamHeader(String table, long sessionId, PendingFile first, Collection<PendingFile> pendingFiles, InetAddress broadcastAddress)
+    {
         this.table = table;
         this.sessionId  = sessionId;
         this.file = first;
         this.pendingFiles = pendingFiles;
+        this.broadcastAddress = broadcastAddress;
     }
 
     private static class StreamHeaderSerializer implements IVersionedSerializer<StreamHeader>
@@ -79,6 +92,7 @@ public class StreamHeader
             {
                 PendingFile.serializer().serialize(file, dos, version);
             }
+            CompactEndpointSerializationHelper.serialize(sh.broadcastAddress, dos);
         }
 
         public StreamHeader deserialize(DataInput dis, int version) throws IOException
@@ -93,8 +107,10 @@ public class StreamHeader
             {
                 pendingFiles.add(PendingFile.serializer().deserialize(dis, version));
             }
-
-            return new StreamHeader(table, sessionId, file, pendingFiles);
+            InetAddress bca = null;
+            if (version > MessagingService.VERSION_10)
+                bca = CompactEndpointSerializationHelper.deserialize(dis);
+            return new StreamHeader(table, sessionId, file, pendingFiles, bca);
         }
 
         public long serializedSize(StreamHeader streamHeader, int version)