You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:01 UTC

[04/19] cassandra git commit: Allow storage port to be configurable per node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java b/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java
index 522b118..df81a9b 100644
--- a/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java
@@ -18,7 +18,7 @@
 package org.apache.cassandra.locator;
 
 import java.io.IOException;
-import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -32,8 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-
-import com.google.common.net.InetAddresses;
+import java.util.regex.Matcher;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
@@ -61,7 +60,7 @@ public class PropertyFileSnitchTest
     private Path backupFile;
 
     private VersionedValue.VersionedValueFactory valueFactory;
-    private Map<InetAddress, Set<Token>> tokenMap;
+    private Map<InetAddressAndPort, Set<Token>> tokenMap;
 
     @BeforeClass
     public static void setupDD()
@@ -78,17 +77,17 @@ public class PropertyFileSnitchTest
 
         restoreOrigConfigFile();
 
-        InetAddress[] hosts = {
-            InetAddress.getByName("127.0.0.1"), // this exists in the config file
-            InetAddress.getByName("127.0.0.2"), // this exists in the config file
-            InetAddress.getByName("127.0.0.9"), // this does not exist in the config file
+        InetAddressAndPort[] hosts = {
+        InetAddressAndPort.getByName("127.0.0.1"), // this exists in the config file
+        InetAddressAndPort.getByName("127.0.0.2"), // this exists in the config file
+        InetAddressAndPort.getByName("127.0.0.9"), // this does not exist in the config file
         };
 
         IPartitioner partitioner = new RandomPartitioner();
         valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
         tokenMap = new HashMap<>();
 
-        for (InetAddress host : hosts)
+        for (InetAddressAndPort host : hosts)
         {
             Set<Token> tokens = Collections.singleton(partitioner.getRandomToken());
             Gossiper.instance.initializeNodeUnsafe(host, UUID.randomUUID(), 1);
@@ -117,13 +116,21 @@ public class PropertyFileSnitchTest
         for (String line : lines)
         {
             String[] info = line.split("=");
-            if (info.length == 2 && replacements.containsKey(info[0]))
+            if (info.length == 2 && !line.startsWith("#") && !line.startsWith("default="))
             {
-                String replacement = replacements.get(info[0]);
-                if (!replacement.isEmpty()) // empty means remove this line
-                    newLines.add(info[0] + '=' + replacement);
-
-                replaced.add(info[0]);
+                InetAddressAndPort address = InetAddressAndPort.getByName(info[0].replaceAll(Matcher.quoteReplacement("\\:"), ":"));
+                String replacement = replacements.get(address.toString());
+                if (replacement != null)
+                {
+                    if (!replacement.isEmpty()) // empty means remove this line
+                        newLines.add(info[0] + '=' + replacement);
+
+                    replaced.add(address.toString());
+                }
+                else
+                {
+                    newLines.add(line);
+                }
             }
             else
             {
@@ -138,21 +145,26 @@ public class PropertyFileSnitchTest
                 continue;
 
             if (!replacement.getValue().isEmpty()) // empty means remove this line so do nothing here
-                newLines.add(replacement.getKey() + '=' + replacement.getValue());
+            {
+                String escaped = replacement.getKey().replaceAll(Matcher.quoteReplacement(":"), "\\\\:");
+                newLines.add(escaped + '=' + replacement.getValue());
+            }
         }
 
         Files.write(effectiveFile, newLines, StandardCharsets.UTF_8, StandardOpenOption.TRUNCATE_EXISTING);
     }
 
-    private void setNodeShutdown(InetAddress host)
+    private void setNodeShutdown(InetAddressAndPort host)
     {
         StorageService.instance.getTokenMetadata().removeEndpoint(host);
+        Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS_WITH_PORT, valueFactory.shutdown(true));
         Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.shutdown(true));
         Gossiper.instance.markDead(host, Gossiper.instance.getEndpointStateForEndpoint(host));
     }
 
-    private void setNodeLive(InetAddress host)
+    private void setNodeLive(InetAddressAndPort host)
     {
+        Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS_WITH_PORT, valueFactory.normal(tokenMap.get(host)));
         Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.normal(tokenMap.get(host)));
         Gossiper.instance.realMarkAlive(host, Gossiper.instance.getEndpointStateForEndpoint(host));
         StorageService.instance.getTokenMetadata().updateNormalTokens(tokenMap.get(host), host);
@@ -160,9 +172,9 @@ public class PropertyFileSnitchTest
 
     private static void checkEndpoint(final AbstractNetworkTopologySnitch snitch,
                                       final String endpointString, final String expectedDatacenter,
-                                      final String expectedRack)
+                                      final String expectedRack) throws UnknownHostException
     {
-        final InetAddress endpoint = InetAddresses.forString(endpointString);
+        final InetAddressAndPort endpoint = InetAddressAndPort.getByName(endpointString);
         assertEquals(expectedDatacenter, snitch.getDatacenter(endpoint));
         assertEquals(expectedRack, snitch.getRack(endpoint));
     }
