You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:12 UTC

[15/19] cassandra git commit: Allow storage port to be configurable per node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
index cfeccc4..92307a3 100644
--- a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
+++ b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
@@ -26,10 +26,9 @@ import org.apache.cassandra.dht.Token;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetAddress;
 import java.util.*;
 
-public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddress>>>
+public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddressAndPort>>>
 {
     private static final Logger logger = LoggerFactory.getLogger(PendingRangeMaps.class);
 
@@ -39,7 +38,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
      * First two are for non-wrap-around ranges, and the last two are for wrap-around ranges.
      */
     // ascendingMap will sort the ranges by the ascending order of right token
-    final NavigableMap<Range<Token>, List<InetAddress>> ascendingMap;
+    final NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMap;
     /**
      * sorting end ascending, if ends are same, sorting begin descending, so that token (end, end) will
      * come before (begin, end] with the same end, and (begin, end) will be selected in the tailMap.
@@ -58,7 +57,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
         };
 
     // ascendingMap will sort the ranges by the descending order of left token
-    final NavigableMap<Range<Token>, List<InetAddress>> descendingMap;
+    final NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMap;
     /**
      * sorting begin descending, if begins are same, sorting end descending, so that token (begin, begin) will
      * come after (begin, end] with the same begin, and (begin, end) won't be selected in the tailMap.
@@ -78,7 +77,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
         };
 
     // these two maps are for warp around ranges.
-    final NavigableMap<Range<Token>, List<InetAddress>> ascendingMapForWrapAround;
+    final NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMapForWrapAround;
     /**
      * for wrap around range (begin, end], which begin > end.
      * Sorting end ascending, if ends are same, sorting begin ascending,
@@ -98,7 +97,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
         }
     };
 
-    final NavigableMap<Range<Token>, List<InetAddress>> descendingMapForWrapAround;
+    final NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMapForWrapAround;
     /**
      * for wrap around ranges, which begin > end.
      * Sorting end ascending, so that token (begin, begin) will come after (begin, end] with the same begin,
@@ -118,28 +117,28 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
 
     public PendingRangeMaps()
     {
-        this.ascendingMap = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparator);
-        this.descendingMap = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparator);
-        this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparatorForWrapAround);
-        this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparatorForWrapAround);
+        this.ascendingMap = new TreeMap<Range<Token>, List<InetAddressAndPort>>(ascendingComparator);
+        this.descendingMap = new TreeMap<Range<Token>, List<InetAddressAndPort>>(descendingComparator);
+        this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddressAndPort>>(ascendingComparatorForWrapAround);
+        this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddressAndPort>>(descendingComparatorForWrapAround);
     }
 
     static final void addToMap(Range<Token> range,
-                               InetAddress address,
-                               NavigableMap<Range<Token>, List<InetAddress>> ascendingMap,
-                               NavigableMap<Range<Token>, List<InetAddress>> descendingMap)
+                               InetAddressAndPort address,
+                               NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMap,
+                               NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMap)
     {
-        List<InetAddress> addresses = ascendingMap.get(range);
+        List<InetAddressAndPort> addresses = ascendingMap.get(range);
         if (addresses == null)
         {
-            addresses = new ArrayList<InetAddress>(1);
+            addresses = new ArrayList<>(1);
             ascendingMap.put(range, addresses);
             descendingMap.put(range, addresses);
         }
         addresses.add(address);
     }
 
-    public void addPendingRange(Range<Token> range, InetAddress address)
+    public void addPendingRange(Range<Token> range, InetAddressAndPort address)
     {
         if (Range.isWrapAround(range.left, range.right))
         {
@@ -151,14 +150,14 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
         }
     }
 
-    static final void addIntersections(Set<InetAddress> endpointsToAdd,
-                                       NavigableMap<Range<Token>, List<InetAddress>> smallerMap,
-                                       NavigableMap<Range<Token>, List<InetAddress>> biggerMap)
+    static final void addIntersections(Set<InetAddressAndPort> endpointsToAdd,
+                                       NavigableMap<Range<Token>, List<InetAddressAndPort>> smallerMap,
+                                       NavigableMap<Range<Token>, List<InetAddressAndPort>> biggerMap)
     {
         // find the intersection of two sets
         for (Range<Token> range : smallerMap.keySet())
         {
-            List<InetAddress> addresses = biggerMap.get(range);
+            List<InetAddressAndPort> addresses = biggerMap.get(range);
             if (addresses != null)
             {
                 endpointsToAdd.addAll(addresses);
@@ -166,15 +165,15 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
         }
     }
 
-    public Collection<InetAddress> pendingEndpointsFor(Token token)
+    public Collection<InetAddressAndPort> pendingEndpointsFor(Token token)
     {
-        Set<InetAddress> endpoints = new HashSet<>();
+        Set<InetAddressAndPort> endpoints = new HashSet<>();
 
         Range searchRange = new Range(token, token);
 
         // search for non-wrap-around maps
-        NavigableMap<Range<Token>, List<InetAddress>> ascendingTailMap = ascendingMap.tailMap(searchRange, true);
-        NavigableMap<Range<Token>, List<InetAddress>> descendingTailMap = descendingMap.tailMap(searchRange, false);
+        NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingTailMap = ascendingMap.tailMap(searchRange, true);
+        NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingTailMap = descendingMap.tailMap(searchRange, false);
 
         // add intersections of two maps
         if (ascendingTailMap.size() < descendingTailMap.size())
@@ -191,11 +190,11 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
         descendingTailMap = descendingMapForWrapAround.tailMap(searchRange, false);
 
         // add them since they are all necessary.
-        for (Map.Entry<Range<Token>, List<InetAddress>> entry : ascendingTailMap.entrySet())
+        for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : ascendingTailMap.entrySet())
         {
             endpoints.addAll(entry.getValue());
         }
-        for (Map.Entry<Range<Token>, List<InetAddress>> entry : descendingTailMap.entrySet())
+        for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : descendingTailMap.entrySet())
         {
             endpoints.addAll(entry.getValue());
         }
@@ -207,11 +206,11 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
     {
         StringBuilder sb = new StringBuilder();
 
-        for (Map.Entry<Range<Token>, List<InetAddress>> entry : this)
+        for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : this)
         {
             Range<Token> range = entry.getKey();
 
-            for (InetAddress address : entry.getValue())
+            for (InetAddressAndPort address : entry.getValue())
             {
                 sb.append(address).append(':').append(range);
                 sb.append(System.getProperty("line.separator"));
@@ -222,7 +221,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
     }
 
     @Override
-    public Iterator<Map.Entry<Range<Token>, List<InetAddress>>> iterator()
+    public Iterator<Map.Entry<Range<Token>, List<InetAddressAndPort>>> iterator()
     {
         return Iterators.concat(ascendingMap.entrySet().iterator(), ascendingMapForWrapAround.entrySet().iterator());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
index 2908976..3a9b161 100644
--- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.locator;
 
 import java.io.InputStream;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -55,7 +54,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
     public static final String SNITCH_PROPERTIES_FILENAME = "cassandra-topology.properties";
     private static final int DEFAULT_REFRESH_PERIOD_IN_SECONDS = 5;
 
-    private static volatile Map<InetAddress, String[]> endpointMap;
+    private static volatile Map<InetAddressAndPort, String[]> endpointMap;
     private static volatile String[] defaultDCRack;
 
     private volatile boolean gossipStarted;
@@ -93,7 +92,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
      * @param endpoint endpoint to process
      * @return a array of string with the first index being the data center and the second being the rack
      */
-    public static String[] getEndpointInfo(InetAddress endpoint)
+    public static String[] getEndpointInfo(InetAddressAndPort endpoint)
     {
         String[] rawEndpointInfo = getRawEndpointInfo(endpoint);
         if (rawEndpointInfo == null)
@@ -101,7 +100,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
         return rawEndpointInfo;
     }
 
-    private static String[] getRawEndpointInfo(InetAddress endpoint)
+    private static String[] getRawEndpointInfo(InetAddressAndPort endpoint)
     {
         String[] value = endpointMap.get(endpoint);
         if (value == null)
@@ -118,7 +117,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
      * @param endpoint the endpoint to process
      * @return string of data center
      */
-    public String getDatacenter(InetAddress endpoint)
+    public String getDatacenter(InetAddressAndPort endpoint)
     {
         String[] info = getEndpointInfo(endpoint);
         assert info != null : "No location defined for endpoint " + endpoint;
@@ -131,7 +130,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
      * @param endpoint the endpoint to process
      * @return string of rack
      */
-    public String getRack(InetAddress endpoint)
+    public String getRack(InetAddressAndPort endpoint)
     {
         String[] info = getEndpointInfo(endpoint);
         assert info != null : "No location defined for endpoint " + endpoint;
@@ -140,7 +139,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
 
     public void reloadConfiguration(boolean isUpdate) throws ConfigurationException
     {
-        HashMap<InetAddress, String[]> reloadedMap = new HashMap<>();
+        HashMap<InetAddressAndPort, String[]> reloadedMap = new HashMap<>();
         String[] reloadedDefaultDCRack = null;
 
         Properties properties = new Properties();
@@ -168,11 +167,11 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
             }
             else
             {
-                InetAddress host;
+                InetAddressAndPort host;
                 String hostString = StringUtils.remove(key, '/');
                 try
                 {
-                    host = InetAddress.getByName(hostString);
+                    host = InetAddressAndPort.getByName(hostString);
                 }
                 catch (UnknownHostException e)
                 {
@@ -186,7 +185,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
                 reloadedMap.put(host, token);
             }
         }
-        InetAddress broadcastAddress = FBUtilities.getBroadcastAddress();
+        InetAddressAndPort broadcastAddress = FBUtilities.getBroadcastAddressAndPort();
         String[] localInfo = reloadedMap.get(broadcastAddress);
         if (reloadedDefaultDCRack == null && localInfo == null)
             throw new ConfigurationException(String.format("Snitch definitions at %s do not define a location for " +
@@ -194,7 +193,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
                                                            SNITCH_PROPERTIES_FILENAME, broadcastAddress));
         // internode messaging code converts our broadcast address to local,
         // make sure we can be found at that as well.
-        InetAddress localAddress = FBUtilities.getLocalAddress();
+        InetAddressAndPort localAddress = FBUtilities.getLocalAddressAndPort();
         if (!localAddress.equals(broadcastAddress) && !reloadedMap.containsKey(localAddress))
             reloadedMap.put(localAddress, localInfo);
 
@@ -204,7 +203,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
         if (logger.isTraceEnabled())
         {
             StringBuilder sb = new StringBuilder();
-            for (Map.Entry<InetAddress, String[]> entry : reloadedMap.entrySet())
+            for (Map.Entry<InetAddressAndPort, String[]> entry : reloadedMap.entrySet())
                 sb.append(entry.getKey()).append(':').append(Arrays.toString(entry.getValue())).append(", ");
             logger.trace("Loaded network topology from property file: {}", StringUtils.removeEnd(sb.toString(), ", "));
         }
@@ -231,17 +230,17 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
      * @param reloadedDefaultDCRack - the default dc:rack or null if no default
      * @return true if we can continue updating (no live host had dc or rack updated)
      */
-    private static boolean livenessCheck(HashMap<InetAddress, String[]> reloadedMap, String[] reloadedDefaultDCRack)
+    private static boolean livenessCheck(HashMap<InetAddressAndPort, String[]> reloadedMap, String[] reloadedDefaultDCRack)
     {
         // If the default has changed we must check all live hosts but hopefully we will find a live
         // host quickly and interrupt the loop. Otherwise we only check the live hosts that were either
         // in the old set or in the new set
-        Set<InetAddress> hosts = Arrays.equals(defaultDCRack, reloadedDefaultDCRack)
+        Set<InetAddressAndPort> hosts = Arrays.equals(defaultDCRack, reloadedDefaultDCRack)
                                  ? Sets.intersection(StorageService.instance.getLiveRingMembers(), // same default
                                                      Sets.union(endpointMap.keySet(), reloadedMap.keySet()))
                                  : StorageService.instance.getLiveRingMembers(); // default updated
 
-        for (InetAddress host : hosts)
+        for (InetAddressAndPort host : hosts)
         {
             String[] origValue = endpointMap.containsKey(host) ? endpointMap.get(host) : defaultDCRack;
             String[] updateValue = reloadedMap.containsKey(host) ? reloadedMap.get(host) : reloadedDefaultDCRack;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RackInferringSnitch.java b/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
index a6ea1ab..6ae10cc 100644
--- a/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
+++ b/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
@@ -17,21 +17,19 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
-
 /**
  * A simple endpoint snitch implementation that assumes datacenter and rack information is encoded
  * in the 2nd and 3rd octets of the ip address, respectively.
  */
 public class RackInferringSnitch extends AbstractNetworkTopologySnitch
 {
-    public String getRack(InetAddress endpoint)
+    public String getRack(InetAddressAndPort endpoint)
     {
-        return Integer.toString(endpoint.getAddress()[2] & 0xFF, 10);
+        return Integer.toString(endpoint.address.getAddress()[2] & 0xFF, 10);
     }
 
-    public String getDatacenter(InetAddress endpoint)
+    public String getDatacenter(InetAddressAndPort endpoint)
     {
-        return Integer.toString(endpoint.getAddress()[1] & 0xFF, 10);
+        return Integer.toString(endpoint.address.getAddress()[1] & 0xFF, 10);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
index 0b344c9..5479010 100644
--- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -49,11 +48,11 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
         this.preferLocal = preferLocal;
     }
 
-    private void reconnect(InetAddress publicAddress, VersionedValue localAddressValue)
+    private void reconnect(InetAddressAndPort publicAddress, VersionedValue localAddressValue)
     {
         try
         {
-            reconnect(publicAddress, InetAddress.getByName(localAddressValue.value), snitch, localDc);
+            reconnect(publicAddress, InetAddressAndPort.getByName(localAddressValue.value), snitch, localDc);
         }
         catch (UnknownHostException e)
         {
@@ -62,9 +61,9 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
     }
 
     @VisibleForTesting
-    static void reconnect(InetAddress publicAddress, InetAddress localAddress, IEndpointSnitch snitch, String localDc)
+    static void reconnect(InetAddressAndPort publicAddress, InetAddressAndPort localAddress, IEndpointSnitch snitch, String localDc)
     {
-        if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress, MessagingService.instance().portFor(publicAddress)))
+        if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress.address, MessagingService.instance().portFor(publicAddress)))
         {
             logger.debug("InternodeAuthenticator said don't reconnect to {} on {}", publicAddress, localAddress);
             return;
@@ -78,40 +77,65 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
         }
     }
 
-    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
+    public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
     {
         // no-op
     }
 
-    public void onJoin(InetAddress endpoint, EndpointState epState)
+    public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
     {
-        if (preferLocal && !Gossiper.instance.isDeadState(epState) && epState.getApplicationState(ApplicationState.INTERNAL_IP) != null)
-            reconnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP));
+        if (preferLocal && !Gossiper.instance.isDeadState(epState))
+        {
+            VersionedValue address = epState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+            if (address == null)
+            {
+                address = epState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+            }
+            if (address != null)
+            {
+                reconnect(endpoint, address);
+            }
+        }
     }
 
-    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+    //Skeptical this will always do the right thing all the time port wise. It will converge on the right thing
+    //eventually once INTERNAL_ADDRESS_AND_PORT is populated
+    public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
     {
-        if (preferLocal && state == ApplicationState.INTERNAL_IP && !Gossiper.instance.isDeadState(Gossiper.instance.getEndpointStateForEndpoint(endpoint)))
-            reconnect(endpoint, value);
+        if (preferLocal && !Gossiper.instance.isDeadState(Gossiper.instance.getEndpointStateForEndpoint(endpoint)))
+        {
+            if (state == ApplicationState.INTERNAL_ADDRESS_AND_PORT)
+            {
+                reconnect(endpoint, value);
+            }
+            else if (state == ApplicationState.INTERNAL_IP &&
+                     null == Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT))
+            {
+                //Only use INTERNAL_IP if INTERNAL_ADDRESS_AND_PORT is unavailable
+                reconnect(endpoint, value);
+            }
+        }
     }
 
-    public void onAlive(InetAddress endpoint, EndpointState state)
+    public void onAlive(InetAddressAndPort endpoint, EndpointState state)
     {
-        if (preferLocal && state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
-            reconnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP));
+        VersionedValue internalIP = state.getApplicationState(ApplicationState.INTERNAL_IP);
+        VersionedValue internalIPAndPorts = state.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+        if (preferLocal && internalIP != null)
+            reconnect(endpoint, internalIPAndPorts != null ? internalIPAndPorts : internalIP);
     }
 
