You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:06 UTC

[09/19] cassandra git commit: Allow storage port to be configurable per node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 89d5358..cda575a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -158,23 +158,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public Collection<Range<Token>> getLocalRanges(String keyspaceName)
     {
-        return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddress());
+        return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort());
     }
 
     public Collection<Range<Token>> getPrimaryRanges(String keyspace)
     {
-        return getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddress());
+        return getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddressAndPort());
     }
 
     public Collection<Range<Token>> getPrimaryRangesWithinDC(String keyspace)
     {
-        return getPrimaryRangeForEndpointWithinDC(keyspace, FBUtilities.getBroadcastAddress());
+        return getPrimaryRangeForEndpointWithinDC(keyspace, FBUtilities.getBroadcastAddressAndPort());
     }
 
-    private final Set<InetAddress> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>());
+    private final Set<InetAddressAndPort> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddressAndPort>());
     private CassandraDaemon daemon;
 
-    private InetAddress removingNode;
+    private InetAddressAndPort removingNode;
 
     /* Are we starting this node in bootstrap mode? */
     private volatile boolean isBootstrapMode;
@@ -225,7 +225,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         SystemKeyspace.updateTokens(tokens);
         Collection<Token> localTokens = getLocalTokens();
         setGossipTokens(localTokens);
-        tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
+        tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddressAndPort());
         setMode(Mode.NORMAL, false);
     }
 
@@ -233,6 +233,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
         states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
+        states.add(Pair.create(ApplicationState.STATUS_WITH_PORT, valueFactory.normal(tokens)));
         states.add(Pair.create(ApplicationState.STATUS, valueFactory.normal(tokens)));
         Gossiper.instance.addLocalApplicationStates(states);
     }
@@ -407,7 +408,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * they get the Gossip shutdown message, so even if
      * we don't get time to broadcast this, it is not a problem.
      *
-     * See {@link Gossiper#markAsShutdown(InetAddress)}
+     * See {@link Gossiper#markAsShutdown(InetAddressAndPort)}
      */
     private void shutdownClientServers()
     {
@@ -463,9 +464,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                        "To perform this operation, please restart with " +
                                        "-Dcassandra.allow_unsafe_replace=true");
 
-        InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
+        InetAddressAndPort replaceAddress = DatabaseDescriptor.getReplaceAddress();
         logger.info("Gathering node replacement information for {}", replaceAddress);
-        Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+        Map<InetAddressAndPort, EndpointState> epStates = Gossiper.instance.doShadowRound();
         // as we've completed the shadow round of gossip, we should be able to find the node we're replacing
         if (epStates.get(replaceAddress) == null)
             throw new RuntimeException(String.format("Cannot replace_address %s because it doesn't exist in gossip", replaceAddress));
@@ -503,25 +504,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
 
         logger.debug("Starting shadow gossip round to check for endpoint collision");
-        Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+        Map<InetAddressAndPort, EndpointState> epStates = Gossiper.instance.doShadowRound();
         // If bootstrapping, check whether any previously known status for the endpoint makes it unsafe to do so.
         // If not bootstrapping, compare the host id for this endpoint learned from gossip (if any) with the local
         // one, which was either read from system.local or generated at startup. If a learned id is present &
         // doesn't match the local, then the node needs replacing
-        if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap(), epStates))
+        if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddressAndPort(), localHostId, shouldBootstrap(), epStates))
         {
             throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
                                                      "Use cassandra.replace_address if you want to replace this node.",
-                                                     FBUtilities.getBroadcastAddress()));
+                                                     FBUtilities.getBroadcastAddressAndPort()));
         }
 
         if (shouldBootstrap() && useStrictConsistency && !allowSimultaneousMoves())
         {
-            for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
+            for (Map.Entry<InetAddressAndPort, EndpointState> entry : epStates.entrySet())
             {
                 // ignore local node or empty status
-                if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
+                if (entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()) || (entry.getValue().getApplicationState(ApplicationState.STATUS_WITH_PORT) == null & entry.getValue().getApplicationState(ApplicationState.STATUS) == null))
                     continue;
+
+                VersionedValue value = entry.getValue().getApplicationState(ApplicationState.STATUS_WITH_PORT);
+                if (value == null)
+                {
+                    value = entry.getValue().getApplicationState(ApplicationState.STATUS);
+                }
+
                 String[] pieces = splitValue(entry.getValue().getApplicationState(ApplicationState.STATUS));
                 assert (pieces.length > 0);
                 String state = pieces[0];
@@ -553,10 +561,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
         {
             logger.info("Populating token metadata from system tables");
-            Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens();
+            Multimap<InetAddressAndPort, Token> loadedTokens = SystemKeyspace.loadTokens();
             if (!shouldBootstrap()) // if we have not completed bootstrapping, we should not add ourselves as a normal token
-                loadedTokens.putAll(FBUtilities.getBroadcastAddress(), SystemKeyspace.getSavedTokens());
-            for (InetAddress ep : loadedTokens.keySet())
+                loadedTokens.putAll(FBUtilities.getBroadcastAddressAndPort(), SystemKeyspace.getSavedTokens());
+            for (InetAddressAndPort ep : loadedTokens.keySet())
                 tokenMetadata.updateNormalTokens(loadedTokens.get(ep), ep);
 
             logger.info("Token metadata: {}", tokenMetadata);
@@ -640,10 +648,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             Collection<Token> tokens = SystemKeyspace.getSavedTokens();
             if (!tokens.isEmpty())
             {
-                tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
+                tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddressAndPort());
                 // order is important here, the gossiper can fire in between adding these two states.  It's ok to send TOKENS without STATUS, but *not* vice versa.
                 List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
                 states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
+                states.add(Pair.create(ApplicationState.STATUS_WITH_PORT, valueFactory.hibernate(true)));
                 states.add(Pair.create(ApplicationState.STATUS, valueFactory.hibernate(true)));
                 Gossiper.instance.addLocalApplicationStates(states);
             }
@@ -659,11 +668,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
         {
             logger.info("Loading persisted ring state");
-            Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens();
-            Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
-            for (InetAddress ep : loadedTokens.keySet())
+            Multimap<InetAddressAndPort, Token> loadedTokens = SystemKeyspace.loadTokens();
+            Map<InetAddressAndPort, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
+            for (InetAddressAndPort ep : loadedTokens.keySet())
             {
-                if (ep.equals(FBUtilities.getBroadcastAddress()))
+                if (ep.equals(FBUtilities.getBroadcastAddressAndPort()))
                 {
                     // entry has been mistakenly added, delete it
                     SystemKeyspace.removeEndpoint(ep);
@@ -707,7 +716,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public static boolean isSeed()
     {
-        return DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress());
+        return DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort());
     }
 
     private void prepareToJoin() throws ConfigurationException
@@ -753,6 +762,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                 "the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " +
                                 "repair must be run after the replacement process in order to make this node consistent.",
                                 DatabaseDescriptor.getReplaceAddress());
+                    appStates.put(ApplicationState.STATUS_WITH_PORT, valueFactory.hibernate(true));
                     appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
                 }
             }
@@ -765,10 +775,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             // for bootstrap to get the load info it needs.
             // (we won't be part of the storage ring though until we add a counterId to our state, below.)
             // Seed the host ID-to-endpoint map with our own ID.
-            getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddress());
+            getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddressAndPort());
             appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
             appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(localHostId));
-            appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress()));
+            appStates.put(ApplicationState.NATIVE_ADDRESS_AND_PORT, valueFactory.nativeaddressAndPort(FBUtilities.getBroadcastNativeAddressAndPort()));
+            appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getJustBroadcastNativeAddress()));
             appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
 
             // load the persisted ring state. This used to be done earlier in the init process,
@@ -826,16 +837,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         //
         // We attempted to replace this with a schema-presence check, but you need a meaningful sleep
         // to get schema info from gossip which defeats the purpose.  See CASSANDRA-4427 for the gory details.