@@ -174,25 +186,25 @@ public class PropertyFileSnitchTest
     @Test
     public void testChangeHostRack() throws Exception
     {
-        final InetAddress host = InetAddress.getByName("127.0.0.1");
+        final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.1");
         final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
-        checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+        checkEndpoint(snitch, host.toString(), "DC1", "RAC1");
 
         try
         {
             setNodeLive(host);
 
             Files.copy(effectiveFile, backupFile);
-            replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC1:RAC2"));
+            replaceConfigFile(Collections.singletonMap(host.toString(), "DC1:RAC2"));
 
             Thread.sleep(1500);
-            checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+            checkEndpoint(snitch, host.toString(), "DC1", "RAC1");
 
             setNodeShutdown(host);
-            replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC1:RAC2"));
+            replaceConfigFile(Collections.singletonMap(host.toString(), "DC1:RAC2"));
 
             Thread.sleep(1500);
-            checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2");
+            checkEndpoint(snitch, host.toString(), "DC1", "RAC2");
         }
         finally
         {
@@ -208,25 +220,25 @@ public class PropertyFileSnitchTest
     @Test
     public void testChangeHostDc() throws Exception
     {
-        final InetAddress host = InetAddress.getByName("127.0.0.1");
+        final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.1");
         final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
-        checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+        checkEndpoint(snitch, host.toString(), "DC1", "RAC1");
 
         try
         {
             setNodeLive(host);
 
             Files.copy(effectiveFile, backupFile);
-            replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC1"));
+            replaceConfigFile(Collections.singletonMap(host.toString(), "DC2:RAC1"));
 
             Thread.sleep(1500);
-            checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+            checkEndpoint(snitch, host.toString(), "DC1", "RAC1");
 
             setNodeShutdown(host);
-            replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC1"));
+            replaceConfigFile(Collections.singletonMap(host.toString(), "DC2:RAC1"));
 
             Thread.sleep(1500);
-            checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC1");
+            checkEndpoint(snitch, host.toString(), "DC2", "RAC1");
         }
         finally
         {
@@ -243,25 +255,25 @@ public class PropertyFileSnitchTest
     @Test
     public void testAddHost() throws Exception
     {
-        final InetAddress host = InetAddress.getByName("127.0.0.9");
+        final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.9");
         final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
-        checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default
+        checkEndpoint(snitch, host.toString(), "DC1", "r1"); // default
 
         try
         {
             setNodeLive(host);
 
             Files.copy(effectiveFile, backupFile);
-            replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC2")); // add this line if not yet there
+            replaceConfigFile(Collections.singletonMap(host.toString(), "DC2:RAC2")); // add this line if not yet there
 
             Thread.sleep(1500);
-            checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged
+            checkEndpoint(snitch, host.toString(), "DC1", "r1"); // unchanged
 
             setNodeShutdown(host);
-            replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC2")); // add this line if not yet there
+            replaceConfigFile(Collections.singletonMap(host.toString(), "DC2:RAC2")); // add this line if not yet there
 
             Thread.sleep(1500);
-            checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC2"); // changed
+            checkEndpoint(snitch, host.toString(), "DC2", "RAC2"); // changed
         }
         finally
         {
@@ -278,25 +290,25 @@ public class PropertyFileSnitchTest
     @Test
     public void testRemoveHost() throws Exception
     {
-        final InetAddress host = InetAddress.getByName("127.0.0.2");
+        final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.2");
         final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
-        checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2");
+        checkEndpoint(snitch, host.toString(), "DC1", "RAC2");
 
         try
         {
             setNodeLive(host);
 
             Files.copy(effectiveFile, backupFile);
-            replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "")); // removes line if found
+            replaceConfigFile(Collections.singletonMap(host.toString(), "")); // removes line if found
 
             Thread.sleep(1500);
-            checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); // unchanged
+            checkEndpoint(snitch, host.toString(), "DC1", "RAC2"); // unchanged
 
             setNodeShutdown(host);
-            replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "")); // removes line if found
+            replaceConfigFile(Collections.singletonMap(host.toString(), "")); // removes line if found
 
             Thread.sleep(1500);
-            checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default
+            checkEndpoint(snitch, host.toString(), "DC1", "r1"); // default
         }
         finally
         {
@@ -313,9 +325,9 @@ public class PropertyFileSnitchTest
     @Test
     public void testChangeDefault() throws Exception
     {
-        final InetAddress host = InetAddress.getByName("127.0.0.9");
+        final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.9");
         final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
-        checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default
+        checkEndpoint(snitch, host.toString(), "DC1", "r1"); // default
 
         try
         {
@@ -325,13 +337,13 @@ public class PropertyFileSnitchTest
             replaceConfigFile(Collections.singletonMap("default", "DC2:r2")); // change default
 
             Thread.sleep(1500);
-            checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged
+            checkEndpoint(snitch, host.toString(), "DC1", "r1"); // unchanged
 
             setNodeShutdown(host);
             replaceConfigFile(Collections.singletonMap("default", "DC2:r2")); // change default again (refresh file update)
 
             Thread.sleep(1500);
-            checkEndpoint(snitch, host.getHostAddress(), "DC2", "r2"); // default updated
+            checkEndpoint(snitch, host.toString(), "DC2", "r2"); // default updated
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
index 232865a..b1c3775 100644
--- a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
@@ -50,7 +50,7 @@ public class ReconnectableSnitchHelperTest
     public void failedAuthentication() throws Exception
     {
         DatabaseDescriptor.setInternodeAuthenticator(MessagingServiceTest.ALLOW_NOTHING_AUTHENTICATOR);
-        InetAddress address = InetAddress.getByName("127.0.0.250");
+        InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.250");
         //Should tolerate null returns by MS for the connection
         ReconnectableSnitchHelper.reconnect(address, address, null, null);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
index b83194b..a8caa72 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -55,14 +54,14 @@ public class ReplicationStrategyEndpointCacheTest
 
         strategy = getStrategyWithNewTokenMetadata(Keyspace.open(KEYSPACE).getReplicationStrategy(), tmd);
 
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddress.getByName("127.0.0.1"));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddress.getByName("127.0.0.2"));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.3"));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(40)), InetAddress.getByName("127.0.0.4"));
-        //tmd.updateNormalToken(new BigIntegerToken(String.valueOf(50)), InetAddress.getByName("127.0.0.5"));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(60)), InetAddress.getByName("127.0.0.6"));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(70)), InetAddress.getByName("127.0.0.7"));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(80)), InetAddress.getByName("127.0.0.8"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddressAndPort.getByName("127.0.0.1"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddressAndPort.getByName("127.0.0.2"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddressAndPort.getByName("127.0.0.3"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(40)), InetAddressAndPort.getByName("127.0.0.4"));
+        //tmd.updateNormalToken(new BigIntegerToken(String.valueOf(50)), InetAddressAndPort.getByName("127.0.0.5", null, null));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(60)), InetAddressAndPort.getByName("127.0.0.6"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(70)), InetAddressAndPort.getByName("127.0.0.7"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(80)), InetAddressAndPort.getByName("127.0.0.8"));
     }
 
     @Test
@@ -90,31 +89,31 @@ public class ReplicationStrategyEndpointCacheTest
     public void runCacheRespectsTokenChangesTest(Class stratClass, Map<String, String> configOptions) throws Exception
     {
         setup(stratClass, configOptions);
-        ArrayList<InetAddress> initial;
-        ArrayList<InetAddress> endpoints;
+        ArrayList<InetAddressAndPort> initial;
+        ArrayList<InetAddressAndPort> endpoints;
 
         endpoints = strategy.getNaturalEndpoints(searchToken);
         assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
 
         // test token addition, in DC2 before existing token
         initial = strategy.getNaturalEndpoints(searchToken);
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), InetAddress.getByName("127.0.0.5"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), InetAddressAndPort.getByName("127.0.0.5"));
         endpoints = strategy.getNaturalEndpoints(searchToken);
         assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
         assert !endpoints.equals(initial);
 
         // test token removal, newly created token
         initial = strategy.getNaturalEndpoints(searchToken);
-        tmd.removeEndpoint(InetAddress.getByName("127.0.0.5"));
+        tmd.removeEndpoint(InetAddressAndPort.getByName("127.0.0.5"));
         endpoints = strategy.getNaturalEndpoints(searchToken);
         assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
-        assert !endpoints.contains(InetAddress.getByName("127.0.0.5"));
+        assert !endpoints.contains(InetAddressAndPort.getByName("127.0.0.5"));
         assert !endpoints.equals(initial);
 
         // test token change
         initial = strategy.getNaturalEndpoints(searchToken);
         //move .8 after search token but before other DC3
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), InetAddress.getByName("127.0.0.8"));
+        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), InetAddressAndPort.getByName("127.0.0.8"));
         endpoints = strategy.getNaturalEndpoints(searchToken);
         assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
         assert !endpoints.equals(initial);
