You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:05 UTC

[08/19] cassandra git commit: Allow storage port to be configurable per node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 48e1b2f..29b45b4 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -37,7 +37,8 @@ public interface StorageServiceMBean extends NotificationEmitter
      *
      * @return set of IP addresses, as Strings
      */
-    public List<String> getLiveNodes();
+    @Deprecated public List<String> getLiveNodes();
+    public List<String> getLiveNodesWithPort();
 
     /**
      * Retrieve the list of unreachable nodes in the cluster, as determined
@@ -45,28 +46,32 @@ public interface StorageServiceMBean extends NotificationEmitter
      *
      * @return set of IP addresses, as Strings
      */
-    public List<String> getUnreachableNodes();
+    @Deprecated public List<String> getUnreachableNodes();
+    public List<String> getUnreachableNodesWithPort();
 
     /**
      * Retrieve the list of nodes currently bootstrapping into the ring.
      *
      * @return set of IP addresses, as Strings
      */
-    public List<String> getJoiningNodes();
+    @Deprecated public List<String> getJoiningNodes();
+    public List<String> getJoiningNodesWithPort();
 
     /**
      * Retrieve the list of nodes currently leaving the ring.
      *
      * @return set of IP addresses, as Strings
      */
-    public List<String> getLeavingNodes();
+    @Deprecated public List<String> getLeavingNodes();
+    public List<String> getLeavingNodesWithPort();
 
     /**
      * Retrieve the list of nodes currently moving in the ring.
      *
      * @return set of IP addresses, as Strings
      */
-    public List<String> getMovingNodes();
+    @Deprecated public List<String> getMovingNodes();
+    public List<String> getMovingNodesWithPort();
 
     /**
      * Fetch string representations of the tokens for this node.
@@ -120,7 +125,8 @@ public interface StorageServiceMBean extends NotificationEmitter
      *
      * @return mapping of ranges to end points
      */
-    public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace);
+    @Deprecated public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace);
+    public Map<List<String>, List<String>> getRangeToEndpointWithPortMap(String keyspace);
 
     /**
      * Retrieve a map of range to rpc addresses that describe the ring topology
@@ -128,7 +134,8 @@ public interface StorageServiceMBean extends NotificationEmitter
      *
      * @return mapping of ranges to rpc addresses
      */
-    public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace);
+    @Deprecated public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace);
+    public Map<List<String>, List<String>> getRangeToNativeaddressWithPortMap(String keyspace);
 
     /**
      * The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility
@@ -137,14 +144,16 @@ public interface StorageServiceMBean extends NotificationEmitter
      *
      * @return a List of TokenRange(s) converted to String for the given keyspace
      */
-    public List <String> describeRingJMX(String keyspace) throws IOException;
+    @Deprecated public List <String> describeRingJMX(String keyspace) throws IOException;
+    public List<String> describeRingWithPortJMX(String keyspace) throws IOException;
 
     /**
      * Retrieve a map of pending ranges to endpoints that describe the ring topology
      * @param keyspace the keyspace to get the pending range map for.
      * @return a map of pending ranges to endpoints
      */
-    public Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace);
+    @Deprecated public Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace);
+    public Map<List<String>, List<String>> getPendingRangeToEndpointWithPortMap(String keyspace);
 
     /**
      * Retrieve a map of tokens to endpoints, including the bootstrapping
@@ -152,7 +161,8 @@ public interface StorageServiceMBean extends NotificationEmitter
      *
      * @return a map of tokens to endpoints in ascending order
      */
-    public Map<String, String> getTokenToEndpointMap();
+    @Deprecated public Map<String, String> getTokenToEndpointMap();
+    public Map<String, String> getTokenToEndpointWithPortMap();
 
     /** Retrieve this hosts unique ID */
     public String getLocalHostId();
@@ -162,16 +172,19 @@ public interface StorageServiceMBean extends NotificationEmitter
     public Map<String, String> getHostIdMap();
 
     /** Retrieve the mapping of endpoint to host ID */
-    public Map<String, String> getEndpointToHostId();
+    @Deprecated public Map<String, String> getEndpointToHostId();
+    public Map<String, String> getEndpointWithPortToHostId();
 
     /** Retrieve the mapping of host ID to endpoint */
-    public Map<String, String> getHostIdToEndpoint();
+    @Deprecated public Map<String, String> getHostIdToEndpoint();
+    public Map<String, String> getHostIdToEndpointWithPort();
 
     /** Human-readable load value */
     public String getLoadString();
 
     /** Human-readable load value.  Keys are IP addresses. */
-    public Map<String, String> getLoadMap();
+    @Deprecated public Map<String, String> getLoadMap();
+    public Map<String, String> getLoadMapWithPort();
 
     /**
      * Return the generation value for this node.
@@ -189,8 +202,10 @@ public interface StorageServiceMBean extends NotificationEmitter
      * @param key - key for which we need to find the endpoint return value -
      * the endpoint responsible for this key
      */
-    public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key);
-    public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key);
+    @Deprecated public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key);
+    public List<String> getNaturalEndpointsWithPort(String keyspaceName, String cf, String key);
+    @Deprecated public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key);
+    public List<String> getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer key);
 
     /**
      * @deprecated use {@link #takeSnapshot(String tag, Map options, String... entities)} instead.
@@ -353,7 +368,8 @@ public interface StorageServiceMBean extends NotificationEmitter
     /**
      * Get the status of a token removal.
      */
-    public String getRemovalStatus();
+    @Deprecated public String getRemovalStatus();
+    public String getRemovalStatusWithPort();
 
     /**
      * Force a remove operation to finish.
@@ -408,7 +424,8 @@ public interface StorageServiceMBean extends NotificationEmitter
      * given a list of tokens (representing the nodes in the cluster), returns
      *   a mapping from {@code "token -> %age of cluster owned by that token"}
      */
-    public Map<InetAddress, Float> getOwnership();
+    @Deprecated public Map<InetAddress, Float> getOwnership();
+    public Map<String, Float> getOwnershipWithPort();
 
     /**
      * Effective ownership is % of the data each node owns given the keyspace
@@ -417,7 +434,8 @@ public interface StorageServiceMBean extends NotificationEmitter
      * in the cluster have the same replication strategies and if yes then we will
      * use the first else a empty Map is returned.
      */
-    public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws IllegalStateException;
+    @Deprecated public Map<InetAddress, Float> effectiveOwnership(String keyspace) throws IllegalStateException;
+    public Map<String, Float> effectiveOwnershipWithPort(String keyspace) throws IllegalStateException;
 
     public List<String> getKeyspaces();
 
@@ -425,7 +443,8 @@ public interface StorageServiceMBean extends NotificationEmitter
 
     public List<String> getNonLocalStrategyKeyspaces();
 