-        Set<InetAddress> current = new HashSet<>();
+        Set<InetAddressAndPort> current = new HashSet<>();
         if (logger.isDebugEnabled())
         {
             logger.debug("Bootstrap variables: {} {} {} {}",
                          DatabaseDescriptor.isAutoBootstrap(),
                          SystemKeyspace.bootstrapInProgress(),
                          SystemKeyspace.bootstrapComplete(),
-                         DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()));
+                         DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort()));
         }
-        if (DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
+        if (DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort()))
         {
             logger.info("This node will not auto bootstrap because it is configured to be a seed node.");
         }
@@ -873,13 +884,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             // get bootstrap tokens
             if (!replacing)
             {
-                if (tokenMetadata.isMember(FBUtilities.getBroadcastAddress()))
+                if (tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()))
                 {
                     String s = "This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)";
                     throw new UnsupportedOperationException(s);
                 }
                 setMode(Mode.JOINING, "getting bootstrap token", true);
-                bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddress(), delay);
+                bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddressAndPort(), delay);
             }
             else
             {
@@ -899,7 +910,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     // check for operator errors...
                     for (Token token : bootstrapTokens)
                     {
-                        InetAddress existing = tokenMetadata.getEndpoint(token);
+                        InetAddressAndPort existing = tokenMetadata.getEndpoint(token);
                         if (existing != null)
                         {
                             long nanoDelay = delay * 1000000L;
@@ -935,7 +946,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             bootstrapTokens = SystemKeyspace.getSavedTokens();
             if (bootstrapTokens.isEmpty())
             {
-                bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddress(), delay);
+                bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddressAndPort(), delay);
             }
             else
             {
@@ -958,7 +969,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 // remove the existing info about the replaced node.
                 if (!current.isEmpty())
                 {
-                    for (InetAddress existing : current)
+                    for (InetAddressAndPort existing : current)
                         Gossiper.instance.replacedEndpoint(existing);
                 }
             }
@@ -975,15 +986,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public static boolean isReplacingSameAddress()
     {
-        InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
-        return replaceAddress != null && replaceAddress.equals(FBUtilities.getBroadcastAddress());
+        InetAddressAndPort replaceAddress = DatabaseDescriptor.getReplaceAddress();
+        return replaceAddress != null && replaceAddress.equals(FBUtilities.getBroadcastAddressAndPort());
     }
 
     public void gossipSnitchInfo()
     {
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        String dc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
-        String rack = snitch.getRack(FBUtilities.getBroadcastAddress());
+        String dc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+        String rack = snitch.getRack(FBUtilities.getBroadcastAddressAndPort());
         Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.instance.valueFactory.datacenter(dc));
         Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.instance.valueFactory.rack(rack));
     }
@@ -1111,7 +1122,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public boolean isJoined()
     {
-        return tokenMetadata.isMember(FBUtilities.getBroadcastAddress()) && !isSurveyMode;
+        return tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()) && !isSurveyMode;
     }
 
     public void rebuild(String sourceDc)
@@ -1141,7 +1152,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             RangeStreamer streamer = new RangeStreamer(tokenMetadata,
                                                        null,
-                                                       FBUtilities.getBroadcastAddress(),
+                                                       FBUtilities.getBroadcastAddressAndPort(),
                                                        StreamOperation.REBUILD,
                                                        useStrictConsistency && !replacing,
                                                        DatabaseDescriptor.getEndpointSnitch(),
@@ -1202,13 +1213,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 if (specificSources != null)
                 {
                     String[] stringHosts = specificSources.split(",");
-                    Set<InetAddress> sources = new HashSet<>(stringHosts.length);
+                    Set<InetAddressAndPort> sources = new HashSet<>(stringHosts.length);
                     for (String stringHost : stringHosts)
                     {
                         try
                         {
-                            InetAddress endpoint = InetAddress.getByName(stringHost);
-                            if (FBUtilities.getBroadcastAddress().equals(endpoint))
+                            InetAddressAndPort endpoint = InetAddressAndPort.getByName(stringHost);
+                            if (FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
                             {
                                 throw new IllegalArgumentException("This host was specified as a source for rebuilding. Sources for a rebuild can only be other nodes in the cluster.");
                             }
@@ -1449,8 +1460,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             // if not an existing token then bootstrap
             List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<>();
             states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
+            states.add(Pair.create(ApplicationState.STATUS_WITH_PORT, replacing?
+                                                            valueFactory.bootReplacingWithPort(DatabaseDescriptor.getReplaceAddress()) :
+                                                            valueFactory.bootstrapping(tokens)));
             states.add(Pair.create(ApplicationState.STATUS, replacing?
-                                                            valueFactory.bootReplacing(DatabaseDescriptor.getReplaceAddress()) :
+                                                            valueFactory.bootReplacing(DatabaseDescriptor.getReplaceAddress().address) :
                                                             valueFactory.bootstrapping(tokens)));
             Gossiper.instance.addLocalApplicationStates(states);
             setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
@@ -1459,7 +1473,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         else
         {
             // Dont set any state for the node which is bootstrapping the existing token...
-            tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
+            tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddressAndPort());
             SystemKeyspace.removeEndpoint(DatabaseDescriptor.getReplaceAddress());
         }
         if (!Gossiper.instance.seenAnySeed())
@@ -1475,7 +1489,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         invalidateDiskBoundaries();
 
         setMode(Mode.JOINING, "Starting to bootstrap...", true);
-        BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata);
+        BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddressAndPort(), tokens, tokenMetadata);
         bootstrapper.addProgressListener(progressSupport);
         ListenableFuture<StreamState> bootstrapStream = bootstrapper.bootstrap(streamStateStore, useStrictConsistency && !replacing); // handles token update
         Futures.addCallback(bootstrapStream, new FutureCallback<StreamState>()
@@ -1547,7 +1561,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             // get bootstrap tokens saved in system keyspace
             final Collection<Token> tokens = SystemKeyspace.getSavedTokens();
             // already bootstrapped ranges are filtered during bootstrap
-            BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata);
+            BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddressAndPort(), tokens, tokenMetadata);
             bootstrapper.addProgressListener(progressSupport);
             ListenableFuture<StreamState> bootstrapStream = bootstrapper.bootstrap(streamStateStore, useStrictConsistency && !replacing); // handles token update
             Futures.addCallback(bootstrapStream, new FutureCallback<StreamState>()
@@ -1608,35 +1622,67 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return tokenMetadata;
     }
 
+    public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace)
+    {
+        return getRangeToEndpointMap(keyspace, false);
+    }
+
+    public Map<List<String>, List<String>> getRangeToEndpointWithPortMap(String keyspace)
+    {
+         return getRangeToEndpointMap(keyspace, true);
+    }
+
     /**
      * for a keyspace, return the ranges and corresponding listen addresses.
      * @param keyspace
      * @return the endpoint map
      */
-    public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace)
+    public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace, boolean withPort)
     {
         /* All the ranges for the tokens */
         Map<List<String>, List<String>> map = new HashMap<>();
-        for (Map.Entry<Range<Token>,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet())
+        for (Map.Entry<Range<Token>,List<InetAddressAndPort>> entry : getRangeToAddressMap(keyspace).entrySet())
         {
-            map.put(entry.getKey().asList(), stringify(entry.getValue()));
+            map.put(entry.getKey().asList(), stringify(entry.getValue(), withPort));
         }
         return map;
     }
 
     /**
-     * Return the rpc address associated with an endpoint as a string.
+     * Return the native address associated with an endpoint as a string.
      * @param endpoint The endpoint to get rpc address for
-     * @return the rpc address
+     * @return the native address
      */
-    public String getRpcaddress(InetAddress endpoint)
+    public String getNativeaddress(InetAddressAndPort endpoint, boolean withPort)
     {
-        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
-            return FBUtilities.getBroadcastRpcAddress().getHostAddress();
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+            return FBUtilities.getBroadcastNativeAddressAndPort().toString(withPort);
+        else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) != null)
+        {
+            try
+            {
+                InetAddressAndPort address = InetAddressAndPort.getByName(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value);
+                return address.getHostAddress(withPort);
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
         else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS) == null)