@@ -129,7 +128,7 @@ public class ReplicationStrategyEndpointCacheTest
             super(keyspaceName, tokenMetadata, snitch, configOptions);
         }
 
-        public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+        public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
         {
             assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
             called = true;
@@ -146,7 +145,7 @@ public class ReplicationStrategyEndpointCacheTest
             super(keyspaceName, tokenMetadata, snitch, configOptions);
         }
 
-        public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+        public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
         {
             assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
             called = true;
@@ -163,7 +162,7 @@ public class ReplicationStrategyEndpointCacheTest
             super(keyspaceName, tokenMetadata, snitch, configOptions);
         }
 
-        public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+        public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
         {
             assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
             called = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
index f97a6e5..fe77b0e 100644
--- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -98,22 +97,22 @@ public class SimpleStrategyTest
         {
             tmd = new TokenMetadata();
             strategy = getStrategy(keyspaceName, tmd);
-            List<InetAddress> hosts = new ArrayList<InetAddress>();
+            List<InetAddressAndPort> hosts = new ArrayList<>();
             for (int i = 0; i < endpointTokens.length; i++)
             {
-                InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
+                InetAddressAndPort ep = InetAddressAndPort.getByName("127.0.0." + String.valueOf(i + 1));
                 tmd.updateNormalToken(endpointTokens[i], ep);
                 hosts.add(ep);
             }
 
             for (int i = 0; i < keyTokens.length; i++)
             {
-                List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyTokens[i]);
+                List<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(keyTokens[i]);
                 assertEquals(strategy.getReplicationFactor(), endpoints.size());
-                List<InetAddress> correctEndpoints = new ArrayList<InetAddress>();
+                List<InetAddressAndPort> correctEndpoints = new ArrayList<>();
                 for (int j = 0; j < endpoints.size(); j++)
                     correctEndpoints.add(hosts.get((i + j + 1) % hosts.size()));
-                assertEquals(new HashSet<InetAddress>(correctEndpoints), new HashSet<InetAddress>(endpoints));
+                assertEquals(new HashSet<>(correctEndpoints), new HashSet<>(endpoints));
             }
         }
     }
@@ -135,17 +134,17 @@ public class SimpleStrategyTest
             keyTokens[i] = new BigIntegerToken(String.valueOf(RING_SIZE * 2 * i + RING_SIZE));
         }
 
-        List<InetAddress> hosts = new ArrayList<InetAddress>();
+        List<InetAddressAndPort> hosts = new ArrayList<>();
         for (int i = 0; i < endpointTokens.length; i++)
         {
-            InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
+            InetAddressAndPort ep = InetAddressAndPort.getByName("127.0.0." + String.valueOf(i + 1));
             tmd.updateNormalToken(endpointTokens[i], ep);
             hosts.add(ep);
         }
 
         // bootstrap at the end of the ring
         Token bsToken = new BigIntegerToken(String.valueOf(210));
-        InetAddress bootstrapEndpoint = InetAddress.getByName("127.0.0.11");
+        InetAddressAndPort bootstrapEndpoint = InetAddressAndPort.getByName("127.0.0.11");
         tmd.addBootstrapToken(bsToken, bootstrapEndpoint);
 
         AbstractReplicationStrategy strategy = null;