-    public void onDead(InetAddress endpoint, EndpointState state)
+    public void onDead(InetAddressAndPort endpoint, EndpointState state)
     {
         // do nothing.
     }
 
-    public void onRemove(InetAddress endpoint)
+    public void onRemove(InetAddressAndPort endpoint)
     {
         // do nothing.
     }
 
-    public void onRestart(InetAddress endpoint, EndpointState state)
+    public void onRestart(InetAddressAndPort endpoint, EndpointState state)
     {
         // do nothing.
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/SeedProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SeedProvider.java b/src/java/org/apache/cassandra/locator/SeedProvider.java
index a013fbb..7efa9e0 100644
--- a/src/java/org/apache/cassandra/locator/SeedProvider.java
+++ b/src/java/org/apache/cassandra/locator/SeedProvider.java
@@ -17,10 +17,9 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.util.List;
 
 public interface SeedProvider
 {
-    List<InetAddress> getSeeds();
+    List<InetAddressAndPort> getSeeds();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java b/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java
index 665261d..47401a0 100644
--- a/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java
+++ b/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -26,6 +25,7 @@ import java.util.Map;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +35,7 @@ public class SimpleSeedProvider implements SeedProvider
 
     public SimpleSeedProvider(Map<String, String> args) {}
 
-    public List<InetAddress> getSeeds()
+    public List<InetAddressAndPort> getSeeds()
     {
         Config conf;
         try
@@ -47,12 +47,12 @@ public class SimpleSeedProvider implements SeedProvider
             throw new AssertionError(e);
         }
         String[] hosts = conf.seed_provider.parameters.get("seeds").split(",", -1);
-        List<InetAddress> seeds = new ArrayList<InetAddress>(hosts.length);
+        List<InetAddressAndPort> seeds = new ArrayList<>(hosts.length);
         for (String host : hosts)
         {
             try
             {
-                seeds.add(InetAddress.getByName(host.trim()));
+                seeds.add(InetAddressAndPort.getByName(host.trim()));
             }
             catch (UnknownHostException ex)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/SimpleSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleSnitch.java b/src/java/org/apache/cassandra/locator/SimpleSnitch.java
index 27648c8..e31fc6b 100644
--- a/src/java/org/apache/cassandra/locator/SimpleSnitch.java
+++ b/src/java/org/apache/cassandra/locator/SimpleSnitch.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.util.List;
 
 /**
@@ -27,23 +26,23 @@ import java.util.List;
  */
 public class SimpleSnitch extends AbstractEndpointSnitch
 {
-    public String getRack(InetAddress endpoint)
+    public String getRack(InetAddressAndPort endpoint)
     {
         return "rack1";
     }
 
-    public String getDatacenter(InetAddress endpoint)
+    public String getDatacenter(InetAddressAndPort endpoint)
     {
         return "datacenter1";
     }
 
     @Override
-    public void sortByProximity(final InetAddress address, List<InetAddress> addresses)
+    public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
     {
         // Optimization to avoid walking the list
     }
 
-    public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+    public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
     {
         // Making all endpoints equal ensures we won't change the original ordering (since
         // Collections.sort is guaranteed to be stable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/SimpleStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 9a5062b..545ad28 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collection;
@@ -42,11 +41,11 @@ public class SimpleStrategy extends AbstractReplicationStrategy
         super(keyspaceName, tokenMetadata, snitch, configOptions);
     }
 
-    public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+    public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
     {
         int replicas = getReplicationFactor();
         ArrayList<Token> tokens = metadata.sortedTokens();
-        List<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
+        List<InetAddressAndPort> endpoints = new ArrayList<InetAddressAndPort>(replicas);
 
         if (tokens.isEmpty())
             return endpoints;
@@ -55,7 +54,7 @@ public class SimpleStrategy extends AbstractReplicationStrategy
         Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false);
         while (endpoints.size() < replicas && iter.hasNext())
         {
-            InetAddress ep = metadata.getEndpoint(iter.next());
+            InetAddressAndPort ep = metadata.getEndpoint(iter.next());
             if (!endpoints.contains(ep))
                 endpoints.add(ep);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 00f9536..e2c4628 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -52,12 +51,12 @@ public class TokenMetadata
      * Each Token is associated with exactly one Address, but each Address may have
      * multiple tokens.  Hence, the BiMultiValMap collection.
      */
-    private final BiMultiValMap<Token, InetAddress> tokenToEndpointMap;
+    private final BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap;
 
     /** Maintains endpoint to host ID map of every node in the cluster */
-    private final BiMap<InetAddress, UUID> endpointToHostIdMap;
+    private final BiMap<InetAddressAndPort, UUID> endpointToHostIdMap;
 
-    // Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddress> pendingRanges<tt>,
+    // Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddressAndPort> pendingRanges<tt>,
     // which was added to when a node began bootstrap and removed from when it finished.
     //
     // This is inadequate when multiple changes are allowed simultaneously.  For example,
@@ -70,8 +69,8 @@ public class TokenMetadata
     //
     // So, we made two changes:
     //
-    // First, we changed pendingRanges to a <tt>Multimap<Range, InetAddress></tt> (now
-    // <tt>Map<String, Multimap<Range, InetAddress>></tt>, because replication strategy
+    // First, we changed pendingRanges to a <tt>Multimap<Range, InetAddressAndPort></tt> (now
+    // <tt>Map<String, Multimap<Range, InetAddressAndPort>></tt>, because replication strategy
     // and options are per-KeySpace).
     //
     // Second, we added the bootstrapTokens and leavingEndpoints collections, so we can
@@ -81,17 +80,17 @@ public class TokenMetadata
     // Finally, note that recording the tokens of joining nodes in bootstrapTokens also
     // means we can detect and reject the addition of multiple nodes at the same token
     // before one becomes part of the ring.
-    private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
+    private final BiMultiValMap<Token, InetAddressAndPort> bootstrapTokens = new BiMultiValMap<>();
 
-    private final BiMap<InetAddress, InetAddress> replacementToOriginal = HashBiMap.create();
+    private final BiMap<InetAddressAndPort, InetAddressAndPort> replacementToOriginal = HashBiMap.create();
 
     // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
-    private final Set<InetAddress> leavingEndpoints = new HashSet<>();
+    private final Set<InetAddressAndPort> leavingEndpoints = new HashSet<>();
     // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
     private final ConcurrentMap<String, PendingRangeMaps> pendingRanges = new ConcurrentHashMap<String, PendingRangeMaps>();
 
     // nodes which are migrating to the new tokens in the ring
-    private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
+    private final Set<Pair<Token, InetAddressAndPort>> movingEndpoints = new HashSet<>();
 
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -100,26 +99,18 @@ public class TokenMetadata
     private final Topology topology;
     public final IPartitioner partitioner;
 
-    private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
-    {
-        public int compare(InetAddress o1, InetAddress o2)
-        {
-            return ByteBuffer.wrap(o1.getAddress()).compareTo(ByteBuffer.wrap(o2.getAddress()));
-        }
-    };
-
     // signals replication strategies that nodes have joined or left the ring and they need to recompute ownership
     private volatile long ringVersion = 0;
 
     public TokenMetadata()
     {
-        this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
-             HashBiMap.<InetAddress, UUID>create(),
+        this(SortedBiMultiValMap.<Token, InetAddressAndPort>create(),
+             HashBiMap.create(),
              new Topology(),
              DatabaseDescriptor.getPartitioner());
     }
 
-    private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner)
+    private TokenMetadata(BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap, BiMap<InetAddressAndPort, UUID> endpointsMap, Topology topology, IPartitioner partitioner)
     {
         this.tokenToEndpointMap = tokenToEndpointMap;
         this.topology = topology;
@@ -143,7 +134,7 @@ public class TokenMetadata
     }
 
     /** @return the number of nodes bootstrapping into source's primary range */
-    public int pendingRangeChanges(InetAddress source)
+    public int pendingRangeChanges(InetAddressAndPort source)
     {
         int n = 0;
         Collection<Range<Token>> sourceRanges = getPrimaryRangesFor(getTokens(source));
@@ -165,14 +156,14 @@ public class TokenMetadata
     /**
      * Update token map with a single token/endpoint pair in normal state.
      */
-    public void updateNormalToken(Token token, InetAddress endpoint)
+    public void updateNormalToken(Token token, InetAddressAndPort endpoint)
     {
         updateNormalTokens(Collections.singleton(token), endpoint);
     }
 
-    public void updateNormalTokens(Collection<Token> tokens, InetAddress endpoint)
+    public void updateNormalTokens(Collection<Token> tokens, InetAddressAndPort endpoint)
     {
-        Multimap<InetAddress, Token> endpointTokens = HashMultimap.create();
+        Multimap<InetAddressAndPort, Token> endpointTokens = HashMultimap.create();
         for (Token token : tokens)
             endpointTokens.put(endpoint, token);
         updateNormalTokens(endpointTokens);
@@ -184,7 +175,7 @@ public class TokenMetadata
      * Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple)
      * is expensive (CASSANDRA-3831).
      */
-    public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens)
+    public void updateNormalTokens(Multimap<InetAddressAndPort, Token> endpointTokens)
     {
         if (endpointTokens.isEmpty())
             return;
@@ -193,7 +184,7 @@ public class TokenMetadata
         try
         {
             boolean shouldSortTokens = false;
-            for (InetAddress endpoint : endpointTokens.keySet())
+            for (InetAddressAndPort endpoint : endpointTokens.keySet())
             {
                 Collection<Token> tokens = endpointTokens.get(endpoint);
 
@@ -208,7 +199,7 @@ public class TokenMetadata
 
                 for (Token token : tokens)
                 {
-                    InetAddress prev = tokenToEndpointMap.put(token, endpoint);
+                    InetAddressAndPort prev = tokenToEndpointMap.put(token, endpoint);
                     if (!endpoint.equals(prev))
                     {
                         if (prev != null)
@@ -231,7 +222,7 @@ public class TokenMetadata
      * Store an end-point to host ID mapping.  Each ID must be unique, and
      * cannot be changed after the fact.
      */
-    public void updateHostId(UUID hostId, InetAddress endpoint)
+    public void updateHostId(UUID hostId, InetAddressAndPort endpoint)
     {
         assert hostId != null;
         assert endpoint != null;
@@ -239,7 +230,7 @@ public class TokenMetadata
         lock.writeLock().lock();
         try
         {
-            InetAddress storedEp = endpointToHostIdMap.inverse().get(hostId);
+            InetAddressAndPort storedEp = endpointToHostIdMap.inverse().get(hostId);
             if (storedEp != null)
             {
                 if (!storedEp.equals(endpoint) && (FailureDetector.instance.isAlive(storedEp)))
@@ -265,7 +256,7 @@ public class TokenMetadata
     }
 
     /** Return the unique host ID for an end-point. */
-    public UUID getHostId(InetAddress endpoint)
+    public UUID getHostId(InetAddressAndPort endpoint)
     {
         lock.readLock().lock();
         try
@@ -279,7 +270,7 @@ public class TokenMetadata
     }
 
     /** Return the end-point for a unique host ID */
-    public InetAddress getEndpointForHostId(UUID hostId)
+    public InetAddressAndPort getEndpointForHostId(UUID hostId)
     {
         lock.readLock().lock();
         try
@@ -293,12 +284,12 @@ public class TokenMetadata
     }
 
     /** @return a copy of the endpoint-to-id map for read-only operations */
-    public Map<InetAddress, UUID> getEndpointToHostIdMapForReading()
+    public Map<InetAddressAndPort, UUID> getEndpointToHostIdMapForReading()
     {
         lock.readLock().lock();
         try
         {
-            Map<InetAddress, UUID> readMap = new HashMap<>();
+            Map<InetAddressAndPort, UUID> readMap = new HashMap<>();
             readMap.putAll(endpointToHostIdMap);
             return readMap;
         }
@@ -309,17 +300,17 @@ public class TokenMetadata
     }
 
     @Deprecated
-    public void addBootstrapToken(Token token, InetAddress endpoint)
+    public void addBootstrapToken(Token token, InetAddressAndPort endpoint)
     {
         addBootstrapTokens(Collections.singleton(token), endpoint);
     }
 
-    public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint)
+    public void addBootstrapTokens(Collection<Token> tokens, InetAddressAndPort endpoint)
     {
         addBootstrapTokens(tokens, endpoint, null);
     }
 
-    private void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint, InetAddress original)
+    private void addBootstrapTokens(Collection<Token> tokens, InetAddressAndPort endpoint, InetAddressAndPort original)
     {
         assert tokens != null && !tokens.isEmpty();
         assert endpoint != null;
@@ -328,7 +319,7 @@ public class TokenMetadata
         try
         {
 
-            InetAddress oldEndpoint;
+            InetAddressAndPort oldEndpoint;
 
             for (Token token : tokens)
             {
@@ -352,7 +343,7 @@ public class TokenMetadata
         }
     }
 
-    public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode)
+    public void addReplaceTokens(Collection<Token> replacingTokens, InetAddressAndPort newNode, InetAddressAndPort oldNode)
     {
         assert replacingTokens != null && !replacingTokens.isEmpty();
         assert newNode != null && oldNode != null;
@@ -379,12 +370,12 @@ public class TokenMetadata
         }
     }
 
-    public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
+    public Optional<InetAddressAndPort> getReplacementNode(InetAddressAndPort endpoint)
     {
         return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint));
     }
 
-    public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
+    public Optional<InetAddressAndPort> getReplacingNode(InetAddressAndPort endpoint)
     {
         return Optional.ofNullable((replacementToOriginal.get(endpoint)));
     }
@@ -405,7 +396,7 @@ public class TokenMetadata
         }
     }
 
-    public void addLeavingEndpoint(InetAddress endpoint)
+    public void addLeavingEndpoint(InetAddressAndPort endpoint)
     {
         assert endpoint != null;
 
@@ -425,7 +416,7 @@ public class TokenMetadata
      * @param token token which is node moving to
      * @param endpoint address of the moving node
      */
-    public void addMovingEndpoint(Token token, InetAddress endpoint)
+    public void addMovingEndpoint(Token token, InetAddressAndPort endpoint)
     {
         assert endpoint != null;
 
@@ -441,7 +432,7 @@ public class TokenMetadata
         }
     }
 
-    public void removeEndpoint(InetAddress endpoint)
+    public void removeEndpoint(InetAddressAndPort endpoint)
     {
         assert endpoint != null;
 
@@ -469,7 +460,7 @@ public class TokenMetadata
     /**
      * This is called when the snitch properties for this endpoint are updated, see CASSANDRA-10238.
      */
-    public void updateTopology(InetAddress endpoint)
+    public void updateTopology(InetAddressAndPort endpoint)
     {
         assert endpoint != null;
 
@@ -509,14 +500,14 @@ public class TokenMetadata
      * Remove pair of token/address from moving endpoints
      * @param endpoint address of the moving node
      */
-    public void removeFromMoving(InetAddress endpoint)
+    public void removeFromMoving(InetAddressAndPort endpoint)
     {
         assert endpoint != null;
 
         lock.writeLock().lock();
         try
         {
-            for (Pair<Token, InetAddress> pair : movingEndpoints)
+            for (Pair<Token, InetAddressAndPort> pair : movingEndpoints)
             {
                 if (pair.right.equals(endpoint))
                 {
@@ -533,7 +524,7 @@ public class TokenMetadata
         }
     }
 
-    public Collection<Token> getTokens(InetAddress endpoint)
+    public Collection<Token> getTokens(InetAddressAndPort endpoint)
     {
         assert endpoint != null;
         assert isMember(endpoint); // don't want to return nulls
@@ -550,12 +541,12 @@ public class TokenMetadata
     }
 
     @Deprecated
-    public Token getToken(InetAddress endpoint)
+    public Token getToken(InetAddressAndPort endpoint)
     {
         return getTokens(endpoint).iterator().next();
     }
 
-    public boolean isMember(InetAddress endpoint)
+    public boolean isMember(InetAddressAndPort endpoint)
     {
         assert endpoint != null;
 
@@ -570,7 +561,7 @@ public class TokenMetadata
         }
     }
 
-    public boolean isLeaving(InetAddress endpoint)
+    public boolean isLeaving(InetAddressAndPort endpoint)
     {
         assert endpoint != null;
 
@@ -585,7 +576,7 @@ public class TokenMetadata
         }
     }
 
-    public boolean isMoving(InetAddress endpoint)
+    public boolean isMoving(InetAddressAndPort endpoint)
     {
         assert endpoint != null;
 
@@ -593,7 +584,7 @@ public class TokenMetadata
 
         try
         {
-            for (Pair<Token, InetAddress> pair : movingEndpoints)
+            for (Pair<Token, InetAddressAndPort> pair : movingEndpoints)
             {
                 if (pair.right.equals(endpoint))
                     return true;
@@ -618,7 +609,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
+            return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap),
                                      HashBiMap.create(endpointToHostIdMap),
                                      new Topology(topology),
                                      partitioner);
@@ -673,9 +664,9 @@ public class TokenMetadata
         }
     }
 
-    private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddress> leavingEndpoints)
+    private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddressAndPort> leavingEndpoints)
     {
-        for (InetAddress endpoint : leavingEndpoints)
+        for (InetAddressAndPort endpoint : leavingEndpoints)
             allLeftMetadata.removeEndpoint(endpoint);
 
         return allLeftMetadata;
@@ -695,11 +686,11 @@ public class TokenMetadata
         {
             TokenMetadata metadata = cloneOnlyTokenMap();
 
-            for (InetAddress endpoint : leavingEndpoints)
+            for (InetAddressAndPort endpoint : leavingEndpoints)
                 metadata.removeEndpoint(endpoint);
 
 
-            for (Pair<Token, InetAddress> pair : movingEndpoints)
+            for (Pair<Token, InetAddressAndPort> pair : movingEndpoints)
                 metadata.updateNormalToken(pair.left, pair.right);
 
             return metadata;
@@ -710,7 +701,7 @@ public class TokenMetadata
         }
     }
 
-    public InetAddress getEndpoint(Token token)
+    public InetAddressAndPort getEndpoint(Token token)
     {
         lock.readLock().lock();
         try
@@ -742,17 +733,17 @@ public class TokenMetadata
         return sortedTokens;
     }
 
-    public Multimap<Range<Token>, InetAddress> getPendingRangesMM(String keyspaceName)
+    public Multimap<Range<Token>, InetAddressAndPort> getPendingRangesMM(String keyspaceName)
     {
-        Multimap<Range<Token>, InetAddress> map = HashMultimap.create();
+        Multimap<Range<Token>, InetAddressAndPort> map = HashMultimap.create();
         PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName);
 
         if (pendingRangeMaps != null)
         {
-            for (Map.Entry<Range<Token>, List<InetAddress>> entry : pendingRangeMaps)
+            for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : pendingRangeMaps)
             {
                 Range<Token> range = entry.getKey();
-                for (InetAddress address : entry.getValue())
+                for (InetAddressAndPort address : entry.getValue())
                 {
                     map.put(range, address);
                 }
@@ -768,10 +759,10 @@ public class TokenMetadata
         return this.pendingRanges.get(keyspaceName);
     }
 
-    public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddress endpoint)
+    public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddressAndPort endpoint)
     {
         List<Range<Token>> ranges = new ArrayList<>();
-        for (Map.Entry<Range<Token>, InetAddress> entry : getPendingRangesMM(keyspaceName).entries())
+        for (Map.Entry<Range<Token>, InetAddressAndPort> entry : getPendingRangesMM(keyspaceName).entries())
         {
             if (entry.getValue().equals(endpoint))
             {
@@ -824,9 +815,9 @@ public class TokenMetadata
                 long startedAt = System.currentTimeMillis();
 
                 // create clone of current state
-                BiMultiValMap<Token, InetAddress> bootstrapTokensClone = new BiMultiValMap<>();
-                Set<InetAddress> leavingEndpointsClone = new HashSet<>();
-                Set<Pair<Token, InetAddress>> movingEndpointsClone = new HashSet<>();
+                BiMultiValMap<Token, InetAddressAndPort> bootstrapTokensClone = new BiMultiValMap<>();
+                Set<InetAddressAndPort> leavingEndpointsClone = new HashSet<>();
+                Set<Pair<Token, InetAddressAndPort>> movingEndpointsClone = new HashSet<>();
                 TokenMetadata metadata;
 
                 lock.readLock().lock();
@@ -859,29 +850,29 @@ public class TokenMetadata
      */
     private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy,
                                                            TokenMetadata metadata,
-                                                           BiMultiValMap<Token, InetAddress> bootstrapTokens,
-                                                           Set<InetAddress> leavingEndpoints,
-                                                           Set<Pair<Token, InetAddress>> movingEndpoints)
+                                                           BiMultiValMap<Token, InetAddressAndPort> bootstrapTokens,
+                                                           Set<InetAddressAndPort> leavingEndpoints,
+                                                           Set<Pair<Token, InetAddressAndPort>> movingEndpoints)
     {
         PendingRangeMaps newPendingRanges = new PendingRangeMaps();
 
-        Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
+        Multimap<InetAddressAndPort, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
 
         // Copy of metadata reflecting the situation after all leave operations are finished.
         TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints);
 
         // get all ranges that will be affected by leaving nodes
         Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
-        for (InetAddress endpoint : leavingEndpoints)
+        for (InetAddressAndPort endpoint : leavingEndpoints)
             affectedRanges.addAll(addressRanges.get(endpoint));
 
         // for each of those ranges, find what new nodes will be responsible for the range when
         // all leaving nodes are gone.
         for (Range<Token> range : affectedRanges)
         {
-            Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-            Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-            for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
+            Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+            Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+            for (InetAddressAndPort address : Sets.difference(newEndpoints, currentEndpoints))
             {
                 newPendingRanges.addPendingRange(range, address);
             }
@@ -892,8 +883,8 @@ public class TokenMetadata
 
         // For each of the bootstrapping nodes, simply add and remove them one by one to
         // allLeftMetadata and check in between what their ranges would be.
-        Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
-        for (InetAddress endpoint : bootstrapAddresses.keySet())
+        Multimap<InetAddressAndPort, Token> bootstrapAddresses = bootstrapTokens.inverse();
+        for (InetAddressAndPort endpoint : bootstrapAddresses.keySet())
         {
             Collection<Token> tokens = bootstrapAddresses.get(endpoint);
 
@@ -910,11 +901,11 @@ public class TokenMetadata
 
         // For each of the moving nodes, we do the same thing we did for bootstrapping:
         // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
-        for (Pair<Token, InetAddress> moving : movingEndpoints)
+        for (Pair<Token, InetAddressAndPort> moving : movingEndpoints)
         {
             //Calculate all the ranges which will could be affected. This will include the ranges before and after the move.
             Set<Range<Token>> moveAffectedRanges = new HashSet<>();
-            InetAddress endpoint = moving.right; // address of the moving node
+            InetAddressAndPort endpoint = moving.right; // address of the moving node
             //Add ranges before the move
             for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
             {
@@ -930,10 +921,10 @@ public class TokenMetadata
 
             for(Range<Token> range : moveAffectedRanges)
             {
-                Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
-                Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
-                Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
-                for(final InetAddress address : difference)
+                Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+                Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+                Set<InetAddressAndPort> difference = Sets.difference(newEndpoints, currentEndpoints);
+                for(final InetAddressAndPort address : difference)
                 {
                     Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
                     Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
@@ -973,12 +964,12 @@ public class TokenMetadata
     }
 
     /** @return a copy of the bootstrapping tokens map */
-    public BiMultiValMap<Token, InetAddress> getBootstrapTokens()
+    public BiMultiValMap<Token, InetAddressAndPort> getBootstrapTokens()
     {
         lock.readLock().lock();
         try
         {
-            return new BiMultiValMap<Token, InetAddress>(bootstrapTokens);
+            return new BiMultiValMap<>(bootstrapTokens);
         }
         finally
         {
@@ -986,7 +977,7 @@ public class TokenMetadata
         }
     }
 
-    public Set<InetAddress> getAllEndpoints()
+    public Set<InetAddressAndPort> getAllEndpoints()
     {
         lock.readLock().lock();
         try
@@ -1010,7 +1001,7 @@ public class TokenMetadata
     }
 
     /** caller should not modify leavingEndpoints */
-    public Set<InetAddress> getLeavingEndpoints()
+    public Set<InetAddressAndPort> getLeavingEndpoints()
     {
         lock.readLock().lock();
         try
@@ -1037,7 +1028,7 @@ public class TokenMetadata
      * Endpoints which are migrating to the new tokens
      * @return set of addresses of moving endpoints
      */
-    public Set<Pair<Token, InetAddress>> getMovingEndpoints()
+    public Set<Pair<Token, InetAddressAndPort>> getMovingEndpoints()
     {
         lock.readLock().lock();
         try
@@ -1148,14 +1139,14 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            Multimap<InetAddress, Token> endpointToTokenMap = tokenToEndpointMap.inverse();
-            Set<InetAddress> eps = endpointToTokenMap.keySet();
+            Multimap<InetAddressAndPort, Token> endpointToTokenMap = tokenToEndpointMap.inverse();
+            Set<InetAddressAndPort> eps = endpointToTokenMap.keySet();
 
             if (!eps.isEmpty())
             {
                 sb.append("Normal Tokens:");
                 sb.append(System.getProperty("line.separator"));
-                for (InetAddress ep : eps)
+                for (InetAddressAndPort ep : eps)
                 {
                     sb.append(ep);
                     sb.append(':');
@@ -1168,7 +1159,7 @@ public class TokenMetadata
             {
                 sb.append("Bootstrapping Tokens:" );
                 sb.append(System.getProperty("line.separator"));
-                for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
+                for (Map.Entry<Token, InetAddressAndPort> entry : bootstrapTokens.entrySet())
                 {
                     sb.append(entry.getValue()).append(':').append(entry.getKey());
                     sb.append(System.getProperty("line.separator"));
@@ -1179,7 +1170,7 @@ public class TokenMetadata
             {
                 sb.append("Leaving Endpoints:");
                 sb.append(System.getProperty("line.separator"));
-                for (InetAddress ep : leavingEndpoints)
+                for (InetAddressAndPort ep : leavingEndpoints)
                 {
                     sb.append(ep);
                     sb.append(System.getProperty("line.separator"));
@@ -1213,7 +1204,7 @@ public class TokenMetadata
         return sb.toString();
     }
 
-    public Collection<InetAddress> pendingEndpointsFor(Token token, String keyspaceName)
+    public Collection<InetAddressAndPort> pendingEndpointsFor(Token token, String keyspaceName)
     {
         PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName);
         if (pendingRangeMaps == null)
@@ -1225,19 +1216,19 @@ public class TokenMetadata
     /**
      * @deprecated retained for benefit of old tests
      */
-    public Collection<InetAddress> getWriteEndpoints(Token token, String keyspaceName, Collection<InetAddress> naturalEndpoints)
+    public Collection<InetAddressAndPort> getWriteEndpoints(Token token, String keyspaceName, Collection<InetAddressAndPort> naturalEndpoints)
     {
         return ImmutableList.copyOf(Iterables.concat(naturalEndpoints, pendingEndpointsFor(token, keyspaceName)));
     }
 
     /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
-    public Multimap<InetAddress, Token> getEndpointToTokenMapForReading()
+    public Multimap<InetAddressAndPort, Token> getEndpointToTokenMapForReading()
     {
         lock.readLock().lock();
         try
         {
-            Multimap<InetAddress, Token> cloned = HashMultimap.create();
-            for (Map.Entry<Token, InetAddress> entry : tokenToEndpointMap.entrySet())
+            Multimap<InetAddressAndPort, Token> cloned = HashMultimap.create();
+            for (Map.Entry<Token, InetAddressAndPort> entry : tokenToEndpointMap.entrySet())
                 cloned.put(entry.getValue(), entry.getKey());
             return cloned;
         }
@@ -1251,12 +1242,12 @@ public class TokenMetadata
      * @return a (stable copy, won't be modified) Token to Endpoint map for all the normal and bootstrapping nodes
      *         in the cluster.
      */
-    public Map<Token, InetAddress> getNormalAndBootstrappingTokenToEndpointMap()
+    public Map<Token, InetAddressAndPort> getNormalAndBootstrappingTokenToEndpointMap()
     {
         lock.readLock().lock();
         try
         {
-            Map<Token, InetAddress> map = new HashMap<>(tokenToEndpointMap.size() + bootstrapTokens.size());
+            Map<Token, InetAddressAndPort> map = new HashMap<>(tokenToEndpointMap.size() + bootstrapTokens.size());
             map.putAll(tokenToEndpointMap);
             map.putAll(bootstrapTokens);
             return map;
@@ -1302,11 +1293,11 @@ public class TokenMetadata
     public static class Topology
     {
         /** multi-map of DC to endpoints in that DC */
-        private final Multimap<String, InetAddress> dcEndpoints;
+        private final Multimap<String, InetAddressAndPort> dcEndpoints;
         /** map of DC to multi-map of rack to endpoints in that rack */
-        private final Map<String, Multimap<String, InetAddress>> dcRacks;
+        private final Map<String, Multimap<String, InetAddressAndPort>> dcRacks;
         /** reverse-lookup map for endpoint to current known dc/rack assignment */
-        private final Map<InetAddress, Pair<String, String>> currentLocations;
+        private final Map<InetAddressAndPort, Pair<String, String>> currentLocations;
 
         Topology()
         {
@@ -1337,7 +1328,7 @@ public class TokenMetadata
         /**
          * Stores current DC/rack assignment for ep
          */
-        void addEndpoint(InetAddress ep)
+        void addEndpoint(InetAddressAndPort ep)
         {
             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
             String dc = snitch.getDatacenter(ep);
@@ -1353,12 +1344,12 @@ public class TokenMetadata
             doAddEndpoint(ep, dc, rack);
         }
 
-        private void doAddEndpoint(InetAddress ep, String dc, String rack)
+        private void doAddEndpoint(InetAddressAndPort ep, String dc, String rack)
         {
             dcEndpoints.put(dc, ep);
 
             if (!dcRacks.containsKey(dc))
-                dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
+                dcRacks.put(dc, HashMultimap.create());
             dcRacks.get(dc).put(rack, ep);
 
             currentLocations.put(ep, Pair.create(dc, rack));
@@ -1367,7 +1358,7 @@ public class TokenMetadata
         /**
          * Removes current DC/rack assignment for ep
          */
-        void removeEndpoint(InetAddress ep)
+        void removeEndpoint(InetAddressAndPort ep)
         {
             if (!currentLocations.containsKey(ep))
                 return;
@@ -1375,13 +1366,13 @@ public class TokenMetadata
             doRemoveEndpoint(ep, currentLocations.remove(ep));
         }
 
-        private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+        private void doRemoveEndpoint(InetAddressAndPort ep, Pair<String, String> current)
         {
             dcRacks.get(current.left).remove(current.right, ep);
             dcEndpoints.remove(current.left, ep);
         }
 
-        void updateEndpoint(InetAddress ep)
+        void updateEndpoint(InetAddressAndPort ep)
         {
             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
             if (snitch == null || !currentLocations.containsKey(ep))
@@ -1396,11 +1387,11 @@ public class TokenMetadata
             if (snitch == null)
                 return;
 
-            for (InetAddress ep : currentLocations.keySet())
+            for (InetAddressAndPort ep : currentLocations.keySet())
                 updateEndpoint(ep, snitch);
         }
 
-        private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
+        private void updateEndpoint(InetAddressAndPort ep, IEndpointSnitch snitch)
         {
             Pair<String, String> current = currentLocations.get(ep);
             String dc = snitch.getDatacenter(ep);
@@ -1415,7 +1406,7 @@ public class TokenMetadata
         /**
          * @return multi-map of DC to endpoints in that DC
          */
-        public Multimap<String, InetAddress> getDatacenterEndpoints()
+        public Multimap<String, InetAddressAndPort> getDatacenterEndpoints()
         {
             return dcEndpoints;
         }
@@ -1423,7 +1414,7 @@ public class TokenMetadata
         /**
          * @return map of DC to multi-map of rack to endpoints in that rack
          */
-        public Map<String, Multimap<String, InetAddress>> getDatacenterRacks()
+        public Map<String, Multimap<String, InetAddressAndPort>> getDatacenterRacks()
         {
             return dcRacks;
         }
@@ -1431,7 +1422,7 @@ public class TokenMetadata
         /**
          * @return The DC and rack of the given endpoint.
          */
-        public Pair<String, String> getLocation(InetAddress addr)
+        public Pair<String, String> getLocation(InetAddressAndPort addr)
         {
             return currentLocations.get(addr);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
index 7815784..3655a40 100644
--- a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
@@ -17,14 +17,14 @@
  */
 package org.apache.cassandra.metrics;
 
-import java.net.InetAddress;
-
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import org.apache.cassandra.net.async.OutboundMessagingPool;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
+
 /**
  * Metrics for internode connections.
  */
@@ -65,10 +65,10 @@ public class ConnectionMetrics
      *
      * @param ip IP address to use for metrics label
      */
-    public ConnectionMetrics(InetAddress ip, final OutboundMessagingPool messagingPool)
+    public ConnectionMetrics(InetAddressAndPort ip, final OutboundMessagingPool messagingPool)
     {
         // ipv6 addresses will contain colons, which are invalid in a JMX ObjectName
-        address = ip.getHostAddress().replace(':', '.');
+        address = ip.toString().replace(':', '.');
 
         factory = new DefaultNameFactory("Connection", address);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
index 052830a..56888da 100644
--- a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.metrics;
 
-import java.net.InetAddress;
 import java.util.Map.Entry;
 
 import com.google.common.util.concurrent.MoreExecutors;
@@ -27,6 +26,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.UUIDGen;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,28 +43,28 @@ public class HintedHandoffMetrics
     private static final MetricNameFactory factory = new DefaultNameFactory("HintedHandOffManager");
 
     /** Total number of hints which are not stored, This is not a cache. */
-    private final LoadingCache<InetAddress, DifferencingCounter> notStored = Caffeine.newBuilder()
-         .executor(MoreExecutors.directExecutor())
-         .build(DifferencingCounter::new);
+    private final LoadingCache<InetAddressAndPort, DifferencingCounter> notStored = Caffeine.newBuilder()
+                                                                                            .executor(MoreExecutors.directExecutor())
+                                                                                            .build(DifferencingCounter::new);
 
     /** Total number of hints that have been created, This is not a cache. */
-    private final LoadingCache<InetAddress, Counter> createdHintCounts = Caffeine.newBuilder()
-         .executor(MoreExecutors.directExecutor())
-         .build(address -> Metrics.counter(factory.createMetricName("Hints_created-" + address.getHostAddress().replace(':', '.'))));
+    private final LoadingCache<InetAddressAndPort, Counter> createdHintCounts = Caffeine.newBuilder()
+                                                                                        .executor(MoreExecutors.directExecutor())
+                                                                                        .build(address -> Metrics.counter(factory.createMetricName("Hints_created-" + address.toString().replace(':', '.'))));
 
-    public void incrCreatedHints(InetAddress address)
+    public void incrCreatedHints(InetAddressAndPort address)
     {
         createdHintCounts.get(address).inc();
     }
 
-    public void incrPastWindow(InetAddress address)
+    public void incrPastWindow(InetAddressAndPort address)
     {
         notStored.get(address).mark();
     }
 
     public void log()
     {
-        for (Entry<InetAddress, DifferencingCounter> entry : notStored.asMap().entrySet())
+        for (Entry<InetAddressAndPort, DifferencingCounter> entry : notStored.asMap().entrySet())
         {
             long difference = entry.getValue().difference();
             if (difference == 0)
@@ -79,9 +79,10 @@ public class HintedHandoffMetrics
         private final Counter meter;
         private long reported = 0;
 
-        public DifferencingCounter(InetAddress address)
+        public DifferencingCounter(InetAddressAndPort address)
         {
-            this.meter = Metrics.counter(factory.createMetricName("Hints_not_stored-" + address.getHostAddress().replace(':', '.')));
+            //This changes the name of the metric, people can update their monitoring when upgrading?
+            this.meter = Metrics.counter(factory.createMetricName("Hints_not_stored-" + address.toString().replace(':', '.')));
         }
 
         public long difference()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
index d6a75f7..424f502 100644
--- a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.metrics;
 
-import java.net.InetAddress;
-
 import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,6 +25,7 @@ import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
@@ -47,11 +46,11 @@ public final class HintsServiceMetrics
     private static final Histogram globalDelayHistogram = Metrics.histogram(factory.createMetricName("Hint_delays"), false);
 
     /** Histograms per-endpoint of hint delivery delays, This is not a cache. */
-    private static final LoadingCache<InetAddress, Histogram> delayByEndpoint = Caffeine.newBuilder()
-        .executor(MoreExecutors.directExecutor())
-        .build(address -> Metrics.histogram(factory.createMetricName("Hint_delays-"+address.getHostAddress().replace(':', '.')), false));
+    private static final LoadingCache<InetAddressAndPort, Histogram> delayByEndpoint = Caffeine.newBuilder()
+                                                                                               .executor(MoreExecutors.directExecutor())
+                                                                                               .build(address -> Metrics.histogram(factory.createMetricName("Hint_delays-"+address.toString().replace(':', '.')), false));
 
-    public static void updateDelayMetrics(InetAddress endpoint, long delay)
+    public static void updateDelayMetrics(InetAddressAndPort endpoint, long delay)
     {
         if (delay <= 0)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
index 5f640b9..2f096f6 100644
--- a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.metrics;
 
-import java.net.InetAddress;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
@@ -26,6 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Timer;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
@@ -47,7 +47,7 @@ public class MessagingMetrics
         queueWaitLatency = new ConcurrentHashMap<>();
     }
 
-    public void addTimeTaken(InetAddress from, long timeTaken)
+    public void addTimeTaken(InetAddressAndPort from, long timeTaken)
     {
         String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(from);
         Timer timer = dcLatency.get(dc);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
index 72e9b23..d220ca5 100644
--- a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
@@ -17,11 +17,12 @@
  */
 package org.apache.cassandra.metrics;
 
-import java.net.InetAddress;
 import java.util.concurrent.ConcurrentMap;
 
 
 import com.codahale.metrics.Counter;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -33,7 +34,7 @@ public class StreamingMetrics
 {
     public static final String TYPE_NAME = "Streaming";
 
-    private static final ConcurrentMap<InetAddress, StreamingMetrics> instances = new NonBlockingHashMap<InetAddress, StreamingMetrics>();
+    private static final ConcurrentMap<InetAddressAndPort, StreamingMetrics> instances = new NonBlockingHashMap<>();
 
     public static final Counter activeStreamsOutbound = Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "ActiveOutboundStreams", null));
     public static final Counter totalIncomingBytes = Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "TotalIncomingBytes", null));
@@ -41,7 +42,7 @@ public class StreamingMetrics
     public final Counter incomingBytes;
     public final Counter outgoingBytes;
 
-    public static StreamingMetrics get(InetAddress ip)
+    public static StreamingMetrics get(InetAddressAndPort ip)
     {
        StreamingMetrics metrics = instances.get(ip);
        if (metrics == null)
@@ -52,9 +53,9 @@ public class StreamingMetrics
        return metrics;
     }
 
-    public StreamingMetrics(final InetAddress peer)
+    public StreamingMetrics(final InetAddressAndPort peer)
     {
-        MetricNameFactory factory = new DefaultNameFactory("Streaming", peer.getHostAddress().replace(':', '.'));
+        MetricNameFactory factory = new DefaultNameFactory("Streaming", peer.toString().replace(':', '.'));
         incomingBytes = Metrics.counter(factory.createMetricName("IncomingBytes"));
         outgoingBytes= Metrics.counter(factory.createMetricName("OutgoingBytes"));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/BackPressureState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/BackPressureState.java b/src/java/org/apache/cassandra/net/BackPressureState.java
index 34fd0dd..886c075 100644
--- a/src/java/org/apache/cassandra/net/BackPressureState.java
+++ b/src/java/org/apache/cassandra/net/BackPressureState.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * Interface meant to track the back-pressure state per replica host.
@@ -47,5 +47,5 @@ public interface BackPressureState
     /**
      * Returns the host this state refers to.
      */
-    InetAddress getHost();
+    InetAddressAndPort getHost();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/BackPressureStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/BackPressureStrategy.java b/src/java/org/apache/cassandra/net/BackPressureStrategy.java
index 78f748b..6b49495 100644
--- a/src/java/org/apache/cassandra/net/BackPressureStrategy.java
+++ b/src/java/org/apache/cassandra/net/BackPressureStrategy.java
@@ -17,10 +17,11 @@
  */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
+
 /**
  * Back-pressure algorithm interface.
  * <p>
@@ -39,5 +40,5 @@ public interface BackPressureStrategy<S extends BackPressureState>
     /**
      * Creates a new {@link BackPressureState} initialized as needed by the specific implementation.
      */
-    S newState(InetAddress host);
+    S newState(InetAddressAndPort host);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index ea000ae..f2ed8a1 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -17,9 +17,8 @@
  */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
-
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * Encapsulates the callback information.
@@ -28,7 +27,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
  */
 public class CallbackInfo
 {
-    protected final InetAddress target;
+    protected final InetAddressAndPort target;
     protected final IAsyncCallback callback;
     protected final IVersionedSerializer<?> serializer;
     private final boolean failureCallback;
@@ -41,7 +40,7 @@ public class CallbackInfo
      * @param serializer serializer to deserialize response message
      * @param failureCallback True when we have a callback to handle failures
      */
-    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback)
+    public CallbackInfo(InetAddressAndPort target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback)
     {
         this.target = target;
         this.callback = callback;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java b/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java
index 83bbbf3..b58ca47 100644
--- a/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java
+++ b/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java
@@ -21,28 +21,108 @@ import java.io.*;
 import java.net.Inet4Address;
 import java.net.Inet6Address;
 import java.net.InetAddress;
+import java.nio.ByteBuffer;
 
-public class CompactEndpointSerializationHelper
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+/*
+ * As of version 4.0 the endpoint description includes a port number as an unsigned short
+ */
+public class CompactEndpointSerializationHelper implements IVersionedSerializer<InetAddressAndPort>
 {
-    public static void serialize(InetAddress endpoint, DataOutput out) throws IOException
+    public static final IVersionedSerializer<InetAddressAndPort> instance = new CompactEndpointSerializationHelper();
+
+    /**
+     * Streaming uses its own version numbering so we need to ignore it and always use currrent version.
+     * There is no cross version streaming so it will always use the latest address serialization.
+     **/
+    public static final IVersionedSerializer<InetAddressAndPort> streamingInstance = new IVersionedSerializer<InetAddressAndPort>()
     {
-        byte[] buf = endpoint.getAddress();
-        out.writeByte(buf.length);
-        out.write(buf);
+        public void serialize(InetAddressAndPort inetAddressAndPort, DataOutputPlus out, int version) throws IOException
+        {
+            instance.serialize(inetAddressAndPort, out, MessagingService.current_version);
+        }
+
+        public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
+        {
+            return instance.deserialize(in, MessagingService.current_version);
+        }
+
+        public long serializedSize(InetAddressAndPort inetAddressAndPort, int version)
+        {
+            return instance.serializedSize(inetAddressAndPort, MessagingService.current_version);
+        }
+    };
+
+    private CompactEndpointSerializationHelper() {}
+
+    public void serialize(InetAddressAndPort endpoint, DataOutputPlus out, int version) throws IOException
+    {
+        if (version >= MessagingService.VERSION_40)
+        {
+            byte[] buf = endpoint.addressBytes;
+            out.writeByte(buf.length + 2);
+            out.write(buf);
+            out.writeShort(endpoint.port);
+        }
+        else
+        {
+            byte[] buf = endpoint.addressBytes;
+            out.writeByte(buf.length);
+            out.write(buf);
+        }
     }
 
-    public static InetAddress deserialize(DataInput in) throws IOException
+    public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
     {
-        byte[] bytes = new byte[in.readByte()];
-        in.readFully(bytes, 0, bytes.length);
-        return InetAddress.getByAddress(bytes);
+        int size = in.readByte() & 0xFF;
+        switch(size)
+        {
+            //The original pre-4.0 serialiation of just an address
+            case 4:
+            case 16:
+            {
+                byte[] bytes = new byte[size];
+                in.readFully(bytes, 0, bytes.length);
+                return InetAddressAndPort.getByAddress(bytes);
+            }
+            //Address and one port
+            case 6:
+            case 18:
+            {
+                byte[] bytes = new byte[size - 2];
+                in.readFully(bytes);
+
+                int port = in.readShort() & 0xFFFF;
+                return InetAddressAndPort.getByAddressOverrideDefaults(InetAddress.getByAddress(bytes), bytes, port);
+            }
+            default:
+                throw new AssertionError("Unexpected size " + size);
+
+        }
     }
 
-    public static int serializedSize(InetAddress from)
+    public long serializedSize(InetAddressAndPort from, int version)
     {
-        if (from instanceof Inet4Address)
-            return 1 + 4;
-        assert from instanceof Inet6Address;
-        return 1 + 16;
+        //4.0 includes a port number
+        if (version >= MessagingService.VERSION_40)
+        {
+            if (from.address instanceof Inet4Address)
+                return 1 + 4 + 2;
+            assert from.address instanceof Inet6Address;
+            return 1 + 16 + 2;
+        }
+        else
+        {
+            if (from.address instanceof Inet4Address)
+                return 1 + 4;
+            assert from.address instanceof Inet6Address;
+            return 1 + 16;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/ForwardToContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ForwardToContainer.java b/src/java/org/apache/cassandra/net/ForwardToContainer.java
new file mode 100644
index 0000000..ac9e725
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/ForwardToContainer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.Collection;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * Contains forward to information until it can be serialized as part of a message using a version
+ * specific serialization
+ */
+public class ForwardToContainer
+{
+    public final Collection<InetAddressAndPort> targets;
+    public final int[] messageIds;
+
+    public ForwardToContainer(Collection<InetAddressAndPort> targets,
+                              int[] messageIds)
+    {
+        Preconditions.checkArgument(targets.size() == messageIds.length);
+        this.targets = targets;
+        this.messageIds = messageIds;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org