-            return endpoint.getHostAddress();
+            return endpoint.address.getHostAddress() + ":" + DatabaseDescriptor.getNativeTransportPort();
         else
-            return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value;
+            return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value + ":" + DatabaseDescriptor.getNativeTransportPort();
+    }
+
+    public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace)
+    {
+        return getRangeToNativeaddressMap(keyspace, false);
+    }
+
+    public Map<List<String>, List<String>> getRangeToNativeaddressWithPortMap(String keyspace)
+    {
+        return getRangeToNativeaddressMap(keyspace, true);
     }
 
     /**
@@ -1644,16 +1690,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param keyspace
      * @return the endpoint map
      */
-    public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace)
+    private Map<List<String>, List<String>> getRangeToNativeaddressMap(String keyspace, boolean withPort)
     {
         /* All the ranges for the tokens */
         Map<List<String>, List<String>> map = new HashMap<>();
-        for (Map.Entry<Range<Token>, List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet())
+        for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : getRangeToAddressMap(keyspace).entrySet())
         {
             List<String> rpcaddrs = new ArrayList<>(entry.getValue().size());
-            for (InetAddress endpoint: entry.getValue())
+            for (InetAddressAndPort endpoint: entry.getValue())
             {
-                rpcaddrs.add(getRpcaddress(endpoint));
+                rpcaddrs.add(getNativeaddress(endpoint, withPort));
             }
             map.put(entry.getKey().asList(), rpcaddrs);
         }
@@ -1662,40 +1708,50 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace)
     {
+        return getPendingRangeToEndpointMap(keyspace, false);
+    }
+
+    public Map<List<String>, List<String>> getPendingRangeToEndpointWithPortMap(String keyspace)
+    {
+        return getPendingRangeToEndpointMap(keyspace, true);
+    }
+
+    private Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace, boolean withPort)
+    {
         // some people just want to get a visual representation of things. Allow null and set it to the first
         // non-system keyspace.
         if (keyspace == null)
             keyspace = Schema.instance.getNonLocalStrategyKeyspaces().get(0);
 
         Map<List<String>, List<String>> map = new HashMap<>();
-        for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : tokenMetadata.getPendingRangesMM(keyspace).asMap().entrySet())
+        for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : tokenMetadata.getPendingRangesMM(keyspace).asMap().entrySet())
         {
-            List<InetAddress> l = new ArrayList<>(entry.getValue());
-            map.put(entry.getKey().asList(), stringify(l));
+            List<InetAddressAndPort> l = new ArrayList<>(entry.getValue());
+            map.put(entry.getKey().asList(), stringify(l, withPort));
         }
         return map;
     }
 
-    public Map<Range<Token>, List<InetAddress>> getRangeToAddressMap(String keyspace)
+    public Map<Range<Token>, List<InetAddressAndPort>> getRangeToAddressMap(String keyspace)
     {
         return getRangeToAddressMap(keyspace, tokenMetadata.sortedTokens());
     }
 
-    public Map<Range<Token>, List<InetAddress>> getRangeToAddressMapInLocalDC(String keyspace)
+    public Map<Range<Token>, List<InetAddressAndPort>> getRangeToAddressMapInLocalDC(String keyspace)
     {
-        Predicate<InetAddress> isLocalDC = new Predicate<InetAddress>()
+        Predicate<InetAddressAndPort> isLocalDC = new Predicate<InetAddressAndPort>()
         {
-            public boolean apply(InetAddress address)
+            public boolean apply(InetAddressAndPort address)
             {
                 return isLocalDC(address);
             }
         };
 
-        Map<Range<Token>, List<InetAddress>> origMap = getRangeToAddressMap(keyspace, getTokensInLocalDC());
-        Map<Range<Token>, List<InetAddress>> filteredMap = Maps.newHashMap();
-        for (Map.Entry<Range<Token>, List<InetAddress>> entry : origMap.entrySet())
+        Map<Range<Token>, List<InetAddressAndPort>> origMap = getRangeToAddressMap(keyspace, getTokensInLocalDC());
+        Map<Range<Token>, List<InetAddressAndPort>> filteredMap = Maps.newHashMap();
+        for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : origMap.entrySet())
         {
-            List<InetAddress> endpointsInLocalDC = Lists.newArrayList(Collections2.filter(entry.getValue(), isLocalDC));
+            List<InetAddressAndPort> endpointsInLocalDC = Lists.newArrayList(Collections2.filter(entry.getValue(), isLocalDC));
             filteredMap.put(entry.getKey(), endpointsInLocalDC);
         }
 
@@ -1707,21 +1763,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         List<Token> filteredTokens = Lists.newArrayList();
         for (Token token : tokenMetadata.sortedTokens())
         {
-            InetAddress endpoint = tokenMetadata.getEndpoint(token);
+            InetAddressAndPort endpoint = tokenMetadata.getEndpoint(token);
             if (isLocalDC(endpoint))
                 filteredTokens.add(token);
         }
         return filteredTokens;
     }
 
-    private boolean isLocalDC(InetAddress targetHost)
+    private boolean isLocalDC(InetAddressAndPort targetHost)
     {
         String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost);
-        String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+        String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
         return remoteDC.equals(localDC);
     }
 