@@ -159,7 +158,7 @@ public class SimpleStrategyTest
 
             for (int i = 0; i < keyTokens.length; i++)
             {
-                Collection<InetAddress> endpoints = tmd.getWriteEndpoints(keyTokens[i], keyspaceName, strategy.getNaturalEndpoints(keyTokens[i]));
+                Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens[i], keyspaceName, strategy.getNaturalEndpoints(keyTokens[i]));
                 assertTrue(endpoints.size() >= replicationFactor);
 
                 for (int j = 0; j < replicationFactor; j++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index e5a86fd..b589d2d 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Map;
@@ -56,8 +55,8 @@ public class TokenMetadataTest
     {
         DatabaseDescriptor.daemonInitialization();
         tmd = StorageService.instance.getTokenMetadata();
-        tmd.updateNormalToken(token(ONE), InetAddress.getByName("127.0.0.1"));
-        tmd.updateNormalToken(token(SIX), InetAddress.getByName("127.0.0.6"));
+        tmd.updateNormalToken(token(ONE), InetAddressAndPort.getByName("127.0.0.1"));
+        tmd.updateNormalToken(token(SIX), InetAddressAndPort.getByName("127.0.0.6"));
     }
 
     private static void testRingIterator(ArrayList<Token> ring, String start, boolean includeMin, String... expected)
@@ -98,8 +97,8 @@ public class TokenMetadataTest
     @Test
     public void testTopologyUpdate_RackConsolidation() throws UnknownHostException
     {
-        final InetAddress first = InetAddress.getByName("127.0.0.1");
-        final InetAddress second = InetAddress.getByName("127.0.0.6");
+        final InetAddressAndPort first = InetAddressAndPort.getByName("127.0.0.1");
+        final InetAddressAndPort second = InetAddressAndPort.getByName("127.0.0.6");
         final String DATA_CENTER = "datacenter1";
         final String RACK1 = "rack1";
         final String RACK2 = "rack2";
@@ -107,19 +106,19 @@ public class TokenMetadataTest
         DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
         {
             @Override
-            public String getRack(InetAddress endpoint)
+            public String getRack(InetAddressAndPort endpoint)
             {
                 return endpoint.equals(first) ? RACK1 : RACK2;
             }
 
             @Override
-            public String getDatacenter(InetAddress endpoint)
+            public String getDatacenter(InetAddressAndPort endpoint)
             {
                 return DATA_CENTER;
             }
 
             @Override
-            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
             {
                 return 0;
             }
@@ -134,14 +133,14 @@ public class TokenMetadataTest
         TokenMetadata.Topology topology = tokenMetadata.getTopology();
         assertNotNull(topology);
 
-        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+        Multimap<String, InetAddressAndPort> allEndpoints = topology.getDatacenterEndpoints();
         assertNotNull(allEndpoints);
         assertTrue(allEndpoints.size() == 2);
         assertTrue(allEndpoints.containsKey(DATA_CENTER));
         assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
         assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
 
-        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        Map<String, Multimap<String, InetAddressAndPort>> racks = topology.getDatacenterRacks();
         assertNotNull(racks);
         assertTrue(racks.size() == 1);
         assertTrue(racks.containsKey(DATA_CENTER));
@@ -154,19 +153,19 @@ public class TokenMetadataTest
         DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
         {
             @Override
-            public String getRack(InetAddress endpoint)
+            public String getRack(InetAddressAndPort endpoint)
             {
                 return RACK1;
             }
 
             @Override
-            public String getDatacenter(InetAddress endpoint)
+            public String getDatacenter(InetAddressAndPort endpoint)
             {
                 return DATA_CENTER;
             }
 
             @Override
-            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
             {
                 return 0;
             }
@@ -196,8 +195,8 @@ public class TokenMetadataTest
     @Test
     public void testTopologyUpdate_RackExpansion() throws UnknownHostException
     {
-        final InetAddress first = InetAddress.getByName("127.0.0.1");
-        final InetAddress second = InetAddress.getByName("127.0.0.6");
+        final InetAddressAndPort first = InetAddressAndPort.getByName("127.0.0.1");
+        final InetAddressAndPort second = InetAddressAndPort.getByName("127.0.0.6");
         final String DATA_CENTER = "datacenter1";
         final String RACK1 = "rack1";
         final String RACK2 = "rack2";
@@ -205,19 +204,19 @@ public class TokenMetadataTest
         DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
         {
             @Override
-            public String getRack(InetAddress endpoint)
+            public String getRack(InetAddressAndPort endpoint)
             {
                 return RACK1;
             }
 
             @Override
-            public String getDatacenter(InetAddress endpoint)
+            public String getDatacenter(InetAddressAndPort endpoint)
             {
                 return DATA_CENTER;
             }
 
             @Override
-            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
             {
                 return 0;
             }
@@ -232,14 +231,14 @@ public class TokenMetadataTest
         TokenMetadata.Topology topology = tokenMetadata.getTopology();
         assertNotNull(topology);
 
-        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+        Multimap<String, InetAddressAndPort> allEndpoints = topology.getDatacenterEndpoints();
         assertNotNull(allEndpoints);
         assertTrue(allEndpoints.size() == 2);
         assertTrue(allEndpoints.containsKey(DATA_CENTER));
         assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
         assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
 
-        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        Map<String, Multimap<String, InetAddressAndPort>> racks = topology.getDatacenterRacks();
         assertNotNull(racks);
         assertTrue(racks.size() == 1);
         assertTrue(racks.containsKey(DATA_CENTER));
@@ -252,19 +251,19 @@ public class TokenMetadataTest
         DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
         {
             @Override
-            public String getRack(InetAddress endpoint)
+            public String getRack(InetAddressAndPort endpoint)
             {
                 return endpoint.equals(first) ? RACK1 : RACK2;
             }
 
             @Override
-            public String getDatacenter(InetAddress endpoint)
+            public String getDatacenter(InetAddressAndPort endpoint)
             {
                 return DATA_CENTER;
             }
 
             @Override
-            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
             {
                 return 0;
             }
@@ -293,8 +292,8 @@ public class TokenMetadataTest
     @Test
     public void testEndpointSizes() throws UnknownHostException
     {
-        final InetAddress first = InetAddress.getByName("127.0.0.1");
-        final InetAddress second = InetAddress.getByName("127.0.0.6");
+        final InetAddressAndPort first = InetAddressAndPort.getByName("127.0.0.1");
+        final InetAddressAndPort second = InetAddressAndPort.getByName("127.0.0.6");
 
         tmd.updateNormalToken(token(ONE), first);
         tmd.updateNormalToken(token(SIX), second);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java b/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
index 2394e0c..2b13715 100644
--- a/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
@@ -20,7 +20,6 @@
  */
 package org.apache.cassandra.metrics;
 
-import java.net.InetAddress;
 import java.util.Map;
 import java.util.UUID;
 
@@ -35,6 +34,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 import static org.junit.Assert.assertEquals;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
@@ -53,11 +53,15 @@ public class HintedHandOffMetricsTest
         DatabaseDescriptor.getHintsDirectory().mkdirs();
 
         for (int i = 0; i < 99; i++)
-            HintsService.instance.metrics.incrPastWindow(InetAddress.getByName("127.0.0.1"));
+            HintsService.instance.metrics.incrPastWindow(InetAddressAndPort.getLocalHost());
         HintsService.instance.metrics.log();
 
-        UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS);
+        UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS_V2);
         Map<UUID, Integer> returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance);
         assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99);
+
+        rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.LEGACY_PEER_EVENTS);
+        returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance);
+        assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/CompactEndpointSerializationHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/CompactEndpointSerializationHelperTest.java b/test/unit/org/apache/cassandra/net/CompactEndpointSerializationHelperTest.java
new file mode 100644
index 0000000..3232455
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/CompactEndpointSerializationHelperTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Round-trips {@link InetAddressAndPort} values through {@link CompactEndpointSerializationHelper}
+ * at both the 3.0 and the current messaging version.
+ */
+public class CompactEndpointSerializationHelperTest
+{
+    // Port expected back from a pre-4.0 round trip (the inputs are serialized with port 42).
+    // NOTE(review): presumably the default storage port; assumes nothing in this JVM changed it -- confirm.
+    private static final int EXPECTED_LEGACY_PORT = 7000;
+
+    @Test
+    public void testRoundtrip() throws Exception
+    {
+        InetAddressAndPort ipv4 = InetAddressAndPort.getByName("127.0.0.1:42");
+        InetAddressAndPort ipv6 = InetAddressAndPort.getByName("[2001:db8:0:0:0:ff00:42:8329]:42");
+
+        testAddress(ipv4, MessagingService.VERSION_30);
+        testAddress(ipv6, MessagingService.VERSION_30);
+        testAddress(ipv4, MessagingService.current_version);
+        testAddress(ipv6, MessagingService.current_version);
+    }
+
+    /**
+     * Serializes {@code address} at {@code version}, checks the reported serialized size against the
+     * bytes actually written, then deserializes and compares the result with the input.
+     */
+    private void testAddress(InetAddressAndPort address, int version) throws Exception
+    {
+        ByteBuffer out;
+        try (DataOutputBuffer dob = new DataOutputBuffer())
+        {
+            CompactEndpointSerializationHelper.instance.serialize(address, dob, version);
+            out = dob.buffer();
+        }
+        assertEquals(out.remaining(), CompactEndpointSerializationHelper.instance.serializedSize(address, version));
+
+        InetAddressAndPort roundtripped;
+        try (DataInputBuffer dib = new DataInputBuffer(out, false))
+        {
+            roundtripped = CompactEndpointSerializationHelper.instance.deserialize(dib, version);
+        }
+
+        if (version >= MessagingService.VERSION_40)
+        {
+            assertEquals(address, roundtripped);
+        }
+        else
+        {
+            // assertEquals takes (expected, actual): the input address is the expectation
+            assertEquals(address.address, roundtripped.address);
+            assertEquals(EXPECTED_LEGACY_PORT, roundtripped.port);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/ForwardToContainerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/ForwardToContainerTest.java b/test/unit/org/apache/cassandra/net/ForwardToContainerTest.java
new file mode 100644
index 0000000..195d734
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/ForwardToContainerTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Round-trips a {@link ForwardToContainer} through {@link ForwardToSerializer} at the current and
+ * the 3.0 messaging version.
+ */
+public class ForwardToContainerTest
+{
+    // Default port installed by the test; pre-4.0 round trips are expected to come back with it
+    private static final int DEFAULT_PORT = 65532;
+
+    @Test
+    public void testCurrent() throws Exception
+    {
+        testVersion(MessagingService.current_version);
+    }
+
+    @Test
+    public void test30() throws Exception
+    {
+        testVersion(MessagingService.VERSION_30);
+    }
+
+    /**
+     * Serializes six targets (IPv4 and IPv6, with and without an explicit port) at {@code version},
+     * checks the reported serialized size, and compares the deserialized container with the input.
+     */
+    private void testVersion(int version) throws Exception
+    {
+        // NOTE(review): static call, presumably changes a process-wide default that is never restored
+        // here -- confirm no other test sharing the JVM relies on a different default port.
+        InetAddressAndPort.initializeDefaultPort(DEFAULT_PORT);
+        List<InetAddressAndPort> addresses = ImmutableList.of(InetAddressAndPort.getByNameOverrideDefaults("127.0.0.1", 42),
+                                                              InetAddressAndPort.getByName("127.0.0.1"),
+                                                              InetAddressAndPort.getByName("127.0.0.1:7000"),
+                                                              InetAddressAndPort.getByNameOverrideDefaults("2001:0db8:0000:0000:0000:ff00:0042:8329", 42),
+                                                              InetAddressAndPort.getByName("2001:0db8:0000:0000:0000:ff00:0042:8329"),
+                                                              InetAddressAndPort.getByName("[2001:0db8:0000:0000:0000:ff00:0042:8329]:7000"));
+
+        ForwardToContainer ftc = new ForwardToContainer(addresses, new int[] { 44, 45, 46, 47, 48, 49 });
+        ByteBuffer buffer;
+        try (DataOutputBuffer dob = new DataOutputBuffer())
+        {
+            ForwardToSerializer.instance.serialize(ftc, dob, version);
+            buffer = dob.buffer();
+        }
+
+        assertEquals(buffer.remaining(), ForwardToSerializer.instance.serializedSize(ftc, version));
+
+        ForwardToContainer deserialized;
+        try (DataInputBuffer dib = new DataInputBuffer(buffer, false))
+        {
+            deserialized = ForwardToSerializer.instance.deserialize(dib, version);
+        }
+
+        // assertArrayEquals reports the first differing index, unlike assertTrue(Arrays.equals(..))
+        assertArrayEquals(ftc.messageIds, deserialized.messageIds);
+
+        Iterator<InetAddressAndPort> iterator = deserialized.targets.iterator();
+        for (InetAddressAndPort original : addresses)
+        {
+            InetAddressAndPort roundtripped = iterator.next();
+            if (version >= MessagingService.VERSION_40)
+            {
+                assertEquals(original, roundtripped);
+            }
+            else
+            {
+                assertEquals(original.address, roundtripped.address);
+                //3.0 can't send port numbers so you get the defaults
+                assertEquals(DEFAULT_PORT, roundtripped.port);
+            }
+        }
+        // Surplus deserialized targets would otherwise go unnoticed
+        assertFalse(iterator.hasNext());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/Matcher.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/Matcher.java b/test/unit/org/apache/cassandra/net/Matcher.java
index cd1b667..27b685f 100644
--- a/test/unit/org/apache/cassandra/net/Matcher.java
+++ b/test/unit/org/apache/cassandra/net/Matcher.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * Predicate based on intercepted, outgoing messange and the message's destination address.
@@ -28,5 +28,5 @@ public interface Matcher<T>
      * @param obj intercepted outgoing message
      * @param to  destination address
      */
-    public boolean matches(MessageOut<T> obj, InetAddress to);
+    public boolean matches(MessageOut<T> obj, InetAddressAndPort to);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/MatcherResponse.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MatcherResponse.java b/test/unit/org/apache/cassandra/net/MatcherResponse.java
index 6cd8085..7a1772a 100644
--- a/test/unit/org/apache/cassandra/net/MatcherResponse.java
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Queue;
@@ -27,6 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
+
 /**
  * Sends a response for an incoming message with a matching {@link Matcher}.
  * The actual behavior by any instance of this class can be inspected by
@@ -76,7 +77,7 @@ public class MatcherResponse
      * Respond with the message created by the provided function that will be called with each intercepted outbound message.
      * @param fnResponse    function to call for creating reply based on intercepted message and target address
      */
-    public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse)
+    public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddressAndPort, MessageIn<S>> fnResponse)
     {
         return respondN(fnResponse, Integer.MAX_VALUE);
     }
@@ -101,7 +102,7 @@ public class MatcherResponse
      */
     public <T, S> MockMessagingSpy respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb, int limit)
     {
-        return respondN((MessageOut<T> msg, InetAddress to) -> {
+        return respondN((MessageOut<T> msg, InetAddressAndPort to) -> {
                     S payload = fnResponse.apply(msg);
                     if (payload == null)
                         return null;
@@ -147,7 +148,7 @@ public class MatcherResponse
      * each intercepted outbound message.
      * @param fnResponse    function to call for creating reply based on intercepted message and target address
      */
-    public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse, int limit)
+    public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddressAndPort, MessageIn<S>> fnResponse, int limit)
     {
         limitCounter.set(limit);
 
@@ -155,7 +156,7 @@ public class MatcherResponse
 
         sink = new IMessageSink()
         {
-            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
             {
                 // prevent outgoing message from being send in case matcher indicates a match
                 // and instead send the mocked response

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index f0a959e..4ce3422 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -48,6 +48,10 @@ import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.monitoring.ApproximateTime;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService.ServerChannel;
 import org.apache.cassandra.net.async.NettyFactory;
 import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
@@ -81,7 +85,7 @@ public class MessagingServiceTest
     };
     private static IInternodeAuthenticator originalAuthenticator;
     private static ServerEncryptionOptions originalServerEncryptionOptions;
-    private static InetAddress originalListenAddress;
+    private static InetAddressAndPort originalListenAddress;
 
     private final MessagingService messagingService = MessagingService.test();
 
@@ -93,7 +97,7 @@ public class MessagingServiceTest
         DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
         originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator();
         originalServerEncryptionOptions = DatabaseDescriptor.getServerEncryptionOptions();
-        originalListenAddress = DatabaseDescriptor.getListenAddress();
+        originalListenAddress = InetAddressAndPort.getByAddressOverrideDefaults(DatabaseDescriptor.getListenAddress(), DatabaseDescriptor.getStoragePort());
     }
 
     private static int metricScopeId = 0;
@@ -103,8 +107,8 @@ public class MessagingServiceTest
     {
         messagingService.resetDroppedMessagesMap(Integer.toString(metricScopeId++));
         MockBackPressureStrategy.applied = false;
-        messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.2"));
-        messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.3"));
+        messagingService.destroyConnectionPool(InetAddressAndPort.getByName("127.0.0.2"));
+        messagingService.destroyConnectionPool(InetAddressAndPort.getByName("127.0.0.3"));
     }
 
     @After
@@ -113,7 +117,7 @@ public class MessagingServiceTest
         DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator);
         DatabaseDescriptor.setServerEncryptionOptions(originalServerEncryptionOptions);
         DatabaseDescriptor.setShouldListenOnBroadcastAddress(false);
-        DatabaseDescriptor.setListenAddress(originalListenAddress);
+        DatabaseDescriptor.setListenAddress(originalListenAddress.address);
         FBUtilities.reset();
     }
 
@@ -221,44 +225,44 @@ public class MessagingServiceTest
     @Test
     public void testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws UnknownHostException
     {
-        MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2"));
+        MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2"));
         IAsyncCallback bpCallback = new BackPressureCallback();
         IAsyncCallback noCallback = new NoBackPressureCallback();
         MessageOut<?> ignored = null;
 
         DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), noCallback, ignored);
+        messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), noCallback, ignored);
         assertFalse(backPressureState.onSend);
 
         DatabaseDescriptor.setBackPressureEnabled(false);
-        messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), bpCallback, ignored);
+        messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, ignored);
         assertFalse(backPressureState.onSend);
 
         DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), bpCallback, ignored);