-    public Map<String, String> getViewBuildStatuses(String keyspace, String view);
+    @Deprecated public Map<String, String> getViewBuildStatuses(String keyspace, String view);
+    public Map<String, String> getViewBuildStatusesWithPort(String keyspace, String view);
 
     /**
      * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/TokenRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/TokenRange.java b/src/java/org/apache/cassandra/service/TokenRange.java
index 0e46910..a1f9aee 100644
--- a/src/java/org/apache/cassandra/service/TokenRange.java
+++ b/src/java/org/apache/cassandra/service/TokenRange.java
@@ -17,13 +17,13 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
 import java.util.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * Holds token range informations for the sake of {@link StorageService#describeRing}.
@@ -54,13 +54,13 @@ public class TokenRange
         return tokenFactory.toString(tk);
     }
 
-    public static TokenRange create(Token.TokenFactory tokenFactory, Range<Token> range, List<InetAddress> endpoints)
+    public static TokenRange create(Token.TokenFactory tokenFactory, Range<Token> range, List<InetAddressAndPort> endpoints, boolean withPorts)
     {
         List<EndpointDetails> details = new ArrayList<>(endpoints.size());
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        for (InetAddress ep : endpoints)
+        for (InetAddressAndPort ep : endpoints)
             details.add(new EndpointDetails(ep,
-                                            StorageService.instance.getRpcaddress(ep),
+                                            StorageService.instance.getNativeaddress(ep, withPorts),
                                             snitch.getDatacenter(ep),
                                             snitch.getRack(ep)));
         return new TokenRange(tokenFactory, range, details);
@@ -69,6 +69,11 @@ public class TokenRange
     @Override
     public String toString()
     {
+        return toString(false);
+    }
+
+    public String toString(boolean withPorts)
+    {
         StringBuilder sb = new StringBuilder("TokenRange(");
 
         sb.append("start_token:").append(toStr(range.left));
@@ -76,33 +81,43 @@ public class TokenRange
 
         List<String> hosts = new ArrayList<>(endpoints.size());
         List<String> rpcs = new ArrayList<>(endpoints.size());
+        List<String> endpointDetails = new ArrayList<>(endpoints.size());
         for (EndpointDetails ep : endpoints)
         {
-            hosts.add(ep.host.getHostAddress());
-            rpcs.add(ep.rpcAddress);
+            hosts.add(ep.host.getHostAddress(withPorts));
+            rpcs.add(ep.nativeAddress);
+            endpointDetails.add(ep.toString(withPorts));
         }
 
-        sb.append("endpoints:").append(hosts);
-        sb.append("rpc_endpoints:").append(rpcs);
-        sb.append("endpoint_details:").append(endpoints);
-
+        if (withPorts)
+        {
+            sb.append(", endpoints:").append(hosts);
+            sb.append(", rpc_endpoints:").append(rpcs);
+            sb.append(", endpoint_details:").append(endpointDetails);
+        }
+        else
+        {
+            sb.append("endpoints:").append(hosts);
+            sb.append("rpc_endpoints:").append(rpcs);
+            sb.append("endpoint_details:").append(endpointDetails);
+        }
         sb.append(")");
         return sb.toString();
     }
 
     public static class EndpointDetails
     {
-        public final InetAddress host;
-        public final String rpcAddress;
+        public final InetAddressAndPort host;
+        public final String nativeAddress;
         public final String datacenter;
         public final String rack;
 
-        private EndpointDetails(InetAddress host, String rpcAddress, String datacenter, String rack)
+        private EndpointDetails(InetAddressAndPort host, String nativeAddress, String datacenter, String rack)
         {
             // dc and rack can be null, but host shouldn't
             assert host != null;
             this.host = host;
-            this.rpcAddress = rpcAddress;
+            this.nativeAddress = nativeAddress;
             this.datacenter = datacenter;
             this.rack = rack;
         }
@@ -110,10 +125,15 @@ public class TokenRange
         @Override
         public String toString()
         {
+            return toString(false);
+        }
+
+        public String toString(boolean withPorts)
+        {
             // Format matters for backward compatibility with describeRing()
             String dcStr = datacenter == null ? "" : String.format(", datacenter:%s", datacenter);
             String rackStr = rack == null ? "" : String.format(", rack:%s", rack);
-            return String.format("EndpointDetails(host:%s%s%s)", host.getHostAddress(), dcStr, rackStr);
+            return String.format("EndpointDetails(host:%s%s%s)", host.getHostAddress(withPorts), dcStr, rackStr);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 55ca5aa..65efeff 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -27,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
@@ -42,8 +42,8 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
     private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater
             = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses");
 
-    public WriteResponseHandler(Collection<InetAddress> writeEndpoints,
-                                Collection<InetAddress> pendingEndpoints,
+    public WriteResponseHandler(Collection<InetAddressAndPort> writeEndpoints,
+                                Collection<InetAddressAndPort> pendingEndpoints,
                                 ConsistencyLevel consistencyLevel,
                                 Keyspace keyspace,
                                 Runnable callback,
@@ -54,12 +54,12 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
         responses = totalBlockFor();
     }
 
-    public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback, long queryStartNanoTime)
+    public WriteResponseHandler(InetAddressAndPort endpoint, WriteType writeType, Runnable callback, long queryStartNanoTime)
     {
-        this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType, queryStartNanoTime);
+        this(Arrays.asList(endpoint), Collections.<InetAddressAndPort>emptyList(), ConsistencyLevel.ONE, null, callback, writeType, queryStartNanoTime);
     }
 
-    public WriteResponseHandler(InetAddress endpoint, WriteType writeType, long queryStartNanoTime)
+    public WriteResponseHandler(InetAddressAndPort endpoint, WriteType writeType, long queryStartNanoTime)
     {
         this(endpoint, writeType, null, queryStartNanoTime);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 381c498..ed70e96 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.service.paxos;
  */
 
 
-import java.net.InetAddress;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -29,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
@@ -48,7 +48,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
     public Commit mostRecentInProgressCommit;
     public Commit mostRecentInProgressCommitWithUpdate;
 
-    private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
+    private final Map<InetAddressAndPort, Commit> commitsByReplica = new ConcurrentHashMap<>();
 
     public PrepareCallback(DecoratedKey key, TableMetadata metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime)
     {
@@ -90,7 +90,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         latch.countDown();
     }
 