-    private Map<Range<Token>, List<InetAddress>> getRangeToAddressMap(String keyspace, List<Token> sortedTokens)
+    private Map<Range<Token>, List<InetAddressAndPort>> getRangeToAddressMap(String keyspace, List<Token> sortedTokens)
     {
         // some people just want to get a visual representation of things. Allow null and set it to the first
         // non-system keyspace.
@@ -1733,6 +1789,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
 
+    public List<String> describeRingJMX(String keyspace) throws IOException
+    {
+        return describeRingJMX(keyspace, false);
+    }
+
+    public List<String> describeRingWithPortJMX(String keyspace) throws IOException
+    {
+        return describeRingJMX(keyspace,true);
+    }
+
     /**
      * The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility
      *
@@ -1740,12 +1806,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      *
      * @return a List of TokenRange(s) converted to String for the given keyspace
      */
-    public List<String> describeRingJMX(String keyspace) throws IOException
+    private List<String> describeRingJMX(String keyspace, boolean withPort) throws IOException
     {
         List<TokenRange> tokenRanges;
         try
         {
-            tokenRanges = describeRing(keyspace);
+            tokenRanges = describeRing(keyspace, false, withPort);
         }
         catch (InvalidRequestException e)
         {
@@ -1754,7 +1820,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         List<String> result = new ArrayList<>(tokenRanges.size());
 
         for (TokenRange tokenRange : tokenRanges)
-            result.add(tokenRange.toString());
+            result.add(tokenRange.toString(withPort));
 
         return result;
     }
@@ -1770,7 +1836,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     public List<TokenRange> describeRing(String keyspace) throws InvalidRequestException
     {
-        return describeRing(keyspace, false);
+        return describeRing(keyspace, false, false);
     }
 
     /**
@@ -1778,10 +1844,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     public List<TokenRange> describeLocalRing(String keyspace) throws InvalidRequestException
     {
-        return describeRing(keyspace, true);
+        return describeRing(keyspace, true, false);
     }
 
-    private List<TokenRange> describeRing(String keyspace, boolean includeOnlyLocalDC) throws InvalidRequestException
+    private List<TokenRange> describeRing(String keyspace, boolean includeOnlyLocalDC, boolean withPort) throws InvalidRequestException
     {
         if (!Schema.instance.getKeyspaces().contains(keyspace))
             throw new InvalidRequestException("No such keyspace: " + keyspace);
@@ -1792,39 +1858,49 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         List<TokenRange> ranges = new ArrayList<>();
         Token.TokenFactory tf = getTokenFactory();
 
-        Map<Range<Token>, List<InetAddress>> rangeToAddressMap =
+        Map<Range<Token>, List<InetAddressAndPort>> rangeToAddressMap =
                 includeOnlyLocalDC
                         ? getRangeToAddressMapInLocalDC(keyspace)
                         : getRangeToAddressMap(keyspace);
 
-        for (Map.Entry<Range<Token>, List<InetAddress>> entry : rangeToAddressMap.entrySet())
-            ranges.add(TokenRange.create(tf, entry.getKey(), entry.getValue()));
+        for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : rangeToAddressMap.entrySet())
+            ranges.add(TokenRange.create(tf, entry.getKey(), entry.getValue(), withPort));
 
         return ranges;
     }
 
     public Map<String, String> getTokenToEndpointMap()
     {
-        Map<Token, InetAddress> mapInetAddress = tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap();
+        return getTokenToEndpointMap(false);
+    }
+
+    public Map<String, String> getTokenToEndpointWithPortMap()
+    {
+        return getTokenToEndpointMap(true);
+    }
+
+    private Map<String, String> getTokenToEndpointMap(boolean withPort)
+    {
+        Map<Token, InetAddressAndPort> mapInetAddress = tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap();
         // in order to preserve tokens in ascending order, we use LinkedHashMap here
         Map<String, String> mapString = new LinkedHashMap<>(mapInetAddress.size());
         List<Token> tokens = new ArrayList<>(mapInetAddress.keySet());
         Collections.sort(tokens);
         for (Token token : tokens)
         {
-            mapString.put(token.toString(), mapInetAddress.get(token).getHostAddress());
+            mapString.put(token.toString(), mapInetAddress.get(token).getHostAddress(withPort));
         }
         return mapString;
     }
 
     public String getLocalHostId()
     {
-        return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddress()).toString();
+        return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddressAndPort()).toString();
     }
 
     public UUID getLocalHostUUID()
     {
-        return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddress());
+        return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddressAndPort());
     }
 
     public Map<String, String> getHostIdMap()
@@ -1832,19 +1908,40 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return getEndpointToHostId();
     }
 
+
     public Map<String, String> getEndpointToHostId()
     {
+        return getEndpointToHostId(false);
+    }
+
+    public Map<String, String> getEndpointWithPortToHostId()
+    {
+        return getEndpointToHostId(true);
+    }
+
+    private  Map<String, String> getEndpointToHostId(boolean withPort)
+    {
         Map<String, String> mapOut = new HashMap<>();
-        for (Map.Entry<InetAddress, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet())
-            mapOut.put(entry.getKey().getHostAddress(), entry.getValue().toString());
+        for (Map.Entry<InetAddressAndPort, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet())
+            mapOut.put(entry.getKey().getHostAddress(withPort), entry.getValue().toString());
         return mapOut;
     }
 
     public Map<String, String> getHostIdToEndpoint()
     {
+        return getHostIdToEndpoint(false);
+    }
+
+    public Map<String, String> getHostIdToEndpointWithPort()
+    {
+        return getHostIdToEndpoint(true);
+    }
+
+    private Map<String, String> getHostIdToEndpoint(boolean withPort)
+    {
         Map<String, String> mapOut = new HashMap<>();
-        for (Map.Entry<InetAddress, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet())
-            mapOut.put(entry.getValue().toString(), entry.getKey().getHostAddress());
+        for (Map.Entry<InetAddressAndPort, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet())
+            mapOut.put(entry.getValue().toString(), entry.getKey().getHostAddress(withPort));
         return mapOut;
     }
 
@@ -1854,9 +1951,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param ranges
      * @return mapping of ranges to the replicas responsible for them.
     */
-    private Map<Range<Token>, List<InetAddress>> constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges)
+    private Map<Range<Token>, List<InetAddressAndPort>> constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges)
     {
-        Map<Range<Token>, List<InetAddress>> rangeToEndpointMap = new HashMap<>(ranges.size());
+        Map<Range<Token>, List<InetAddressAndPort>> rangeToEndpointMap = new HashMap<>(ranges.size());
         for (Range<Token> range : ranges)
         {
             rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right));
@@ -1864,7 +1961,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return rangeToEndpointMap;
     }
 
-    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
+    public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
     {
         // no-op
     }
@@ -1901,9 +1998,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
      * you should never bootstrap a new node during a removenode, decommission or move.
      */
-    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+    public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
     {
-        if (state == ApplicationState.STATUS)
+        if (state == ApplicationState.STATUS | state == ApplicationState.STATUS_WITH_PORT)
         {
             String[] pieces = splitValue(value);
             assert (pieces.length > 0);
@@ -1973,6 +2070,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                             throw new RuntimeException(e);
                         }
                         break;
+                    case NATIVE_ADDRESS_AND_PORT:
+                        try
+                        {
+                            InetAddressAndPort address = InetAddressAndPort.getByName(value.value);
+                            SystemKeyspace.updatePeerNativeAddress(endpoint, address);
+                        }
+                        catch (UnknownHostException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
+                        break;
                     case SCHEMA:
                         SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value));
                         MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
@@ -1996,7 +2104,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return value.value.split(VersionedValue.DELIMITER_STR, -1);
     }
 
-    private void updateNetVersion(InetAddress endpoint, VersionedValue value)
+    private void updateNetVersion(InetAddressAndPort endpoint, VersionedValue value)
     {
         try
         {
@@ -2008,7 +2116,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
-    public void updateTopology(InetAddress endpoint)
+    public void updateTopology(InetAddressAndPort endpoint)
     {
         if (getTokenMetadata().isMember(endpoint))
         {
@@ -2021,9 +2129,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         getTokenMetadata().updateTopology();
     }
 
-    private void updatePeerInfo(InetAddress endpoint)
+    private void updatePeerInfo(InetAddressAndPort endpoint)
     {
         EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        InetAddress native_address = null;
+        int native_port = DatabaseDescriptor.getNativeTransportPort();
+
         for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
         {
             switch (entry.getKey())
@@ -2040,7 +2151,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 case RPC_ADDRESS:
                     try
                     {
-                        SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value));
+                        native_address = InetAddress.getByName(entry.getValue().value);
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                    break;
+                case NATIVE_ADDRESS_AND_PORT:
+                    try
+                    {
+                        InetAddressAndPort address = InetAddressAndPort.getByName(entry.getValue().value);
+                        native_address = address.address;
+                        native_port = address.port;
                     }
                     catch (UnknownHostException e)
                     {
@@ -2055,9 +2178,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     break;
             }
         }
+
+        //Some tests won't set all the states
+        if (native_address != null)
+        {
+            SystemKeyspace.updatePeerNativeAddress(endpoint,
+                                                   InetAddressAndPort.getByAddressOverrideDefaults(native_address,
+                                                                                                   native_port));
+        }
     }
 
-    private void notifyRpcChange(InetAddress endpoint, boolean ready)
+    private void notifyRpcChange(InetAddressAndPort endpoint, boolean ready)
     {
         if (ready)
             notifyUp(endpoint);
@@ -2065,7 +2196,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             notifyDown(endpoint);
     }
 
-    private void notifyUp(InetAddress endpoint)
+    private void notifyUp(InetAddressAndPort endpoint)
     {
         if (!isRpcReady(endpoint) || !Gossiper.instance.isAlive(endpoint))
             return;
@@ -2074,13 +2205,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             subscriber.onUp(endpoint);
     }
 
-    private void notifyDown(InetAddress endpoint)
+    private void notifyDown(InetAddressAndPort endpoint)
     {
         for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
             subscriber.onDown(endpoint);
     }
 
-    private void notifyJoined(InetAddress endpoint)
+    private void notifyJoined(InetAddressAndPort endpoint)
     {
         if (!isStatus(endpoint, VersionedValue.STATUS_NORMAL))
             return;
@@ -2089,25 +2220,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             subscriber.onJoinCluster(endpoint);
     }
 
-    private void notifyMoved(InetAddress endpoint)
+    private void notifyMoved(InetAddressAndPort endpoint)
     {
         for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
             subscriber.onMove(endpoint);
     }
 
-    private void notifyLeft(InetAddress endpoint)
+    private void notifyLeft(InetAddressAndPort endpoint)
     {
         for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
             subscriber.onLeaveCluster(endpoint);
     }
 
-    private boolean isStatus(InetAddress endpoint, String status)
+    private boolean isStatus(InetAddressAndPort endpoint, String status)
     {
         EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
         return state != null && state.getStatus().equals(status);
     }
 
-    public boolean isRpcReady(InetAddress endpoint)
+    public boolean isRpcReady(InetAddressAndPort endpoint)
     {
         EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
         return state != null && state.isRpcReady();
@@ -2123,7 +2254,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     public void setRpcReady(boolean value)
     {
-        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress());
+        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort());
         // if value is false we're OK with a null state, if it is true we are not.
         assert !value || state != null;
 
@@ -2131,7 +2262,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_READY, valueFactory.rpcReady(value));
     }
 