+        messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, ignored);
         assertTrue(backPressureState.onSend);
     }
 
     @Test
     public void testUpdatesBackPressureOnReceiveWhenEnabledAndWithSupportedCallback() throws UnknownHostException
     {
-        MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2"));
+        MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2"));
         IAsyncCallback bpCallback = new BackPressureCallback();
         IAsyncCallback noCallback = new NoBackPressureCallback();
         boolean timeout = false;
 
         DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), noCallback, timeout);
+        messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), noCallback, timeout);
         assertFalse(backPressureState.onReceive);
         assertFalse(backPressureState.onTimeout);
 
         DatabaseDescriptor.setBackPressureEnabled(false);
-        messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout);
+        messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
         assertFalse(backPressureState.onReceive);
         assertFalse(backPressureState.onTimeout);
 
         DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout);
+        messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
         assertTrue(backPressureState.onReceive);
         assertFalse(backPressureState.onTimeout);
     }
@@ -266,23 +270,23 @@ public class MessagingServiceTest
     @Test
     public void testUpdatesBackPressureOnTimeoutWhenEnabledAndWithSupportedCallback() throws UnknownHostException
     {
-        MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2"));
+        MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2"));
         IAsyncCallback bpCallback = new BackPressureCallback();
         IAsyncCallback noCallback = new NoBackPressureCallback();
         boolean timeout = true;
 
         DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), noCallback, timeout);