-    public Iterable<InetAddress> replicasMissingMostRecentCommit(TableMetadata metadata, int nowInSec)
+    public Iterable<InetAddressAndPort> replicasMissingMostRecentCommit(TableMetadata metadata, int nowInSec)
     {
         // In general, we need every replicas that have answered to the prepare (a quorum) to agree on the MRC (see
         // coment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes
@@ -105,9 +105,9 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         if (UUIDGen.unixTimestampInSec(mostRecentCommit.ballot) + paxosTtlSec < nowInSec)
             return Collections.emptySet();
 
-        return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddress>()
+        return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddressAndPort>()
         {
-            public boolean apply(InetAddress inetAddress)
+            public boolean apply(InetAddressAndPort inetAddress)
             {
                 return (!commitsByReplica.get(inetAddress).ballot.equals(mostRecentCommit.ballot));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/ProgressInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index fdd3e97..2334599 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -18,10 +18,11 @@
 package org.apache.cassandra.streaming;
 
 import java.io.Serializable;
-import java.net.InetAddress;
 
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
+
 /**
  * ProgressInfo contains file transfer progress.
  */
@@ -48,14 +49,14 @@ public class ProgressInfo implements Serializable
         }
     }
 
-    public final InetAddress peer;
+    public final InetAddressAndPort peer;
     public final int sessionIndex;
     public final String fileName;
     public final Direction direction;
     public final long currentBytes;
     public final long totalBytes;
 
-    public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes)
+    public ProgressInfo(InetAddressAndPort peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes)
     {
         assert totalBytes > 0;
 
@@ -102,13 +103,18 @@ public class ProgressInfo implements Serializable
     @Override
     public String toString()
     {
+        return toString(false);
+    }
+
+    public String toString(boolean withPorts)
+    {
         StringBuilder sb = new StringBuilder(fileName);
         sb.append(" ").append(currentBytes);
         sb.append("/").append(totalBytes).append(" bytes");
         sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
         sb.append(direction == Direction.OUT ? "sent to " : "received from ");
         sb.append("idx:").append(sessionIndex);
-        sb.append(peer);
+        sb.append(peer.toString(withPorts));
         return sb.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index 1521614..bbca753 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.streaming;
 
 import java.io.Serializable;
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,6 +26,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -34,9 +34,9 @@ import org.apache.cassandra.utils.FBUtilities;
  */
 public final class SessionInfo implements Serializable
 {
-    public final InetAddress peer;
+    public final InetAddressAndPort peer;
     public final int sessionIndex;
-    public final InetAddress connecting;
+    public final InetAddressAndPort connecting;
     /** Immutable collection of receiving summaries */
     public final Collection<StreamSummary> receivingSummaries;
     /** Immutable collection of sending summaries*/
@@ -47,9 +47,9 @@ public final class SessionInfo implements Serializable
     private final Map<String, ProgressInfo> receivingFiles;
     private final Map<String, ProgressInfo> sendingFiles;
 
-    public SessionInfo(InetAddress peer,
+    public SessionInfo(InetAddressAndPort peer,
                        int sessionIndex,
-                       InetAddress connecting,
+                       InetAddressAndPort connecting,
                        Collection<StreamSummary> receivingSummaries,
                        Collection<StreamSummary> sendingSummaries,
                        StreamSession.State state)
@@ -195,6 +195,6 @@ public final class SessionInfo implements Serializable
 
     public SessionSummary createSummary()
     {
-        return new SessionSummary(FBUtilities.getBroadcastAddress(), peer, receivingSummaries, sendingSummaries);
+        return new SessionSummary(FBUtilities.getBroadcastAddressAndPort(), peer, receivingSummaries, sendingSummaries);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/SessionSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionSummary.java b/src/java/org/apache/cassandra/streaming/SessionSummary.java
index d52c2ca..cf63a57 100644
--- a/src/java/org/apache/cassandra/streaming/SessionSummary.java
+++ b/src/java/org/apache/cassandra/streaming/SessionSummary.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.streaming;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -28,19 +27,19 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.InetAddressSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 
 public class SessionSummary
 {
-    public final InetAddress coordinator;
-    public final InetAddress peer;
+    public final InetAddressAndPort coordinator;
+    public final InetAddressAndPort peer;
     /** Immutable collection of receiving summaries */
     public final Collection<StreamSummary> receivingSummaries;
     /** Immutable collection of sending summaries*/
     public final Collection<StreamSummary> sendingSummaries;
 
-    public SessionSummary(InetAddress coordinator, InetAddress peer,
+    public SessionSummary(InetAddressAndPort coordinator, InetAddressAndPort peer,
                           Collection<StreamSummary> receivingSummaries,
                           Collection<StreamSummary> sendingSummaries)
     {
@@ -81,8 +80,8 @@ public class SessionSummary
     {
         public void serialize(SessionSummary summary, DataOutputPlus out, int version) throws IOException
         {
-            ByteBufferUtil.writeWithLength(InetAddressSerializer.instance.serialize(summary.coordinator), out);
-            ByteBufferUtil.writeWithLength(InetAddressSerializer.instance.serialize(summary.peer), out);
+            CompactEndpointSerializationHelper.instance.serialize(summary.coordinator, out, version);
+            CompactEndpointSerializationHelper.instance.serialize(summary.peer, out, version);
 
             out.writeInt(summary.receivingSummaries.size());
             for (StreamSummary streamSummary: summary.receivingSummaries)
@@ -99,8 +98,8 @@ public class SessionSummary
 
         public SessionSummary deserialize(DataInputPlus in, int version) throws IOException
         {
-            InetAddress coordinator = InetAddressSerializer.instance.deserialize(ByteBufferUtil.readWithLength(in));
-            InetAddress peer = InetAddressSerializer.instance.deserialize(ByteBufferUtil.readWithLength(in));
+            InetAddressAndPort coordinator = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+            InetAddressAndPort peer = CompactEndpointSerializationHelper.instance.deserialize(in, version);
 
             int numRcvd = in.readInt();
             List<StreamSummary> receivingSummaries = new ArrayList<>(numRcvd);
@@ -122,8 +121,8 @@ public class SessionSummary
         public long serializedSize(SessionSummary summary, int version)
         {
             long size = 0;
-            size += ByteBufferUtil.serializedSizeWithLength(InetAddressSerializer.instance.serialize(summary.coordinator));
-            size += ByteBufferUtil.serializedSizeWithLength(InetAddressSerializer.instance.serialize(summary.peer));
+            size += CompactEndpointSerializationHelper.instance.serializedSize(summary.coordinator, version);
+            size += CompactEndpointSerializationHelper.instance.serializedSize(summary.peer, version);
 
             size += TypeSizes.sizeof(summary.receivingSummaries.size());
             for (StreamSummary streamSummary: summary.receivingSummaries)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index bb8c702..a22e07d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -17,13 +17,13 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.net.InetAddress;
 import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -45,7 +45,7 @@ public class StreamCoordinator
                                                                                                                             FBUtilities.getAvailableProcessors());
     private final boolean connectSequentially;
 
-    private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>();
+    private Map<InetAddressAndPort, HostStreamingData> peerSessions = new HashMap<>();
     private final int connectionsPerHost;
     private StreamConnectionFactory factory;
     private final boolean keepSSTableLevel;
@@ -143,29 +143,29 @@ public class StreamCoordinator
         if (sessionsToConnect.hasNext())
         {
             StreamSession next = sessionsToConnect.next();
-            logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.getHostAddress());
+            logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.toString());
             streamExecutor.execute(new StreamSessionConnector(next));
         }
         else
             logger.debug("Finished connecting all sessions");
     }
 
-    public synchronized Set<InetAddress> getPeers()
+    public synchronized Set<InetAddressAndPort> getPeers()
     {
         return new HashSet<>(peerSessions.keySet());
     }
 
-    public synchronized StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting)
+    public synchronized StreamSession getOrCreateNextSession(InetAddressAndPort peer, InetAddressAndPort connecting)
     {
         return getOrCreateHostData(peer).getOrCreateNextSession(peer, connecting);
     }
 
-    public synchronized StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting)
+    public synchronized StreamSession getOrCreateSessionById(InetAddressAndPort peer, int id, InetAddressAndPort connecting)
     {
         return getOrCreateHostData(peer).getOrCreateSessionById(peer, id, connecting);
     }
 
-    public StreamSession getSessionById(InetAddress peer, int id)
+    public StreamSession getSessionById(InetAddressAndPort peer, int id)
     {
         return getHostData(peer).getSessionById(id);
     }
@@ -191,7 +191,7 @@ public class StreamCoordinator
         return result;
     }
 
-    public synchronized void transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+    public synchronized void transferFiles(InetAddressAndPort to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
     {
         HostStreamingData sessionList = getOrCreateHostData(to);
 
@@ -239,7 +239,7 @@ public class StreamCoordinator
         return result;
     }
 
-    private HostStreamingData getHostData(InetAddress peer)
+    private HostStreamingData getHostData(InetAddressAndPort peer)
     {
         HostStreamingData data = peerSessions.get(peer);
         if (data == null)
@@ -247,7 +247,7 @@ public class StreamCoordinator
         return data;
     }
 
-    private HostStreamingData getOrCreateHostData(InetAddress peer)
+    private HostStreamingData getOrCreateHostData(InetAddressAndPort peer)
     {
         HostStreamingData data = peerSessions.get(peer);
         if (data == null)
@@ -297,7 +297,7 @@ public class StreamCoordinator
             return false;
         }
 
-        public StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connecting)
+        public StreamSession getOrCreateNextSession(InetAddressAndPort peer, InetAddressAndPort connecting)
         {
             // create
             if (streamSessions.size() < connectionsPerHost)
@@ -329,7 +329,7 @@ public class StreamCoordinator
             return Collections.unmodifiableCollection(streamSessions.values());
         }
 
-        public StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddress connecting)
+        public StreamSession getOrCreateSessionById(InetAddressAndPort peer, int id, InetAddressAndPort connecting)
         {
             StreamSession session = streamSessions.get(id);
             if (session == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java
index 6ea2814..7ecd081 100644
--- a/src/java/org/apache/cassandra/streaming/StreamEvent.java
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.net.InetAddress;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -27,6 +26,7 @@ import com.google.common.collect.ImmutableSet;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 public abstract class StreamEvent
 {
@@ -48,7 +48,7 @@ public abstract class StreamEvent
 
     public static class SessionCompleteEvent extends StreamEvent
     {
-        public final InetAddress peer;
+        public final InetAddressAndPort peer;
         public final boolean success;
         public final int sessionIndex;
         public final Set<StreamRequest> requests;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index a44f02e..81c65c5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.net.InetAddress;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -35,6 +34,7 @@ import com.google.common.util.concurrent.RateLimiter;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
 import org.apache.cassandra.streaming.management.StreamStateCompositeData;
 
@@ -55,7 +55,7 @@ public class StreamManager implements StreamManagerMBean
      *
      * @return StreamRateLimiter with rate limit set based on peer location.
      */
-    public static StreamRateLimiter getRateLimiter(InetAddress peer)
+    public static StreamRateLimiter getRateLimiter(InetAddressAndPort peer)
     {
         return new StreamRateLimiter(peer);
     }
@@ -67,7 +67,7 @@ public class StreamManager implements StreamManagerMBean
         private static final RateLimiter interDCLimiter = RateLimiter.create(Double.MAX_VALUE);
         private final boolean isLocalDC;
 
-        public StreamRateLimiter(InetAddress peer)
+        public StreamRateLimiter(InetAddressAndPort peer)
         {
             double throughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT;
             mayUpdateThroughput(throughput, limiter);
@@ -176,7 +176,7 @@ public class StreamManager implements StreamManagerMBean
         return notifier.getNotificationInfo();
     }
 
-    public StreamSession findSession(InetAddress peer, UUID planId, int sessionIndex)
+    public StreamSession findSession(InetAddressAndPort peer, UUID planId, int sessionIndex)
     {
         StreamSession session = findSession(initiatedStreams, peer, planId, sessionIndex);
         if (session !=  null)
@@ -185,7 +185,7 @@ public class StreamManager implements StreamManagerMBean
         return findSession(receivingStreams, peer, planId, sessionIndex);
     }
 
-    private StreamSession findSession(Map<UUID, StreamResultFuture> streams, InetAddress peer, UUID planId, int sessionIndex)
+    private StreamSession findSession(Map<UUID, StreamResultFuture> streams, InetAddressAndPort peer, UUID planId, int sessionIndex)
     {
         StreamResultFuture streamResultFuture = streams.get(planId);
         if (streamResultFuture == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 213f74b..43e9068 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -17,11 +17,11 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.net.InetAddress;
 import java.util.*;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.UUIDGen;
 
 import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
@@ -73,7 +73,7 @@ public class StreamPlan
      * @param ranges ranges to fetch
      * @return this object for chaining
      */
-    public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges)
+    public StreamPlan requestRanges(InetAddressAndPort from, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges)
     {
         return requestRanges(from, connecting, keyspace, ranges, EMPTY_COLUMN_FAMILIES);
     }
@@ -88,7 +88,7 @@ public class StreamPlan
      * @param columnFamilies specific column families
      * @return this object for chaining
      */
-    public StreamPlan requestRanges(InetAddress from, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+    public StreamPlan requestRanges(InetAddressAndPort from, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
         StreamSession session = coordinator.getOrCreateNextSession(from, connecting);
         session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies));
@@ -98,9 +98,9 @@ public class StreamPlan
     /**
      * Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}.
      *
-     * @see #transferRanges(java.net.InetAddress, java.net.InetAddress, String, java.util.Collection, String...)
+     * @see #transferRanges(InetAddressAndPort, InetAddressAndPort, String, java.util.Collection, String...)
      */
-    public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+    public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
         return transferRanges(to, to, keyspace, ranges, columnFamilies);
     }
@@ -114,7 +114,7 @@ public class StreamPlan
      * @param ranges ranges to send
      * @return this object for chaining
      */
-    public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges)
+    public StreamPlan transferRanges(InetAddressAndPort to, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges)
     {
         return transferRanges(to, connecting, keyspace, ranges, EMPTY_COLUMN_FAMILIES);
     }
@@ -129,7 +129,7 @@ public class StreamPlan
      * @param columnFamilies specific column families
      * @return this object for chaining
      */
-    public StreamPlan transferRanges(InetAddress to, InetAddress connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+    public StreamPlan transferRanges(InetAddressAndPort to, InetAddressAndPort connecting, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
         StreamSession session = coordinator.getOrCreateNextSession(to, connecting);
         session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer);
@@ -144,7 +144,7 @@ public class StreamPlan
      *                       this collection will be modified to remove those files that are successfully handed off
      * @return this object for chaining
      */
-    public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+    public StreamPlan transferFiles(InetAddressAndPort to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
     {
         coordinator.transferFiles(to, sstableDetails);
         return this;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 0f74c7f..544f37f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.channel.Channel;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -103,7 +104,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
     public static synchronized StreamResultFuture initReceivingSide(int sessionIndex,
                                                                     UUID planId,
                                                                     StreamOperation streamOperation,
-                                                                    InetAddress from,
+                                                                    InetAddressAndPort from,
                                                                     Channel channel,
                                                                     boolean keepSSTableLevel,
                                                                     UUID pendingRepair,
@@ -135,11 +136,15 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         return coordinator;
     }
 
-    private void attachConnection(InetAddress from, int sessionIndex, Channel channel)
+    private void attachConnection(InetAddressAndPort from, int sessionIndex, Channel channel)
     {
         SocketAddress addr = channel.remoteAddress();
-        InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from);
-        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connecting);
+        //In unit tests that use the EmbeddedChannel, channel.remoteAddress() does not return an
+        //InetSocketAddress, but an EmbeddedSocketAddress. That is why we need the type check here
+        InetAddress connecting = (addr instanceof InetSocketAddress ? ((InetSocketAddress) addr).getAddress() : from.address);
+        //Need to turn connecting into an InetAddressAndPort with the correct port. I think getting the port from "from"
+        //will work since we don't actually have ports diverge across network interfaces
+        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, InetAddressAndPort.getByAddressOverrideDefaults(connecting, from.port));
         session.init(this);
         session.attach(channel);
     }
@@ -228,7 +233,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         }
     }
 
-    StreamSession getSession(InetAddress peer, int sessionIndex)
+    StreamSession getSession(InetAddressAndPort peer, int sessionIndex)
     {
         return coordinator.getSessionById(peer, sessionIndex);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index b6351f9..4085c43 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
@@ -142,14 +143,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     /**
      * Streaming endpoint.
      *
-     * Each {@code StreamSession} is identified by this InetAddress which is broadcast address of the node streaming.
+     * Each {@code StreamSession} is identified by this InetAddressAndPort, which is the broadcast address of the streaming node.
      */
-    public final InetAddress peer;
+    public final InetAddressAndPort peer;
 
     private final int index;
 
     /** Actual connecting address. Can be the same as {@linkplain #peer}. */
-    public final InetAddress connecting;
+    public final InetAddressAndPort connecting;
 
     // should not be null when session is started
     private StreamResultFuture streamResult;
@@ -191,14 +192,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      *  @param peer Address of streaming peer
      * @param connecting Actual connecting address
      */
-    public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
+    public StreamSession(InetAddressAndPort peer, InetAddressAndPort connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
     {
         this.peer = peer;
         this.connecting = connecting;
         this.index = index;
 
-        OutboundConnectionIdentifier id = OutboundConnectionIdentifier.stream(new InetSocketAddress(FBUtilities.getBroadcastAddress(), 0),
-                                                                              new InetSocketAddress(connecting, MessagingService.instance().portFor(connecting)));
+        OutboundConnectionIdentifier id = OutboundConnectionIdentifier.stream(InetAddressAndPort.getByAddressOverrideDefaults(FBUtilities.getBroadcastAddressAndPort().address, 0),
+                                                                              InetAddressAndPort.getByAddressOverrideDefaults(connecting.address, MessagingService.instance().portFor(connecting)));
         this.messageSender = new NettyStreamingMessageSender(this, id, factory, StreamMessage.CURRENT_VERSION, previewKind.isPreview());
         this.metrics = StreamingMetrics.get(connecting);
         this.keepSSTableLevel = keepSSTableLevel;
@@ -607,16 +608,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         {
             logger.error("[Stream #{}] Did not receive response from peer {}{} for {} secs. Is peer down? " +
                          "If not, maybe try increasing streaming_keep_alive_period_in_secs.", planId(),
-                         peer.getHostAddress(),
-                         peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
+                         peer.getHostAddress(true),
+                         peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(true),
                          2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(),
                          e);
         }
         else
         {
             logger.error("[Stream #{}] Streaming error occurred on session with peer {}{}", planId(),
-                         peer.getHostAddress(),
-                         peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
+                         peer.getHostAddress(true),
+                         peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(true),
                          e);
         }
     }
@@ -644,7 +645,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             prepareReceiving(summary);
 
         PrepareSynAckMessage prepareSynAck = new PrepareSynAckMessage();
-        if (!peer.equals(FBUtilities.getBroadcastAddress()))
+        if (!peer.equals(FBUtilities.getBroadcastAddressAndPort()))
             for (StreamTransferTask task : transfers.values())
                 prepareSynAck.summaries.add(task.getSummary());
         messageSender.sendMessage(prepareSynAck);
@@ -754,7 +755,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      */
     public synchronized void sessionFailed()
     {
-        logger.error("[Stream #{}] Remote peer {} failed stream session.", planId(), peer.getHostAddress());
+        logger.error("[Stream #{}] Remote peer {} failed stream session.", planId(), peer.toString());
         closeSession(State.FAILED);
     }
 
@@ -784,21 +785,21 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         maybeCompleted();
     }
 
-    public void onJoin(InetAddress endpoint, EndpointState epState) {}
-    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
-    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
-    public void onAlive(InetAddress endpoint, EndpointState state) {}
-    public void onDead(InetAddress endpoint, EndpointState state) {}
+    public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
+    public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+    public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
+    public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
+    public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
 
-    public void onRemove(InetAddress endpoint)
+    public void onRemove(InetAddressAndPort endpoint)
     {
-        logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peer.getHostAddress());
+        logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peer.toString());
         closeSession(State.FAILED);
     }
 
-    public void onRestart(InetAddress endpoint, EndpointState epState)
+    public void onRestart(InetAddressAndPort endpoint, EndpointState epState)
     {
-        logger.error("[Stream #{}] Session failed because remote peer {} was restarted.", planId(), peer.getHostAddress());
+        logger.error("[Stream #{}] Session failed because remote peer {} was restarted.", planId(), peer.toString());
         closeSession(State.FAILED);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
index 0b38760..20b7c87 100644
--- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
+++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -137,7 +137,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
     @Override
     public void initialize() throws IOException
     {
-        StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(),
+        StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddressAndPort(),
                                                           session.sessionIndex(),
                                                           session.planId(),
                                                           session.streamOperation(),
@@ -183,7 +183,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
     {
         Channel channel = factory.createConnection(connectionId, protocolVersion);
         ChannelPipeline pipeline = channel.pipeline();
-        pipeline.addLast(NettyFactory.instance.streamingGroup, NettyFactory.INBOUND_STREAM_HANDLER_NAME, new StreamingInboundHandler(connectionId.remoteAddress(), protocolVersion, session));
+        pipeline.addLast(NettyFactory.instance.streamingGroup, NettyFactory.INBOUND_STREAM_HANDLER_NAME, new StreamingInboundHandler(connectionId.remote(), protocolVersion, session));
         channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
         return channel;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
index cc6f9e0..907572b 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.streaming.async;
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -38,6 +37,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.FastThreadLocalThread;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
 import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamReceiveException;
@@ -65,7 +65,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
     private static final int AUTO_READ_LOW_WATER_MARK = 1 << 15;
     private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 16;
 
-    private final InetSocketAddress remoteAddress;
+    private final InetAddressAndPort remoteAddress;
     private final int protocolVersion;
 
     private final StreamSession session;
@@ -82,7 +82,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
 
     private volatile boolean closed;
 
-    public StreamingInboundHandler(InetSocketAddress remoteAddress, int protocolVersion, @Nullable StreamSession session)
+    public StreamingInboundHandler(InetAddressAndPort remoteAddress, int protocolVersion, @Nullable StreamSession session)
     {
         this.remoteAddress = remoteAddress;
         this.protocolVersion = protocolVersion;
@@ -254,11 +254,11 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
      */
     static class SessionIdentifier
     {
-        final InetAddress from;
+        final InetAddressAndPort from;
         final UUID planId;
         final int sessionIndex;
 
-        SessionIdentifier(InetAddress from, UUID planId, int sessionIndex)
+        SessionIdentifier(InetAddressAndPort from, UUID planId, int sessionIndex)
         {
             this.from = from;
             this.planId = planId;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
index b9e6951..964fe10 100644
--- a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming.management;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
@@ -26,12 +25,14 @@ import javax.management.openmbean.*;
 
 import com.google.common.base.Throwables;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.ProgressInfo;
 
 public class ProgressInfoCompositeData
 {
     private static final String[] ITEM_NAMES = new String[]{"planId",
                                                             "peer",
+                                                            "peer storage port",
                                                             "sessionIndex",
                                                             "fileName",
                                                             "direction",
@@ -39,6 +40,7 @@ public class ProgressInfoCompositeData
                                                             "totalBytes"};
     private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
                                                             "Session peer",
+                                                            "Session peer storage port",
                                                             "Index of session",
                                                             "Name of the file",
                                                             "Direction('IN' or 'OUT')",
@@ -47,6 +49,7 @@ public class ProgressInfoCompositeData
     private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
                                                                    SimpleType.STRING,
                                                                    SimpleType.INTEGER,
+                                                                   SimpleType.INTEGER,
                                                                    SimpleType.STRING,
                                                                    SimpleType.STRING,
                                                                    SimpleType.LONG,
@@ -73,12 +76,13 @@ public class ProgressInfoCompositeData
     {
         Map<String, Object> valueMap = new HashMap<>();
         valueMap.put(ITEM_NAMES[0], planId.toString());
-        valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
-        valueMap.put(ITEM_NAMES[2], progressInfo.sessionIndex);
-        valueMap.put(ITEM_NAMES[3], progressInfo.fileName);
-        valueMap.put(ITEM_NAMES[4], progressInfo.direction.name());
-        valueMap.put(ITEM_NAMES[5], progressInfo.currentBytes);
-        valueMap.put(ITEM_NAMES[6], progressInfo.totalBytes);
+        valueMap.put(ITEM_NAMES[1], progressInfo.peer.address.getHostAddress());
+        valueMap.put(ITEM_NAMES[2], progressInfo.peer.port);
+        valueMap.put(ITEM_NAMES[3], progressInfo.sessionIndex);
+        valueMap.put(ITEM_NAMES[4], progressInfo.fileName);
+        valueMap.put(ITEM_NAMES[5], progressInfo.direction.name());
+        valueMap.put(ITEM_NAMES[6], progressInfo.currentBytes);
+        valueMap.put(ITEM_NAMES[7], progressInfo.totalBytes);
         try
         {
             return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
@@ -94,12 +98,12 @@ public class ProgressInfoCompositeData
         Object[] values = cd.getAll(ITEM_NAMES);
         try
         {
-            return new ProgressInfo(InetAddress.getByName((String) values[1]),
-                                    (int) values[2],
-                                    (String) values[3],
-                                    ProgressInfo.Direction.valueOf((String)values[4]),
-                                    (long) values[5],
-                                    (long) values[6]);
+            return new ProgressInfo(InetAddressAndPort.getByNameOverrideDefaults((String) values[1], (Integer)values[2]),
+                                    (int) values[3],
+                                    (String) values[4],
+                                    ProgressInfo.Direction.valueOf((String)values[5]),
+                                    (long) values[6],
+                                    (long) values[7]);
         }
         catch (UnknownHostException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
index 516582a..1c0d8c5 100644
--- a/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
@@ -29,12 +29,15 @@ public class SessionCompleteEventCompositeData
 {
     private static final String[] ITEM_NAMES = new String[]{"planId",
                                                             "peer",
+                                                            "peer storage port",
                                                             "success"};
     private static final String[] ITEM_DESCS = new String[]{"Plan ID",
                                                             "Session peer",
+                                                            "Session peer storage port",
                                                             "Indicates whether session was successful"};
     private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
                                                                    SimpleType.STRING,
+                                                                   SimpleType.INTEGER,
                                                                    SimpleType.BOOLEAN};
 
     public static final CompositeType COMPOSITE_TYPE;
@@ -58,8 +61,9 @@ public class SessionCompleteEventCompositeData
     {
         Map<String, Object> valueMap = new HashMap<>();
         valueMap.put(ITEM_NAMES[0], event.planId.toString());
-        valueMap.put(ITEM_NAMES[1], event.peer.getHostAddress());
-        valueMap.put(ITEM_NAMES[2], event.success);
+        valueMap.put(ITEM_NAMES[1], event.peer.address.getHostAddress());
+        valueMap.put(ITEM_NAMES[2], event.peer.port);
+        valueMap.put(ITEM_NAMES[3], event.success);
         try
         {
             return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
index a6762a8..d20eaf5 100644
--- a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming.management;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 import javax.management.openmbean.*;
@@ -27,6 +26,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.SessionInfo;
 import org.apache.cassandra.streaming.StreamSession;
@@ -36,7 +36,9 @@ public class SessionInfoCompositeData
 {
     private static final String[] ITEM_NAMES = new String[]{"planId",
                                                             "peer",
+                                                            "peer_port",
                                                             "connecting",
+                                                            "connecting_port",
                                                             "receivingSummaries",
                                                             "sendingSummaries",
                                                             "state",
@@ -45,7 +47,9 @@ public class SessionInfoCompositeData
                                                             "sessionIndex"};
     private static final String[] ITEM_DESCS = new String[]{"Plan ID",
                                                             "Session peer",
+                                                            "Session peer storage port",
                                                             "Connecting address",
+                                                            "Connecting storage port",
                                                             "Summaries of receiving data",
                                                             "Summaries of sending data",
                                                             "Current session state",
@@ -61,7 +65,9 @@ public class SessionInfoCompositeData
         {
             ITEM_TYPES = new OpenType[]{SimpleType.STRING,
                                         SimpleType.STRING,
+                                        SimpleType.INTEGER,
                                         SimpleType.STRING,
+                                        SimpleType.INTEGER,
                                         ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
                                         ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
                                         SimpleType.STRING,
@@ -84,8 +90,10 @@ public class SessionInfoCompositeData
     {
         Map<String, Object> valueMap = new HashMap<>();
         valueMap.put(ITEM_NAMES[0], planId.toString());
-        valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress());
-        valueMap.put(ITEM_NAMES[2], sessionInfo.connecting.getHostAddress());
+        valueMap.put(ITEM_NAMES[1], sessionInfo.peer.address.getHostAddress());
+        valueMap.put(ITEM_NAMES[2], sessionInfo.peer.port);
+        valueMap.put(ITEM_NAMES[3], sessionInfo.connecting.address.getHostAddress());
+        valueMap.put(ITEM_NAMES[4], sessionInfo.connecting.port);
         Function<StreamSummary, CompositeData> fromStreamSummary = new Function<StreamSummary, CompositeData>()
         {
             public CompositeData apply(StreamSummary input)
@@ -93,9 +101,9 @@ public class SessionInfoCompositeData
                 return StreamSummaryCompositeData.toCompositeData(input);
             }
         };
-        valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
-        valueMap.put(ITEM_NAMES[4], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
-        valueMap.put(ITEM_NAMES[5], sessionInfo.state.name());
+        valueMap.put(ITEM_NAMES[5], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
+        valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
+        valueMap.put(ITEM_NAMES[7], sessionInfo.state.name());
         Function<ProgressInfo, CompositeData> fromProgressInfo = new Function<ProgressInfo, CompositeData>()
         {
             public CompositeData apply(ProgressInfo input)
@@ -103,9 +111,9 @@ public class SessionInfoCompositeData
                 return ProgressInfoCompositeData.toCompositeData(planId, input);
             }
         };
-        valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
-        valueMap.put(ITEM_NAMES[7], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
-        valueMap.put(ITEM_NAMES[8], sessionInfo.sessionIndex);
+        valueMap.put(ITEM_NAMES[8], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
+        valueMap.put(ITEM_NAMES[9], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
+        valueMap.put(ITEM_NAMES[10], sessionInfo.sessionIndex);
         try
         {
             return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
@@ -121,11 +129,11 @@ public class SessionInfoCompositeData
         assert cd.getCompositeType().equals(COMPOSITE_TYPE);
 
         Object[] values = cd.getAll(ITEM_NAMES);
-        InetAddress peer, connecting;
+        InetAddressAndPort peer, connecting;
         try
         {
-            peer = InetAddress.getByName((String) values[1]);
-            connecting = InetAddress.getByName((String) values[2]);
+            peer = InetAddressAndPort.getByNameOverrideDefaults((String) values[1], (Integer)values[2]);
+            connecting = InetAddressAndPort.getByNameOverrideDefaults((String) values[3], (Integer)values[4]);
         }
         catch (UnknownHostException e)
         {
@@ -139,11 +147,11 @@ public class SessionInfoCompositeData
             }
         };
         SessionInfo info = new SessionInfo(peer,
-                                           (int)values[8],
+                                           (int)values[10],
                                            connecting,
-                                           fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
-                                           fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary),
-                                           StreamSession.State.valueOf((String) values[5]));
+                                           fromArrayOfCompositeData((CompositeData[]) values[5], toStreamSummary),
+                                           fromArrayOfCompositeData((CompositeData[]) values[6], toStreamSummary),
+                                           StreamSession.State.valueOf((String) values[7]));
         Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>()
         {
             public ProgressInfo apply(CompositeData input)
@@ -151,11 +159,11 @@ public class SessionInfoCompositeData
                 return ProgressInfoCompositeData.fromCompositeData(input);
             }
         };
-        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo))
+        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[8], toProgressInfo))
         {
             info.updateProgress(progress);
         }
-        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[7], toProgressInfo))
+        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[9], toProgressInfo))
         {
             info.updateProgress(progress);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index fedb971..13a3358 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.streaming.messages;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -30,6 +29,7 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.TableId;
@@ -67,13 +67,13 @@ public class FileMessageHeader
     public final UUID pendingRepair;
     public final int sstableLevel;
     public final SerializationHeader.Component header;
-    public final InetAddress sender;
+    public final InetAddressAndPort sender;
 
     /* cached size value */
     private transient final long size;
 
     private FileMessageHeader(TableId tableId,
-                             InetAddress sender,
+                             InetAddressAndPort sender,
                              UUID planId,
                              int sessionIndex,
                              int sequenceNumber,
@@ -106,7 +106,7 @@ public class FileMessageHeader
     }
 
     public FileMessageHeader(TableId tableId,
-                             InetAddress sender,
+                             InetAddressAndPort sender,
                              UUID planId,
                              int sessionIndex,
                              int sequenceNumber,
@@ -218,7 +218,7 @@ public class FileMessageHeader
         public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException
         {
             header.tableId.serialize(out);
-            CompactEndpointSerializationHelper.serialize(header.sender, out);
+            CompactEndpointSerializationHelper.streamingInstance.serialize(header.sender, out, version);
             UUIDSerializer.serializer.serialize(header.planId, out, version);
             out.writeInt(header.sessionIndex);
             out.writeInt(header.sequenceNumber);
@@ -252,7 +252,7 @@ public class FileMessageHeader
         public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException
         {
             TableId tableId = TableId.deserialize(in);
-            InetAddress sender = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddressAndPort sender = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version);
             UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
             int sessionIndex = in.readInt();
             int sequenceNumber = in.readInt();
@@ -276,7 +276,7 @@ public class FileMessageHeader
         public long serializedSize(FileMessageHeader header, int version)
         {
             long size = header.tableId.serializedSize();
-            size += CompactEndpointSerializationHelper.serializedSize(header.sender);
+            size += CompactEndpointSerializationHelper.streamingInstance.serializedSize(header.sender, version);
             size += UUIDSerializer.serializer.serializedSize(header.planId, version);
             size += TypeSizes.sizeof(header.sessionIndex);
             size += TypeSizes.sizeof(header.sequenceNumber);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index f44b41c..8bbcc05 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -79,7 +79,7 @@ public class OutgoingFileMessage extends StreamMessage
         SSTableReader sstable = ref.get();
         filename = sstable.getFilename();
         this.header = new FileMessageHeader(sstable.metadata().id,
-                                            FBUtilities.getBroadcastAddress(),
+                                            FBUtilities.getBroadcastAddressAndPort(),
                                             session.planId(),
                                             session.sessionIndex(),
                                             sequenceNumber,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 68c6034..fced133 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -18,12 +18,12 @@
 package org.apache.cassandra.streaming.messages;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamOperation;
@@ -39,7 +39,7 @@ public class StreamInitMessage extends StreamMessage
 {
     public static Serializer<StreamInitMessage> serializer = new StreamInitMessageSerializer();
 
-    public final InetAddress from;
+    public final InetAddressAndPort from;
     public final int sessionIndex;
     public final UUID planId;
     public final StreamOperation streamOperation;
@@ -48,7 +48,7 @@ public class StreamInitMessage extends StreamMessage
     public final UUID pendingRepair;
     public final PreviewKind previewKind;
 
-    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
+    public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
     {
         super(Type.STREAM_INIT);
         this.from = from;
@@ -73,7 +73,7 @@ public class StreamInitMessage extends StreamMessage
     {
         public void serialize(StreamInitMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
         {
-            CompactEndpointSerializationHelper.serialize(message.from, out);
+            CompactEndpointSerializationHelper.streamingInstance.serialize(message.from, out, version);
             out.writeInt(message.sessionIndex);
             UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);
             out.writeUTF(message.streamOperation.getDescription());
@@ -89,7 +89,7 @@ public class StreamInitMessage extends StreamMessage
 
         public StreamInitMessage deserialize(DataInputPlus in, int version, StreamSession session) throws IOException
         {
-            InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddressAndPort from = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version);
             int sessionIndex = in.readInt();
             UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
             String description = in.readUTF();
@@ -102,7 +102,7 @@ public class StreamInitMessage extends StreamMessage
 
         public long serializedSize(StreamInitMessage message, int version)
         {
-            long size = CompactEndpointSerializationHelper.serializedSize(message.from);
+            long size = CompactEndpointSerializationHelper.streamingInstance.serializedSize(message.from, version);
             size += TypeSizes.sizeof(message.sessionIndex);
             size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
             size += TypeSizes.sizeof(message.streamOperation.getDescription());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
index b56d292..cce686f 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
@@ -22,21 +22,21 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import io.netty.channel.Channel;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
 import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.StreamConnectionFactory;
 
 public class BulkLoadConnectionFactory extends DefaultConnectionFactory implements StreamConnectionFactory
 {
     private final boolean outboundBindAny;
-    private final int storagePort;
     private final int secureStoragePort;
     private final EncryptionOptions.ServerEncryptionOptions encryptionOptions;
 
-    public BulkLoadConnectionFactory(int storagePort, int secureStoragePort, EncryptionOptions.ServerEncryptionOptions encryptionOptions, boolean outboundBindAny)
+    public BulkLoadConnectionFactory(int secureStoragePort, EncryptionOptions.ServerEncryptionOptions encryptionOptions, boolean outboundBindAny)
     {
-        this.storagePort = storagePort;
         this.secureStoragePort = secureStoragePort;
         this.encryptionOptions = encryptionOptions != null && encryptionOptions.internode_encryption == EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none
                                  ? null
@@ -50,9 +50,9 @@ public class BulkLoadConnectionFactory extends DefaultConnectionFactory implemen
         // When 'all', 'dc' and 'rack', server nodes always have SSL port open, and since thin client like sstableloader
         // does not know which node is in which dc/rack, connecting to SSL port is always the option.
         int port = encryptionOptions != null && encryptionOptions.internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none ?
-                   secureStoragePort : storagePort;
+                   secureStoragePort : connectionId.remote().port;
 
-        connectionId = connectionId.withNewConnectionAddress(new InetSocketAddress(connectionId.remote(), port));
+        connectionId = connectionId.withNewConnectionAddress(InetAddressAndPort.getByAddressOverrideDefaults(connectionId.remote().address, port));
         return createConnection(connectionId, protocolVersion, encryptionOptions);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 0812e53..545d1f7 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -18,7 +18,7 @@
 package org.apache.cassandra.tools;
 
 import java.io.IOException;
-import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.Set;
 import javax.net.ssl.SSLContext;
 
@@ -33,6 +33,7 @@ import org.apache.commons.cli.Options;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.utils.FBUtilities;
@@ -57,11 +58,12 @@ public class BulkLoader
                 new ExternalClient(
                         options.hosts,
                         options.nativePort,
-                        options.authProvider,
                         options.storagePort,
+                        options.authProvider,
                         options.sslStoragePort,
                         options.serverEncOptions,
-                        buildSSLOptions(options.clientEncOptions)),
+                        buildSSLOptions(options.clientEncOptions),
+                        options.allowServerPortDiscovery),
                         handler,
                         options.connectionsPerHost);
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
@@ -124,7 +126,7 @@ public class BulkLoader
         private long peak = 0;
         private int totalFiles = 0;
 
-        private final Multimap<InetAddress, SessionInfo> sessionsByHost = HashMultimap.create();
+        private final Multimap<InetAddressAndPort, SessionInfo> sessionsByHost = HashMultimap.create();
 
         public ProgressIndicator()
         {
@@ -165,7 +167,7 @@ public class BulkLoader
 
                 boolean updateTotalFiles = totalFiles == 0;
                 // recalculate progress across all sessions in all hosts and display
-                for (InetAddress peer : sessionsByHost.keySet())
+                for (InetAddressAndPort peer : sessionsByHost.keySet())
                 {
                     sb.append("[").append(peer).append("]");
 
@@ -268,20 +270,19 @@ public class BulkLoader
 
     static class ExternalClient extends NativeSSTableLoaderClient
     {
-        private final int storagePort;
         private final int sslStoragePort;
         private final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
 
-        public ExternalClient(Set<InetAddress> hosts,
-                              int port,
-                              AuthProvider authProvider,
+        public ExternalClient(Set<InetSocketAddress> hosts,
+                              int nativePort,
                               int storagePort,
+                              AuthProvider authProvider,
                               int sslStoragePort,
                               EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions,
-                              SSLOptions sslOptions)
+                              SSLOptions sslOptions,
+                              boolean allowServerPortDiscovery)
         {
-            super(hosts, port, authProvider, sslOptions);
-            this.storagePort = storagePort;
+            super(hosts, nativePort, storagePort, authProvider, sslOptions, allowServerPortDiscovery);
             this.sslStoragePort = sslStoragePort;
             serverEncOptions = serverEncryptionOptions;
         }
@@ -289,7 +290,7 @@ public class BulkLoader
         @Override
         public StreamConnectionFactory getConnectionFactory()
         {
-            return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions, false);
+            return new BulkLoadConnectionFactory(sslStoragePort, serverEncOptions, false);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org