-    private Collection<Token> getTokensFor(InetAddress endpoint)
+    private Collection<Token> getTokensFor(InetAddressAndPort endpoint)
     {
         try
         {
@@ -2156,7 +2287,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      *
      * @param endpoint bootstrapping node
      */
-    private void handleStateBootstrap(InetAddress endpoint)
+    private void handleStateBootstrap(InetAddressAndPort endpoint)
     {
         Collection<Token> tokens;
         // explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified
@@ -2186,12 +2317,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
     }
 
-    private void handleStateBootreplacing(InetAddress newNode, String[] pieces)
+    private void handleStateBootreplacing(InetAddressAndPort newNode, String[] pieces)
     {
-        InetAddress oldNode;
+        InetAddressAndPort oldNode;
         try
         {
-            oldNode = InetAddress.getByName(pieces[1]);
+            oldNode = InetAddressAndPort.getByName(pieces[1]);
         }
         catch (Exception e)
         {
@@ -2204,7 +2335,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             throw new RuntimeException(String.format("Node %s is trying to replace alive node %s.", newNode, oldNode));
         }
 
-        Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(newNode);
+        Optional<InetAddressAndPort> replacingNode = tokenMetadata.getReplacingNode(newNode);
         if (replacingNode.isPresent() && !replacingNode.get().equals(oldNode))
         {
             throw new RuntimeException(String.format("Node %s is already replacing %s but is trying to replace %s.",
@@ -2228,12 +2359,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      *
      * @param endpoint node
      */
-    private void handleStateNormal(final InetAddress endpoint, final String status)
+    private void handleStateNormal(final InetAddressAndPort endpoint, final String status)
     {
         Collection<Token> tokens = getTokensFor(endpoint);
         Set<Token> tokensToUpdateInMetadata = new HashSet<>();
         Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
-        Set<InetAddress> endpointsToRemove = new HashSet<>();
+        Set<InetAddressAndPort> endpointsToRemove = new HashSet<>();
 
         if (logger.isDebugEnabled())
             logger.debug("Node {} state {}, token {}", endpoint, status, tokens);
@@ -2246,7 +2377,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                          endpoint,
                          Gossiper.instance.getEndpointStateForEndpoint(endpoint));
 
-        Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(endpoint);
+        Optional<InetAddressAndPort> replacingNode = tokenMetadata.getReplacingNode(endpoint);
         if (replacingNode.isPresent())
         {
             assert !endpoint.equals(replacingNode.get()) : "Pending replacement endpoint with same address is not supported";
@@ -2259,7 +2390,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             endpointsToRemove.add(replacingNode.get());
         }
 
-        Optional<InetAddress> replacementNode = tokenMetadata.getReplacementNode(endpoint);
+        Optional<InetAddressAndPort> replacementNode = tokenMetadata.getReplacementNode(endpoint);
         if (replacementNode.isPresent())
         {
             logger.warn("Node {} is currently being replaced by node {}.", endpoint, replacementNode.get());
@@ -2268,7 +2399,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         updatePeerInfo(endpoint);
         // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
         UUID hostId = Gossiper.instance.getHostId(endpoint);
-        InetAddress existing = tokenMetadata.getEndpointForHostId(hostId);
+        InetAddressAndPort existing = tokenMetadata.getEndpointForHostId(hostId);
         if (replacing && isReplacingSameAddress() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null
             && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
             logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
@@ -2276,7 +2407,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             if (existing != null && !existing.equals(endpoint))
             {
-                if (existing.equals(FBUtilities.getBroadcastAddress()))
+                if (existing.equals(FBUtilities.getBroadcastAddressAndPort()))
                 {
                     logger.warn("Not updating host ID {} for {} because it's mine", hostId, endpoint);
                     tokenMetadata.removeEndpoint(endpoint);
@@ -2303,7 +2434,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         for (final Token token : tokens)
         {
             // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
-            InetAddress currentOwner = tokenMetadata.getEndpoint(token);
+            InetAddressAndPort currentOwner = tokenMetadata.getEndpoint(token);
             if (currentOwner == null)
             {
                 logger.debug("New node {} at token {}", endpoint, token);
@@ -2323,7 +2454,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
                 // currentOwner is no longer current, endpoint is.  Keep track of these moves, because when
                 // a host no longer has any tokens, we'll want to remove it.
-                Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
+                Multimap<InetAddressAndPort, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
                 epToTokenCopy.get(currentOwner).remove(token);
                 if (epToTokenCopy.get(currentOwner).size() < 1)
                     endpointsToRemove.add(currentOwner);
@@ -2348,7 +2479,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         boolean isMember = tokenMetadata.isMember(endpoint);
         boolean isMoving = tokenMetadata.isMoving(endpoint);
         tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
-        for (InetAddress ep : endpointsToRemove)
+        for (InetAddressAndPort ep : endpointsToRemove)
         {
             removeEndpoint(ep);
             if (replacing && DatabaseDescriptor.getReplaceAddress().equals(ep))
@@ -2375,7 +2506,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      *
      * @param endpoint node
      */
-    private void handleStateLeaving(InetAddress endpoint)
+    private void handleStateLeaving(InetAddressAndPort endpoint)
     {
         Collection<Token> tokens = getTokensFor(endpoint);
 
@@ -2408,7 +2539,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param endpoint If reason for leaving is decommission, endpoint is the leaving node.
      * @param pieces STATE_LEFT,token
      */
-    private void handleStateLeft(InetAddress endpoint, String[] pieces)
+    private void handleStateLeft(InetAddressAndPort endpoint, String[] pieces)
     {
         assert pieces.length >= 2;
         Collection<Token> tokens = getTokensFor(endpoint);
@@ -2425,7 +2556,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param endpoint moving endpoint address
      * @param pieces STATE_MOVING, token
      */
-    private void handleStateMoving(InetAddress endpoint, String[] pieces)
+    private void handleStateMoving(InetAddressAndPort endpoint, String[] pieces)
     {
         assert pieces.length >= 2;
         Token token = getTokenFactory().fromString(pieces[1]);
@@ -2444,11 +2575,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param endpoint node
      * @param pieces either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
      */
-    private void handleStateRemoving(InetAddress endpoint, String[] pieces)
+    private void handleStateRemoving(InetAddressAndPort endpoint, String[] pieces)
     {
         assert (pieces.length > 0);
 
-        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
         {
             logger.info("Received removenode gossip about myself. Is this node rejoining after an explicit removenode?");
             try
@@ -2494,7 +2625,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
-    private void excise(Collection<Token> tokens, InetAddress endpoint)
+    private void excise(Collection<Token> tokens, InetAddressAndPort endpoint)
     {
         logger.info("Removing tokens {} for {}", tokens, endpoint);
 
@@ -2510,20 +2641,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         PendingRangeCalculatorService.instance.update();
     }
 
-    private void excise(Collection<Token> tokens, InetAddress endpoint, long expireTime)
+    private void excise(Collection<Token> tokens, InetAddressAndPort endpoint, long expireTime)
     {
         addExpireTimeIfFound(endpoint, expireTime);
         excise(tokens, endpoint);
     }
 
     /** unlike excise we just need this endpoint gone without going through any notifications **/
-    private void removeEndpoint(InetAddress endpoint)
+    private void removeEndpoint(InetAddressAndPort endpoint)
     {
         Gossiper.instance.removeEndpoint(endpoint);
         SystemKeyspace.removeEndpoint(endpoint);
     }
 
-    protected void addExpireTimeIfFound(InetAddress endpoint, long expireTime)
+    protected void addExpireTimeIfFound(InetAddressAndPort endpoint, long expireTime)
     {
         if (expireTime != 0L)
         {
@@ -2543,23 +2674,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param ranges the ranges to find sources for
      * @return multimap of addresses to ranges the address is responsible for
      */
-    private Multimap<InetAddress, Range<Token>> getNewSourceRanges(String keyspaceName, Set<Range<Token>> ranges)
+    private Multimap<InetAddressAndPort, Range<Token>> getNewSourceRanges(String keyspaceName, Set<Range<Token>> ranges)
     {
-        InetAddress myAddress = FBUtilities.getBroadcastAddress();
-        Multimap<Range<Token>, InetAddress> rangeAddresses = Keyspace.open(keyspaceName).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap());
-        Multimap<InetAddress, Range<Token>> sourceRanges = HashMultimap.create();
+        InetAddressAndPort myAddress = FBUtilities.getBroadcastAddressAndPort();
+        Multimap<Range<Token>, InetAddressAndPort> rangeAddresses = Keyspace.open(keyspaceName).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap());
+        Multimap<InetAddressAndPort, Range<Token>> sourceRanges = HashMultimap.create();
         IFailureDetector failureDetector = FailureDetector.instance;
 
         // find alive sources for our new ranges
         for (Range<Token> range : ranges)
         {
-            Collection<InetAddress> possibleRanges = rangeAddresses.get(range);
+            Collection<InetAddressAndPort> possibleRanges = rangeAddresses.get(range);
             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-            List<InetAddress> sources = snitch.getSortedListByProximity(myAddress, possibleRanges);
+            List<InetAddressAndPort> sources = snitch.getSortedListByProximity(myAddress, possibleRanges);
 
             assert (!sources.contains(myAddress));
 
-            for (InetAddress source : sources)
+            for (InetAddressAndPort source : sources)
             {
                 if (failureDetector.isAlive(source))
                 {
@@ -2576,7 +2707,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      *
      * @param remote node to send notification to
      */
-    private void sendReplicationNotification(InetAddress remote)
+    private void sendReplicationNotification(InetAddressAndPort remote)
     {
         // notify the remote token
         MessageOut msg = new MessageOut(MessagingService.Verb.REPLICATION_FINISHED);
@@ -2608,23 +2739,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      *
      * @param endpoint the node that left
      */
-    private void restoreReplicaCount(InetAddress endpoint, final InetAddress notifyEndpoint)
+    private void restoreReplicaCount(InetAddressAndPort endpoint, final InetAddressAndPort notifyEndpoint)
     {
-        Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> rangesToFetch = HashMultimap.create();
+        Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> rangesToFetch = HashMultimap.create();
 
-        InetAddress myAddress = FBUtilities.getBroadcastAddress();
+        InetAddressAndPort myAddress = FBUtilities.getBroadcastAddressAndPort();
 
         for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
         {
-            Multimap<Range<Token>, InetAddress> changedRanges = getChangedRangesForLeaving(keyspaceName, endpoint);
+            Multimap<Range<Token>, InetAddressAndPort> changedRanges = getChangedRangesForLeaving(keyspaceName, endpoint);
             Set<Range<Token>> myNewRanges = new HashSet<>();
-            for (Map.Entry<Range<Token>, InetAddress> entry : changedRanges.entries())
+            for (Map.Entry<Range<Token>, InetAddressAndPort> entry : changedRanges.entries())
             {
                 if (entry.getValue().equals(myAddress))
                     myNewRanges.add(entry.getKey());
             }
-            Multimap<InetAddress, Range<Token>> sourceRanges = getNewSourceRanges(keyspaceName, myNewRanges);
-            for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : sourceRanges.asMap().entrySet())
+            Multimap<InetAddressAndPort, Range<Token>> sourceRanges = getNewSourceRanges(keyspaceName, myNewRanges);
+            for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : sourceRanges.asMap().entrySet())
             {
                 rangesToFetch.put(keyspaceName, entry);
             }
@@ -2633,10 +2764,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         StreamPlan stream = new StreamPlan(StreamOperation.RESTORE_REPLICA_COUNT);
         for (String keyspaceName : rangesToFetch.keySet())
         {
-            for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName))
+            for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName))
             {
-                InetAddress source = entry.getKey();
-                InetAddress preferred = SystemKeyspace.getPreferredIP(source);
+                InetAddressAndPort source = entry.getKey();
+                InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(source);
                 Collection<Range<Token>> ranges = entry.getValue();
                 if (logger.isDebugEnabled())
                     logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
@@ -2661,7 +2792,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
     // needs to be modified to accept either a keyspace or ARS.
-    private Multimap<Range<Token>, InetAddress> getChangedRangesForLeaving(String keyspaceName, InetAddress endpoint)
+    private Multimap<Range<Token>, InetAddressAndPort> getChangedRangesForLeaving(String keyspaceName, InetAddressAndPort endpoint)
     {
         // First get all ranges the leaving endpoint is responsible for
         Collection<Range<Token>> ranges = getRangesForEndpoint(keyspaceName, endpoint);
@@ -2669,7 +2800,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (logger.isDebugEnabled())
             logger.debug("Node {} ranges [{}]", endpoint, StringUtils.join(ranges, ", "));
 
-        Map<Range<Token>, List<InetAddress>> currentReplicaEndpoints = new HashMap<>(ranges.size());
+        Map<Range<Token>, List<InetAddressAndPort>> currentReplicaEndpoints = new HashMap<>(ranges.size());
 
         // Find (for each range) all nodes that store replicas for these ranges as well
         TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap(); // don't do this in the loop! #7758
@@ -2683,7 +2814,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (temp.isMember(endpoint))
             temp.removeEndpoint(endpoint);
 
-        Multimap<Range<Token>, InetAddress> changedRanges = HashMultimap.create();
+        Multimap<Range<Token>, InetAddressAndPort> changedRanges = HashMultimap.create();
 
         // Go through the ranges and for each range check who will be
         // storing replicas for these ranges when the leaving endpoint
@@ -2692,7 +2823,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         // range.
         for (Range<Token> range : ranges)
         {
-            Collection<InetAddress> newReplicaEndpoints = Keyspace.open(keyspaceName).getReplicationStrategy().calculateNaturalEndpoints(range.right, temp);
+            Collection<InetAddressAndPort> newReplicaEndpoints = Keyspace.open(keyspaceName).getReplicationStrategy().calculateNaturalEndpoints(range.right, temp);
             newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
             if (logger.isDebugEnabled())
                 if (newReplicaEndpoints.isEmpty())
@@ -2705,7 +2836,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return changedRanges;
     }
 
-    public void onJoin(InetAddress endpoint, EndpointState epState)
+    public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
     {
         for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
         {
@@ -2714,7 +2845,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
     }
 
-    public void onAlive(InetAddress endpoint, EndpointState state)
+    public void onAlive(InetAddressAndPort endpoint, EndpointState state)
     {
         MigrationManager.instance.scheduleSchemaPull(endpoint, state);
 
@@ -2722,19 +2853,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             notifyUp(endpoint);
     }
 
-    public void onRemove(InetAddress endpoint)
+    public void onRemove(InetAddressAndPort endpoint)
     {
         tokenMetadata.removeEndpoint(endpoint);
         PendingRangeCalculatorService.instance.update();
     }
 
-    public void onDead(InetAddress endpoint, EndpointState state)
+    public void onDead(InetAddressAndPort endpoint, EndpointState state)
     {
         MessagingService.instance().convict(endpoint);
         notifyDown(endpoint);
     }
 
-    public void onRestart(InetAddress endpoint, EndpointState state)
+    public void onRestart(InetAddressAndPort endpoint, EndpointState state)
     {
         // If we have restarted before the node was even marked down, we need to reset the connection pool
         if (state.isAlive())
@@ -2753,15 +2884,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return FileUtils.stringifyFileSize(StorageMetrics.load.getCount());
     }
 
+    public Map<String, String> getLoadMapWithPort()
+    {
+        return getLoadMap(true);
+    }
+
     public Map<String, String> getLoadMap()
     {
+        return getLoadMap(false);
+    }
+
+    private Map<String, String> getLoadMap(boolean withPort)
+    {
         Map<String, String> map = new HashMap<>();
-        for (Map.Entry<InetAddress,Double> entry : LoadBroadcaster.instance.getLoadInfo().entrySet())
+        for (Map.Entry<InetAddressAndPort,Double> entry : LoadBroadcaster.instance.getLoadInfo().entrySet())
         {
-            map.put(entry.getKey().getHostAddress(), FileUtils.stringifyFileSize(entry.getValue()));
+            map.put(entry.getKey().getHostAddress(withPort), FileUtils.stringifyFileSize(entry.getValue()));
         }
         // gossiper doesn't see its own updates, so we need to special-case the local node
-        map.put(FBUtilities.getBroadcastAddress().getHostAddress(), getLoadString());
+        map.put(withPort ? FBUtilities.getJustBroadcastAddress().getHostAddress() : FBUtilities.getBroadcastAddressAndPort().toString(), getLoadString());
         return map;
     }
 
@@ -2779,13 +2920,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
     @Nullable
-    public InetAddress getEndpointForHostId(UUID hostId)
+    public InetAddressAndPort getEndpointForHostId(UUID hostId)
     {
         return tokenMetadata.getEndpointForHostId(hostId);
     }
 
     @Nullable
-    public UUID getHostIdForEndpoint(InetAddress address)
+    public UUID getHostIdForEndpoint(InetAddressAndPort address)
     {
         return tokenMetadata.getHostId(address);
     }
@@ -2794,15 +2935,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public List<String> getTokens()
     {
-        return getTokens(FBUtilities.getBroadcastAddress());
+        return getTokens(FBUtilities.getBroadcastAddressAndPort());
     }
 
     public List<String> getTokens(String endpoint) throws UnknownHostException
     {
-        return getTokens(InetAddress.getByName(endpoint));
+        return getTokens(InetAddressAndPort.getByName(endpoint));
     }
 
-    private List<String> getTokens(InetAddress endpoint)
+    private List<String> getTokens(InetAddressAndPort endpoint)
     {
         List<String> strTokens = new ArrayList<>();
         for (Token tok : getTokenMetadata().getTokens(endpoint))
@@ -2820,42 +2961,74 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return Schema.instance.getVersion().toString();
     }
 
+    @Deprecated
     public List<String> getLeavingNodes()
     {
-        return stringify(tokenMetadata.getLeavingEndpoints());
+        return stringify(tokenMetadata.getLeavingEndpoints(), false);
+    }
+
+    public List<String> getLeavingNodesWithPort()
+    {
+        return stringify(tokenMetadata.getLeavingEndpoints(), true);
     }
 
+    @Deprecated
     public List<String> getMovingNodes()
     {
         List<String> endpoints = new ArrayList<>();
 
-        for (Pair<Token, InetAddress> node : tokenMetadata.getMovingEndpoints())
+        for (Pair<Token, InetAddressAndPort> node : tokenMetadata.getMovingEndpoints())
+        {
+            endpoints.add(node.right.address.getHostAddress());
+        }
+
+        return endpoints;
+    }
+
+    public List<String> getMovingNodesWithPort()
+    {
+        List<String> endpoints = new ArrayList<>();
+
+        for (Pair<Token, InetAddressAndPort> node : tokenMetadata.getMovingEndpoints())
         {
-            endpoints.add(node.right.getHostAddress());
+            endpoints.add(node.right.toString());
         }
 
         return endpoints;
     }
 
+
     public List<String> getJoiningNodes()
     {
-        return stringify(tokenMetadata.getBootstrapTokens().valueSet());
+        return stringify(tokenMetadata.getBootstrapTokens().valueSet(), false);
+    }
+
+    @Deprecated
+    public List<String> getJoiningNodesWithPort()
+    {
+        return stringify(tokenMetadata.getBootstrapTokens().valueSet(), true);
     }
 
     public List<String> getLiveNodes()
     {
-        return stringify(Gossiper.instance.getLiveMembers());
+        return stringify(Gossiper.instance.getLiveMembers(), false);
+    }
+
+    @Deprecated
+    public List<String> getLiveNodesWithPort()
+    {
+        return stringify(Gossiper.instance.getLiveMembers(), true);
     }
 
-    public Set<InetAddress> getLiveRingMembers()
+    public Set<InetAddressAndPort> getLiveRingMembers()
     {
         return getLiveRingMembers(false);
     }
 
-    public Set<InetAddress> getLiveRingMembers(boolean excludeDeadStates)
+    public Set<InetAddressAndPort> getLiveRingMembers(boolean excludeDeadStates)
     {
-        Set<InetAddress> ret = new HashSet<>();
-        for (InetAddress ep : Gossiper.instance.getLiveMembers())
+        Set<InetAddressAndPort> ret = new HashSet<>();
+        for (InetAddressAndPort ep : Gossiper.instance.getLiveMembers())
         {
             if (excludeDeadStates)
             {
@@ -2871,9 +3044,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
 
+    @Deprecated
     public List<String> getUnreachableNodes()
     {
-        return stringify(Gossiper.instance.getUnreachableMembers());
+        return stringify(Gossiper.instance.getUnreachableMembers(), false);
+    }
+
+    public List<String> getUnreachableNodesWithPort()
+    {
+        return stringify(Gossiper.instance.getUnreachableMembers(), true);
     }
 
     public String[] getAllDataFileLocations()
@@ -2894,19 +3073,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return FileUtils.getCanonicalPath(DatabaseDescriptor.getSavedCachesLocation());
     }
 
-    private List<String> stringify(Iterable<InetAddress> endpoints)
+    private List<String> stringify(Iterable<InetAddressAndPort> endpoints, boolean withPort)
     {
         List<String> stringEndpoints = new ArrayList<>();
-        for (InetAddress ep : endpoints)
+        for (InetAddressAndPort ep : endpoints)
         {
-            stringEndpoints.add(ep.getHostAddress());
+            stringEndpoints.add(ep.getHostAddress(withPort));
         }
         return stringEndpoints;
     }
 
     public int getCurrentGenerationNumber()
     {
-        return Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getBroadcastAddress());
+        return Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getBroadcastAddressAndPort());
     }
 
     public int forceKeyspaceCleanup(String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
@@ -3436,14 +3615,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param ep endpoint we are interested in.
      * @return primary ranges for the specified endpoint.
      */
-    public Collection<Range<Token>> getPrimaryRangesForEndpoint(String keyspace, InetAddress ep)
+    public Collection<Range<Token>> getPrimaryRangesForEndpoint(String keyspace, InetAddressAndPort ep)
     {
         AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
         Collection<Range<Token>> primaryRanges = new HashSet<>();
         TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
         for (Token token : metadata.sortedTokens())
         {
-            List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
+            List<InetAddressAndPort> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
             if (endpoints.size() > 0 && endpoints.get(0).equals(ep))
                 primaryRanges.add(new Range<>(metadata.getPredecessor(token), token));
         }
@@ -3453,23 +3632,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     /**
      * Get the "primary ranges" within local DC for the specified keyspace and endpoint.
      *
-     * @see #getPrimaryRangesForEndpoint(String, java.net.InetAddress)
+     * @see #getPrimaryRangesForEndpoint(String, InetAddressAndPort)
      * @param keyspace Keyspace name to check primary ranges
      * @param referenceEndpoint endpoint we are interested in.
      * @return primary ranges within local DC for the specified endpoint.
      */
-    public Collection<Range<Token>> getPrimaryRangeForEndpointWithinDC(String keyspace, InetAddress referenceEndpoint)
+    public Collection<Range<Token>> getPrimaryRangeForEndpointWithinDC(String keyspace, InetAddressAndPort referenceEndpoint)
     {
         TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
         String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(referenceEndpoint);
-        Collection<InetAddress> localDcNodes = metadata.getTopology().getDatacenterEndpoints().get(localDC);
+        Collection<InetAddressAndPort> localDcNodes = metadata.getTopology().getDatacenterEndpoints().get(localDC);
         AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
 
         Collection<Range<Token>> localDCPrimaryRanges = new HashSet<>();
         for (Token token : metadata.sortedTokens())
         {
-            List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
-            for (InetAddress endpoint : endpoints)
+            List<InetAddressAndPort> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
+            for (InetAddressAndPort endpoint : endpoints)
             {
                 if (localDcNodes.contains(endpoint))
                 {
@@ -3490,7 +3669,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param ep endpoint we are interested in.
      * @return ranges for the specified endpoint.
      */
-    Collection<Range<Token>> getRangesForEndpoint(String keyspaceName, InetAddress ep)
+    Collection<Range<Token>> getRangesForEndpoint(String keyspaceName, InetAddressAndPort ep)
     {
         return Keyspace.open(keyspaceName).getReplicationStrategy().getAddressRanges().get(ep);
     }
@@ -3530,6 +3709,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param key key for which we need to find the endpoint
      * @return the endpoint responsible for this key
      */
+    @Deprecated
     public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key)
     {
         KeyspaceMetadata ksMetaData = Schema.instance.getKeyspaceMetadata(keyspaceName);
@@ -3540,12 +3720,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (metadata == null)
             throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'");
 
-        return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key)));
+        return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))).stream().map(i -> i.address).collect(toList());
+    }
+
+    public List<String> getNaturalEndpointsWithPort(String keyspaceName, String cf, String key)
+    {
+        KeyspaceMetadata ksMetaData = Schema.instance.getKeyspaceMetadata(keyspaceName);
+        if (ksMetaData == null)
+            throw new IllegalArgumentException("Unknown keyspace '" + keyspaceName + "'");
+
+        TableMetadata metadata = ksMetaData.getTableOrViewNullable(cf);
+        if (metadata == null)
+            throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'");
+
+        return stringify(getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))), true);
     }
 
+
+    @Deprecated
     public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key)
     {
-        return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(key));
+        return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(key)).stream().map(i -> i.address).collect(toList());
+    }
+
+    public List<String> getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer key)
+    {
+        return stringify(getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(key)), true);
     }
 
     /**
@@ -3556,7 +3756,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param pos position for which we need to find the endpoint
      * @return the endpoint responsible for this token
      */
-    public List<InetAddress> getNaturalEndpoints(String keyspaceName, RingPosition pos)
+    public List<InetAddressAndPort> getNaturalEndpoints(String keyspaceName, RingPosition pos)
     {
         return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalEndpoints(pos);
     }
@@ -3564,7 +3764,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     /**
      * Returns the endpoints currently responsible for storing the token plus pending ones
      */
-    public Iterable<InetAddress> getNaturalAndPendingEndpoints(String keyspaceName, Token token)
+    public Iterable<InetAddressAndPort> getNaturalAndPendingEndpoints(String keyspaceName, Token token)
     {
         return Iterables.concat(getNaturalEndpoints(keyspaceName, token), tokenMetadata.pendingEndpointsFor(token, keyspaceName));
     }
@@ -3577,14 +3777,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param key key for which we need to find the endpoint
      * @return the endpoint responsible for this key
      */
-    public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, ByteBuffer key)
+    public List<InetAddressAndPort> getLiveNaturalEndpoints(Keyspace keyspace, ByteBuffer key)
     {
         return getLiveNaturalEndpoints(keyspace, tokenMetadata.decorateKey(key));
     }
 
-    public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos)
+    public List<InetAddressAndPort> getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos)
     {
-        List<InetAddress> liveEps = new ArrayList<>();
+        List<InetAddressAndPort> liveEps = new ArrayList<>();
         getLiveNaturalEndpoints(keyspace, pos, liveEps);
         return liveEps;
     }
@@ -3597,11 +3797,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      * @param pos position for which we need to find the endpoint
      * @param liveEps the list of endpoints to mutate
      */
-    public void getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos, List<InetAddress> liveEps)
+    public void getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos, List<InetAddressAndPort> liveEps)
     {
-        List<InetAddress> endpoints = keyspace.getReplicationStrategy().getNaturalEndpoints(pos);
+        List<InetAddressAndPort> endpoints = keyspace.getReplicationStrategy().getNaturalEndpoints(pos);
 
-        for (InetAddress endpoint : endpoints)
+        for (InetAddressAndPort endpoint : endpoints)
         {
             if (FailureDetector.instance.isAlive(endpoint))
                 liveEps.add(endpoint);
@@ -3718,8 +3918,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     private void startLeaving()
     {
+        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, valueFactory.leaving(getLocalTokens()));
         Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalTokens()));
-        tokenMetadata.addLeavingEndpoint(FBUtilities.getBroadcastAddress());
+        tokenMetadata.addLeavingEndpoint(FBUtilities.getBroadcastAddressAndPort());
         PendingRangeCalculatorService.instance.update();
     }
 
@@ -3728,7 +3929,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         TokenMetadata metadata = tokenMetadata.cloneAfterAllLeft();
         if (operationMode != Mode.LEAVING)
         {
-            if (!tokenMetadata.isMember(FBUtilities.getBroadcastAddress()))
+            if (!tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()))
                 throw new UnsupportedOperationException("local node is not a member of the token ring yet");
             if (metadata.getAllEndpoints().size() < 2)
                     throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless");
@@ -3745,7 +3946,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             PendingRangeCalculatorService.instance.blockUntilFinished();
 
-            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
 
             if (operationMode != Mode.LEAVING) // If we're already decommissioning there is no point checking RF/pending ranges
             {
@@ -3772,7 +3973,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                                                     + keyspaceName + " (RF = " + rf + ", N = " + numNodes + ")."
                                                                     + " Perform a forceful decommission to ignore.");
                     }
-                    if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddress()).size() > 0)
+                    if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddressAndPort()).size() > 0)
                         throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
                 }
             }
@@ -3822,9 +4023,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private void leaveRing()
     {
         SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.NEEDS_BOOTSTRAP);
-        tokenMetadata.removeEndpoint(FBUtilities.getBroadcastAddress());
+        tokenMetadata.removeEndpoint(FBUtilities.getBroadcastAddressAndPort());
         PendingRangeCalculatorService.instance.update();
 
+        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime()));
         Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime()));
         int delay = Math.max(RING_DELAY, Gossiper.intervalInMillis * 2);
         logger.info("Announcing that I have left the ring for {}ms", delay);
@@ -3833,11 +4035,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private void unbootstrap(Runnable onFinish) throws ExecutionException, InterruptedException
     {
-        Map<String, Multimap<Range<Token>, InetAddress>> rangesToStream = new HashMap<>();
+        Map<String, Multimap<Range<Token>, InetAddressAndPort>> rangesToStream = new HashMap<>();
 
         for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
         {
-            Multimap<Range<Token>, InetAddress> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddress());
+            Multimap<Range<Token>, InetAddressAndPort> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddressAndPort());
 
             if (logger.isDebugEnabled())
                 logger.debug("Ranges needing transfer are [{}]", StringUtils.join(rangesMM.keySet(), ","));
@@ -3878,11 +4080,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     private UUID getPreferredHintsStreamTarget()
     {
-        List<InetAddress> candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
-     

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org