+        messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), noCallback, timeout);
         assertFalse(backPressureState.onReceive);
         assertFalse(backPressureState.onTimeout);
 
         DatabaseDescriptor.setBackPressureEnabled(false);
-        messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout);
+        messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
         assertFalse(backPressureState.onReceive);
         assertFalse(backPressureState.onTimeout);
 
         DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout);
+        messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
         assertFalse(backPressureState.onReceive);
         assertTrue(backPressureState.onTimeout);
     }
@@ -291,11 +295,11 @@ public class MessagingServiceTest
     public void testAppliesBackPressureWhenEnabled() throws UnknownHostException
     {
         DatabaseDescriptor.setBackPressureEnabled(false);
-        messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.2")), ONE_SECOND);
+        messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.2")), ONE_SECOND);
         assertFalse(MockBackPressureStrategy.applied);
 
         DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.2")), ONE_SECOND);
+        messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.2")), ONE_SECOND);
         assertTrue(MockBackPressureStrategy.applied);
     }
 
@@ -303,13 +307,13 @@ public class MessagingServiceTest
     public void testDoesntApplyBackPressureToBroadcastAddress() throws UnknownHostException
     {
         DatabaseDescriptor.setBackPressureEnabled(true);
-        messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.1")), ONE_SECOND);
+        messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.1")), ONE_SECOND);
         assertFalse(MockBackPressureStrategy.applied);
     }
 
     private static void addDCLatency(long sentAt, long nowTime) throws IOException
     {
-        MessageIn.deriveConstructionTime(InetAddress.getLocalHost(), (int)sentAt, nowTime);
+        MessageIn.deriveConstructionTime(InetAddressAndPort.getLocalHost(), (int)sentAt, nowTime);
     }
 
     public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState>
@@ -328,19 +332,19 @@ public class MessagingServiceTest
         }
 
         @Override
-        public MockBackPressureState newState(InetAddress host)
+        public MockBackPressureState newState(InetAddressAndPort host)
         {
             return new MockBackPressureState(host);
         }
 
         public static class MockBackPressureState implements BackPressureState
         {
-            private final InetAddress host;
+            private final InetAddressAndPort host;
             public volatile boolean onSend = false;
             public volatile boolean onReceive = false;
             public volatile boolean onTimeout = false;
 
-            private MockBackPressureState(InetAddress host)
+            private MockBackPressureState(InetAddressAndPort host)
             {
                 this.host = host;
             }
@@ -370,7 +374,7 @@ public class MessagingServiceTest
             }
 
             @Override
-            public InetAddress getHost()
+            public InetAddressAndPort getHost()
             {
                 return host;
             }
@@ -429,7 +433,7 @@ public class MessagingServiceTest
     {
         MessagingService ms = MessagingService.instance();
         DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR);
-        InetAddress address = InetAddress.getByName("127.0.0.250");
+        InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.250");
 
         //Should return null
         MessageOut messageOut = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK);
@@ -444,20 +448,20 @@ public class MessagingServiceTest
     public void testOutboundMessagingConnectionCleansUp() throws Exception
     {
         MessagingService ms = MessagingService.instance();
-        InetSocketAddress local = new InetSocketAddress("127.0.0.1", 9876);
-        InetSocketAddress remote = new InetSocketAddress("127.0.0.2", 9876);
+        InetAddressAndPort local = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.1", 9876);
+        InetAddressAndPort remote = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", 9876);
 
-        OutboundMessagingPool pool = new OutboundMessagingPool(remote, local, null, new MockBackPressureStrategy(null).newState(remote.getAddress()), ALLOW_NOTHING_AUTHENTICATOR);
-        ms.channelManagers.put(remote.getAddress(), pool);
+        OutboundMessagingPool pool = new OutboundMessagingPool(remote, local, null, new MockBackPressureStrategy(null).newState(remote), ALLOW_NOTHING_AUTHENTICATOR);
+        ms.channelManagers.put(remote, pool);
         pool.sendMessage(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), 0);
-        assertFalse(ms.channelManagers.containsKey(remote.getAddress()));
+        assertFalse(ms.channelManagers.containsKey(remote));
     }
 
     @Test
-    public void reconnectWithNewIp() throws UnknownHostException
+    public void reconnectWithNewIp() throws Exception
     {
-        InetAddress publicIp = InetAddress.getByName("127.0.0.2");
-        InetAddress privateIp = InetAddress.getByName("127.0.0.3");
+        InetAddressAndPort publicIp = InetAddressAndPort.getByName("127.0.0.2");
+        InetAddressAndPort privateIp = InetAddressAndPort.getByName("127.0.0.3");
 
         // reset the preferred IP value, for good test hygene
         SystemKeyspace.updatePreferredIP(publicIp, publicIp);
@@ -485,8 +489,8 @@ public class MessagingServiceTest
                 Assert.assertEquals(0, serverChannel.size());
 
             // now, create a connection and make sure it's in a channel group
-            InetSocketAddress server = new InetSocketAddress(FBUtilities.getBroadcastAddress(), DatabaseDescriptor.getStoragePort());
-            OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 0), server);
+            InetAddressAndPort server = FBUtilities.getBroadcastAddressAndPort();
+            OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", 0), server);
 
             CountDownLatch latch = new CountDownLatch(1);
             OutboundConnectionParams params = OutboundConnectionParams.builder()
@@ -596,7 +600,7 @@ public class MessagingServiceTest
         if (listenOnBroadcastAddr)
         {
             DatabaseDescriptor.setShouldListenOnBroadcastAddress(true);
-            listenAddress = InetAddresses.increment(FBUtilities.getBroadcastAddress());
+            listenAddress = InetAddresses.increment(FBUtilities.getBroadcastAddressAndPort().address);
             DatabaseDescriptor.setListenAddress(listenAddress);
             FBUtilities.reset();
         }
@@ -627,7 +631,7 @@ public class MessagingServiceTest
 
                     if (serverEncryptionOptions.enable_legacy_ssl_storage_port)
                     {
-                        if (legacySslPort == serverChannel.getAddress().getPort())
+                        if (legacySslPort == serverChannel.getAddress().port)
                         {
                             foundLegacyListenSslAddress = true;
                             Assert.assertEquals(ServerChannel.SecurityLevel.REQUIRED, serverChannel.getSecurityLevel());
@@ -646,7 +650,7 @@ public class MessagingServiceTest
                 int found = 0;
                 for (ServerChannel serverChannel : messagingService.serverChannels)
                 {
-                    if (serverChannel.getAddress().getAddress().equals(listenAddress))
+                    if (serverChannel.getAddress().address.equals(listenAddress))
                         found++;
                 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/MockMessagingService.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingService.java b/test/unit/org/apache/cassandra/net/MockMessagingService.java
index 0412759..4ea1bf5 100644
--- a/test/unit/org/apache/cassandra/net/MockMessagingService.java
+++ b/test/unit/org/apache/cassandra/net/MockMessagingService.java
@@ -17,14 +17,15 @@
  */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.function.Predicate;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
+
 /**
  * Starting point for mocking {@link MessagingService} interactions. Outgoing messages can be
  * intercepted by first creating a {@link MatcherResponse} by calling {@link MockMessagingService#when(Matcher)}.
- * Alternatively {@link Matcher}s can be created by using helper methods such as {@link #to(InetAddress)},
+ * Alternatively {@link Matcher}s can be created by using helper methods such as {@link #to(InetAddressAndPort)},
  * {@link #verb(MessagingService.Verb)} or {@link #payload(Predicate)} and may also be
  * nested using {@link MockMessagingService#all(Matcher[])} or {@link MockMessagingService#any(Matcher[])}.
  * After each test, {@link MockMessagingService#cleanup()} must be called for free listeners registered
@@ -58,11 +59,11 @@ public class MockMessagingService
      * Creates a matcher that will indicate if the target address of the outgoing message equals the
      * provided address.
      */
-    public static Matcher<InetAddress> to(String address)
+    public static Matcher<InetAddressAndPort> to(String address)
     {
         try
         {
-            return to(InetAddress.getByName(address));
+            return to(InetAddressAndPort.getByName(address));
         }
         catch (UnknownHostException e)
         {
@@ -74,7 +75,7 @@ public class MockMessagingService
      * Creates a matcher that will indicate if the target address of the outgoing message equals the
      * provided address.
      */
-    public static Matcher<InetAddress> to(InetAddress address)
+    public static Matcher<InetAddressAndPort> to(InetAddressAndPort address)
     {
         return (in, to) -> to == address || to.equals(address);
     }
@@ -117,7 +118,7 @@ public class MockMessagingService
      */
     public static <T> Matcher<?> all(Matcher<?>... matchers)
     {
-        return (MessageOut<T> out, InetAddress to) -> {
+        return (MessageOut<T> out, InetAddressAndPort to) -> {
             for (Matcher matcher : matchers)
             {
                 if (!matcher.matches(out, to))
@@ -132,7 +133,7 @@ public class MockMessagingService
      */
     public static <T> Matcher<?> any(Matcher<?>... matchers)
     {
-        return (MessageOut<T> out, InetAddress to) -> {
+        return (MessageOut<T> out, InetAddressAndPort to) -> {
             for (Matcher matcher : matchers)
             {
                 if (matcher.matches(out, to))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
index 3f6564e..8d0f91b 100644
--- a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
@@ -55,7 +55,7 @@ public class MockMessagingServiceTest
     public void testRequestResponse() throws InterruptedException, ExecutionException
     {
         // echo message that we like to mock as incoming reply for outgoing echo message
-        MessageIn<EchoMessage> echoMessageIn = MessageIn.create(FBUtilities.getBroadcastAddress(),
+        MessageIn<EchoMessage> echoMessageIn = MessageIn.create(FBUtilities.getBroadcastAddressAndPort(),
                 EchoMessage.instance,
                 Collections.emptyMap(),
                 MessagingService.Verb.ECHO,
@@ -63,14 +63,14 @@ public class MockMessagingServiceTest
         MockMessagingSpy spy = MockMessagingService
                 .when(
                         all(
-                                to(FBUtilities.getBroadcastAddress()),
+                                to(FBUtilities.getBroadcastAddressAndPort()),
                                 verb(MessagingService.Verb.ECHO)
                         )
                 )
                 .respond(echoMessageIn);
 
         MessageOut<EchoMessage> echoMessageOut = new MessageOut<>(MessagingService.Verb.ECHO, EchoMessage.instance, EchoMessage.serializer);
-        MessagingService.instance().sendRR(echoMessageOut, FBUtilities.getBroadcastAddress(), new IAsyncCallback()
+        MessagingService.instance().sendRR(echoMessageOut, FBUtilities.getBroadcastAddressAndPort(), new IAsyncCallback()
         {
             public void response(MessageIn msg)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
index 93fe32e..4d1ae01 100644
--- a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
+++ b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
@@ -18,7 +18,6 @@
 */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -29,6 +28,7 @@ import com.google.common.util.concurrent.RateLimiter;
 
 import org.junit.Test;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.TestTimeSource;
 import org.apache.cassandra.utils.TimeSource;
 
@@ -94,17 +94,17 @@ public class RateBasedBackPressureTest
         TestTimeSource timeSource = new TestTimeSource();
         RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
 
-        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+        RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
         state.onMessageSent(null);
         assertEquals(0, state.incomingRate.size());
         assertEquals(0, state.outgoingRate.size());
 
-        state = strategy.newState(InetAddress.getLoopbackAddress());
+        state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
         state.onResponseReceived();
         assertEquals(1, state.incomingRate.size());
         assertEquals(1, state.outgoingRate.size());
 
-        state = strategy.newState(InetAddress.getLoopbackAddress());
+        state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
         state.onResponseTimeout();
         assertEquals(0, state.incomingRate.size());
         assertEquals(1, state.outgoingRate.size());
@@ -116,7 +116,7 @@ public class RateBasedBackPressureTest
         long windowSize = 6000;
         TestTimeSource timeSource = new TestTimeSource();
         RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
-        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+        RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
 
         // Get initial rate:
         double initialRate = state.rateLimiter.getRate();
@@ -140,7 +140,7 @@ public class RateBasedBackPressureTest
         long windowSize = 6000;
         TestTimeSource timeSource = new TestTimeSource();
         RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
-        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+        RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
 
         // Get initial time:
         long current = state.getLastIntervalAcquire();
@@ -174,7 +174,7 @@ public class RateBasedBackPressureTest
         long windowSize = 6000;
         TestTimeSource timeSource = new TestTimeSource();
         RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
-        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+        RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
 
         // Update incoming and outgoing rate so that the ratio is 0.5:
         state.incomingRate.update(50);
@@ -194,7 +194,7 @@ public class RateBasedBackPressureTest
         long windowSize = 6000;
         TestTimeSource timeSource = new TestTimeSource();
         RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
-        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+        RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
 
         // Update incoming and outgoing rate so that the ratio is 0.5:
         state.incomingRate.update(50);
@@ -236,9 +236,9 @@ public class RateBasedBackPressureTest
         long windowSize = 6000;
         TestTimeSource timeSource = new TestTimeSource();
         TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
-        RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
-        RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
-        RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+        RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
+        RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
+        RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
 
         // Update incoming and outgoing rates:
         state1.incomingRate.update(50);
@@ -265,9 +265,9 @@ public class RateBasedBackPressureTest
         long windowSize = 6000;
         TestTimeSource timeSource = new TestTimeSource();
         TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
-        RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
-        RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
-        RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+        RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
+        RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
+        RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
 
         // Update incoming and outgoing rates:
         state1.incomingRate.update(50);
@@ -294,10 +294,10 @@ public class RateBasedBackPressureTest
         long windowSize = 6000;
         TestTimeSource timeSource = new TestTimeSource();
         TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
-        RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
-        RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
-        RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
-        RateBasedBackPressureState state4 = strategy.newState(InetAddress.getByName("127.0.0.4"));
+        RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
+        RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
+        RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
+        RateBasedBackPressureState state4 = strategy.newState(InetAddressAndPort.getByName("127.0.0.4"));
 
         // Update incoming and outgoing rates:
         state1.incomingRate.update(50); // this
@@ -333,9 +333,9 @@ public class RateBasedBackPressureTest
         long windowSize = 10000;
         TestTimeSource timeSource = new TestTimeSource();
         TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
-        RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
-        RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
-        RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+        RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
+        RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
+        RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
 
         // Update incoming and outgoing rates:
         state1.incomingRate.update(5); // slow

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
index 2d12baf..c8469a8 100644
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
@@ -18,7 +18,6 @@
 */
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
 import java.util.UUID;
 
 import org.junit.BeforeClass;
@@ -32,6 +31,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService.Verb;
 import org.apache.cassandra.schema.MockSchema;
 import org.apache.cassandra.schema.TableMetadata;
@@ -65,7 +65,7 @@ public class WriteCallbackInfoTest
                          ? new Commit(UUID.randomUUID(), new PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, RegularAndStaticColumns.NONE, 1).build())
                          : new Mutation(PartitionUpdate.simpleBuilder(metadata, "").build());
 
-        WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddress.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints);
+        WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddressAndPort.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints);
         Assert.assertEquals(expectHint, wcbi.shouldHint());
         if (expectHint)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java b/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java
index 128fe4b..0211512 100644
--- a/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java
+++ b/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.net.InetAddresses;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -37,6 +38,7 @@ import io.netty.channel.ChannelPromise;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.ChannelWriter.CoalescingChannelWriter;
@@ -68,8 +70,8 @@ public class ChannelWriterTest
     @Before
     public void setup()
     {
-        OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress("127.0.0.1", 0),
-                                                                             new InetSocketAddress("127.0.0.2", 0));
+        OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
+                                                                             InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
         channel = new EmbeddedChannel();
         omc = new NonSendingOutboundMessagingConnection(id, null, Optional.empty());
         channelWriter = ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java b/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java
index 100e1e0..f92ce5a 100644
--- a/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java
+++ b/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Optional;
 
+import com.google.common.net.InetAddresses;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -38,6 +39,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -50,8 +52,8 @@ public class HandshakeHandlersTest
     private static final String KEYSPACE1 = "NettyPipilineTest";
     private static final String STANDARD1 = "Standard1";
 
-    private static final InetSocketAddress LOCAL_ADDR = new InetSocketAddress("127.0.0.1", 9999);
-    private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 9999);
+    private static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9999);
+    private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 9999);
     private static final int MESSAGING_VERSION = MessagingService.current_version;
     private static final OutboundConnectionIdentifier connectionId = OutboundConnectionIdentifier.small(LOCAL_ADDR, REMOTE_ADDR);
 
@@ -179,7 +181,7 @@ public class HandshakeHandlersTest
 
         InboundHandshakeHandler handler = new InboundHandshakeHandler(new TestAuthenticator(true));
         EmbeddedChannel inboundChannel = new EmbeddedChannel(handler);
-        handler.setupMessagingPipeline(inboundChannel.pipeline(), REMOTE_ADDR.getAddress(), compress, MESSAGING_VERSION);
+        handler.setupMessagingPipeline(inboundChannel.pipeline(), REMOTE_ADDR, compress, MESSAGING_VERSION);
 
         return new TestChannels(outboundChannel, inboundChannel);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java b/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java
index a3d646d..af48636 100644
--- a/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java
+++ b/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java
@@ -85,7 +85,7 @@ public class HandshakeProtocolTest
     @Test
     public void thirdMessageTest() throws Exception
     {
-        ThirdHandshakeMessage before = new ThirdHandshakeMessage(MessagingService.current_version, FBUtilities.getBroadcastAddress());
+        ThirdHandshakeMessage before = new ThirdHandshakeMessage(MessagingService.current_version, FBUtilities.getBroadcastAddressAndPort());
         buf = before.encode(PooledByteBufAllocator.DEFAULT);
         ThirdHandshakeMessage after = ThirdHandshakeMessage.maybeDecode(buf);
         assertEquals(before, after);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org