You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:03 UTC

[06/19] cassandra git commit: Allow storage port to be configurable per node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index 84a4c32..52fc6ac 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -19,10 +19,14 @@ package org.apache.cassandra.utils;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.*;
+import java.util.stream.Collectors;
 
 import com.datastax.driver.core.*;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.ColumnMetadata.ClusteringOrder;
@@ -34,33 +38,46 @@ import org.apache.cassandra.dht.Token.TokenFactory;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.schema.TableMetadata;
 
+import org.apache.cassandra.schema.CQLTypeParser;
+import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.Types;
+
 public class NativeSSTableLoaderClient extends SSTableLoader.Client
 {
     protected final Map<String, TableMetadataRef> tables;
-    private final Collection<InetAddress> hosts;
+    private final Collection<InetSocketAddress> hosts;
     private final int port;
+    private final int storagePort;
     private final AuthProvider authProvider;
     private final SSLOptions sslOptions;
+    private final boolean allowServerPortDiscovery;
 
 
-    public NativeSSTableLoaderClient(Collection<InetAddress> hosts, int port, String username, String password, SSLOptions sslOptions)
+    public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int nativePort, int storagePort, String username, String password, SSLOptions sslOptions, boolean allowServerPortDiscovery)
     {
-        this(hosts, port, new PlainTextAuthProvider(username, password), sslOptions);
+        this(hosts, nativePort, storagePort, new PlainTextAuthProvider(username, password), sslOptions, allowServerPortDiscovery);
     }
 
-    public NativeSSTableLoaderClient(Collection<InetAddress> hosts, int port, AuthProvider authProvider, SSLOptions sslOptions)
+    public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int nativePort, int storagePort, AuthProvider authProvider, SSLOptions sslOptions, boolean allowServerPortDiscovery)
     {
         super();
         this.tables = new HashMap<>();
         this.hosts = hosts;
-        this.port = port;
+        this.port = nativePort;
         this.authProvider = authProvider;
         this.sslOptions = sslOptions;
+        this.allowServerPortDiscovery = allowServerPortDiscovery;
+        this.storagePort = storagePort;
     }
 
     public void init(String keyspace)
     {
-        Cluster.Builder builder = Cluster.builder().addContactPoints(hosts).withPort(port);
+        Set<InetAddress> hostAddresses = hosts.stream().map(host -> host.getAddress()).collect(Collectors.toSet());
+        Cluster.Builder builder = Cluster.builder().addContactPoints(hostAddresses).withPort(port).allowBetaProtocolVersion();
+
+        if (allowServerPortDiscovery)
+            builder = builder.allowServerPortDiscovery();
+
         if (sslOptions != null)
             builder.withSSL(sslOptions);
         if (authProvider != null)
@@ -82,7 +99,18 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
                 Range<Token> range = new Range<>(tokenFactory.fromString(tokenRange.getStart().getValue().toString()),
                                                  tokenFactory.fromString(tokenRange.getEnd().getValue().toString()));
                 for (Host endpoint : endpoints)
-                    addRangeForEndpoint(range, endpoint.getAddress());
+                {
+                    int portToUse;
+                    if (allowServerPortDiscovery)
+                    {
+                        portToUse = endpoint.getBroadcastAddressOptPort().portOrElse(storagePort);
+                    }
+                    else
+                    {
+                        portToUse = storagePort;
+                    }
+                    addRangeForEndpoint(range, InetAddressAndPort.getByNameOverrideDefaults(endpoint.getAddress().getHostAddress(), portToUse));
+                }
             }
 
             Types types = fetchTypes(keyspace, session);
@@ -91,6 +119,10 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
             // We only need the TableMetadata for the views, so we only load that.
             tables.putAll(fetchViews(keyspace, session, partitioner, types));
         }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     public TableMetadataRef getTableMetadata(String tableName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java
index 94b5832..19a0f83 100644
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@ -356,7 +356,7 @@ public class UUIDGen
         * The spec says that one option is to take as many source that identify
         * this node as possible and hash them together. That's what we do here by
         * gathering all the ip of this host.
-        * Note that FBUtilities.getBroadcastAddress() should be enough to uniquely
+        * Note that FBUtilities.getJustBroadcastAddress() should be enough to uniquely
         * identify the node *in the cluster* but it triggers DatabaseDescriptor
         * instanciation and the UUID generator is used in Stress for instance,
         * where we don't want to require the yaml.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index ead2a88..640f9b3 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -17,6 +17,7 @@ hints_directory: build/test/cassandra/hints
 partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
 listen_address: 127.0.0.1
 storage_port: 7010
+ssl_storage_port: 7011
 start_native_transport: true
 native_transport_port: 9042
 column_index_size_in_kb: 4
@@ -27,7 +28,7 @@ disk_access_mode: mmap
 seed_provider:
     - class_name: org.apache.cassandra.locator.SimpleSeedProvider
       parameters:
-          - seeds: "127.0.0.1"
+          - seeds: "127.0.0.1:7010"
 endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
 dynamic_snitch: true
 server_encryption_options:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/gms.EndpointState.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/gms.EndpointState.bin b/test/data/serialization/4.0/gms.EndpointState.bin
index fb7d168..17fc088 100644
Binary files a/test/data/serialization/4.0/gms.EndpointState.bin and b/test/data/serialization/4.0/gms.EndpointState.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/gms.Gossip.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/gms.Gossip.bin b/test/data/serialization/4.0/gms.Gossip.bin
index af5ac57..2fbd5d4 100644
Binary files a/test/data/serialization/4.0/gms.Gossip.bin and b/test/data/serialization/4.0/gms.Gossip.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/service.SyncComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.SyncComplete.bin b/test/data/serialization/4.0/service.SyncComplete.bin
index ba84349..15cccb8 100644
Binary files a/test/data/serialization/4.0/service.SyncComplete.bin and b/test/data/serialization/4.0/service.SyncComplete.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/service.SyncRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.SyncRequest.bin b/test/data/serialization/4.0/service.SyncRequest.bin
index 6d688a4..f4eb532 100644
Binary files a/test/data/serialization/4.0/service.SyncRequest.bin and b/test/data/serialization/4.0/service.SyncRequest.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/service.ValidationComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.ValidationComplete.bin b/test/data/serialization/4.0/service.ValidationComplete.bin
index 7433d64..edc90b3 100644
Binary files a/test/data/serialization/4.0/service.ValidationComplete.bin and b/test/data/serialization/4.0/service.ValidationComplete.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/service.ValidationRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.ValidationRequest.bin b/test/data/serialization/4.0/service.ValidationRequest.bin
index a00763b..e45eb70 100644
Binary files a/test/data/serialization/4.0/service.ValidationRequest.bin and b/test/data/serialization/4.0/service.ValidationRequest.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java b/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
index 35bf5b4..a5025a3 100644
--- a/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
+++ b/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
@@ -20,7 +20,6 @@
 package org.apache.cassandra.locator;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.*;
 
 import org.junit.Test;
@@ -53,19 +52,19 @@ public class DynamicEndpointSnitchLongTest
             StorageService.instance.unsafeInitialize();
             SimpleSnitch ss = new SimpleSnitch();
             DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode()));
-            InetAddress self = FBUtilities.getBroadcastAddress();
+            InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
 
-            List<InetAddress> hosts = new ArrayList<>();
+            List<InetAddressAndPort> hosts = new ArrayList<>();
             // We want a big list of hosts so  sorting takes time, making it much more likely to reproduce the
             // problem we're looking for.
             for (int i = 0; i < 100; i++)
                 for (int j = 0; j < 256; j++)
-                    hosts.add(InetAddress.getByAddress(new byte[]{127, 0, (byte)i, (byte)j}));
+                    hosts.add(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, (byte)i, (byte)j}));
 
             ScoreUpdater updater = new ScoreUpdater(dsnitch, hosts);
             updater.start();
 
-            List<InetAddress> result = null;
+            List<InetAddressAndPort> result = null;
             for (int i = 0; i < ITERATIONS; i++)
                 result = dsnitch.getSortedListByProximity(self, hosts);
 
@@ -85,10 +84,10 @@ public class DynamicEndpointSnitchLongTest
         public volatile boolean stopped;
 
         private final DynamicEndpointSnitch dsnitch;
-        private final List<InetAddress> hosts;
+        private final List<InetAddressAndPort> hosts;
         private final Random random = new Random();
 
-        public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddress> hosts)
+        public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddressAndPort> hosts)
         {
             this.dsnitch = dsnitch;
             this.hosts = hosts;
@@ -98,7 +97,7 @@ public class DynamicEndpointSnitchLongTest
         {
             while (!stopped)
             {
-                InetAddress host = hosts.get(random.nextInt(hosts.size()));
+                InetAddressAndPort host = hosts.get(random.nextInt(hosts.size()));
                 int score = random.nextInt(SCORE_RANGE);
                 dsnitch.receiveTiming(host, score);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
index 799ac77..bd7ef20 100644
--- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -122,7 +122,7 @@ public class LongStreamingTest
             public void init(String keyspace)
             {
                 for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
-                    addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+                    addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort());
 
                 this.ks = keyspace;
             }
@@ -149,7 +149,7 @@ public class LongStreamingTest
             public void init(String keyspace)
             {
                 for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
-                    addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+                    addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort());
 
                 this.ks = keyspace;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
index 9ec1aa6..68cfd7e 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
@@ -25,11 +25,11 @@ import com.google.common.collect.Multimap;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.PendingRangeMaps;
 import org.openjdk.jmh.annotations.*;
 import org.openjdk.jmh.infra.Blackhole;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.HashSet;
@@ -50,7 +50,7 @@ public class PendingRangesBench
     PendingRangeMaps pendingRangeMaps;
     int maxToken = 256 * 100;
 
-    Multimap<Range<Token>, InetAddress> oldPendingRanges;
+    Multimap<Range<Token>, InetAddressAndPort> oldPendingRanges;
 
     private Range<Token> genRange(String left, String right)
     {
@@ -63,7 +63,7 @@ public class PendingRangesBench
         pendingRangeMaps = new PendingRangeMaps();
         oldPendingRanges = HashMultimap.create();
 
-        InetAddress[] addresses = {InetAddress.getByName("127.0.0.1"), InetAddress.getByName("127.0.0.2")};
+        InetAddressAndPort[] addresses = { InetAddressAndPort.getByName("127.0.0.1"), InetAddressAndPort.getByName("127.0.0.2")};
 
         for (int i = 0; i < maxToken; i++)
         {
@@ -97,8 +97,8 @@ public class PendingRangesBench
     {
         int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5);
         Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken));
-        Set<InetAddress> endpoints = new HashSet<>();
-        for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : oldPendingRanges.asMap().entrySet())
+        Set<InetAddressAndPort> endpoints = new HashSet<>();
+        for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : oldPendingRanges.asMap().entrySet())
         {
             if (entry.getKey().contains(searchToken))
                 endpoints.addAll(entry.getValue());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
index 0e4a3cf..d06caba 100644
--- a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
+++ b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
@@ -20,8 +20,15 @@ package org.apache.cassandra;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.YamlConfigurationLoader;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 import java.io.File;
+import java.net.Inet6Address;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Joiner;
 
 
 public class OffsetAwareConfigurationLoader extends YamlConfigurationLoader
@@ -52,6 +59,31 @@ public class OffsetAwareConfigurationLoader extends YamlConfigurationLoader
         config.native_transport_port += offset;
         config.storage_port += offset;
 
+        //Rewrite the seed ports string
+        String[] hosts = config.seed_provider.parameters.get("seeds").split(",", -1);
+        String rewrittenSeeds = Joiner.on(", ").join(Arrays.stream(hosts).map(host -> {
+            StringBuilder sb = new StringBuilder();
+            try
+            {
+                InetAddressAndPort address = InetAddressAndPort.getByName(host.trim());
+                if (address.address instanceof Inet6Address)
+                {
+                     sb.append('[').append(address.address.getHostAddress()).append(']');
+                }
+                else
+                {
+                    sb.append(address.address.getHostAddress());
+                }
+                sb.append(':').append(address.port + offset);
+                return sb.toString();
+            }
+            catch (UnknownHostException e)
+            {
+                throw new ConfigurationException("Error in OffsetAwareConfigurationLoader reworking seed list", e);
+            }
+        }).collect(Collectors.toList()));
+        config.seed_provider.parameters.put("seeds", rewrittenSeeds);
+
         config.commitlog_directory += sep + offset;
         config.saved_caches_directory += sep + offset;
         config.hints_directory += sep + offset;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 7e62c41..1201efa 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -22,7 +22,6 @@ package org.apache.cassandra;
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOError;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -39,6 +38,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -192,7 +192,7 @@ public class Util
      * Creates initial set of nodes and tokens. Nodes are added to StorageService as 'normal'
      */
     public static void createInitialRing(StorageService ss, IPartitioner partitioner, List<Token> endpointTokens,
-                                   List<Token> keyTokens, List<InetAddress> hosts, List<UUID> hostIds, int howMany)
+                                         List<Token> keyTokens, List<InetAddressAndPort> hosts, List<UUID> hostIds, int howMany)
         throws UnknownHostException
     {
         // Expand pool of host IDs as necessary
@@ -210,10 +210,13 @@ public class Util
 
         for (int i=0; i<endpointTokens.size(); i++)
         {
-            InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
+            InetAddressAndPort ep = InetAddressAndPort.getByName("127.0.0." + String.valueOf(i + 1));
             Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), 1);
             Gossiper.instance.injectApplicationState(ep, ApplicationState.TOKENS, new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(endpointTokens.get(i))));
             ss.onChange(ep,
+                        ApplicationState.STATUS_WITH_PORT,
+                        new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i))));
+            ss.onChange(ep,
                         ApplicationState.STATUS,
                         new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i))));
             hosts.add(ep);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
index 7db1cfa..41564d9 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
@@ -17,11 +17,9 @@
  */
 package org.apache.cassandra.batchlog;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 
 import com.google.common.collect.ImmutableMultimap;
@@ -29,6 +27,8 @@ import com.google.common.collect.Multimap;
 import org.junit.Test;
 import org.junit.matchers.JUnitMatchers;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
@@ -39,89 +39,89 @@ public class BatchlogEndpointFilterTest
     @Test
     public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
     {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
-                .put(LOCAL, InetAddress.getByName("0"))
-                .put(LOCAL, InetAddress.getByName("00"))
-                .put("1", InetAddress.getByName("1"))
-                .put("1", InetAddress.getByName("11"))
-                .put("2", InetAddress.getByName("2"))
-                .put("2", InetAddress.getByName("22"))
+        Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+                .put(LOCAL, InetAddressAndPort.getByName("0"))
+                .put(LOCAL, InetAddressAndPort.getByName("00"))
+                .put("1", InetAddressAndPort.getByName("1"))
+                .put("1", InetAddressAndPort.getByName("11"))
+                .put("2", InetAddressAndPort.getByName("2"))
+                .put("2", InetAddressAndPort.getByName("22"))
                 .build();
-        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
         assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("22")));
     }
 
     @Test
     public void shouldSelectHostFromLocal() throws UnknownHostException
     {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
-                .put(LOCAL, InetAddress.getByName("0"))
-                .put(LOCAL, InetAddress.getByName("00"))
-                .put("1", InetAddress.getByName("1"))
+        Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+                .put(LOCAL, InetAddressAndPort.getByName("0"))
+                .put(LOCAL, InetAddressAndPort.getByName("00"))
+                .put("1", InetAddressAndPort.getByName("1"))
                 .build();
-        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
         assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0")));
     }
 
     @Test
     public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
     {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
-                .put(LOCAL, InetAddress.getByName("0"))
+        Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+                .put(LOCAL, InetAddressAndPort.getByName("0"))
                 .build();
-        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
         assertThat(result.size(), is(1));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0")));
     }
 
     @Test
     public void shouldSelectTwoRandomHostsFromSingleOtherRack() throws UnknownHostException
     {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
-                .put(LOCAL, InetAddress.getByName("0"))
-                .put(LOCAL, InetAddress.getByName("00"))
-                .put("1", InetAddress.getByName("1"))
-                .put("1", InetAddress.getByName("11"))
-                .put("1", InetAddress.getByName("111"))
+        Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+                .put(LOCAL, InetAddressAndPort.getByName("0"))
+                .put(LOCAL, InetAddressAndPort.getByName("00"))
+                .put("1", InetAddressAndPort.getByName("1"))
+                .put("1", InetAddressAndPort.getByName("11"))
+                .put("1", InetAddressAndPort.getByName("111"))
                 .build();
-        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
         // result should be the last two non-local replicas
         // (Collections.shuffle has been replaced with Collections.reverse for testing)
         assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("111")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111")));
     }
 
     @Test
     public void shouldSelectTwoRandomHostsFromSingleRack() throws UnknownHostException
     {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
-                .put(LOCAL, InetAddress.getByName("1"))
-                .put(LOCAL, InetAddress.getByName("11"))
-                .put(LOCAL, InetAddress.getByName("111"))
-                .put(LOCAL, InetAddress.getByName("1111"))
+        Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+                .put(LOCAL, InetAddressAndPort.getByName("1"))
+                .put(LOCAL, InetAddressAndPort.getByName("11"))
+                .put(LOCAL, InetAddressAndPort.getByName("111"))
+                .put(LOCAL, InetAddressAndPort.getByName("1111"))
                 .build();
-        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
         // result should be the last two non-local replicas
         // (Collections.shuffle has been replaced with Collections.reverse for testing)
         assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("111")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1111")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1111")));
     }
 
     private static class TestEndpointFilter extends BatchlogManager.EndpointFilter
     {
-        TestEndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+        TestEndpointFilter(String localRack, Multimap<String, InetAddressAndPort> endpoints)
         {
             super(localRack, endpoints);
         }
 
         @Override
-        protected boolean isValid(InetAddress input)
+        protected boolean isValid(InetAddressAndPort input)
         {
             // We will use always alive non-localhost endpoints
             return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
index 34902fe..33fb209 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.batchlog;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
@@ -29,6 +28,7 @@ import org.junit.*;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.Util.PartitionerSwitcher;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
@@ -94,7 +94,7 @@ public class BatchlogManagerTest
     public void setUp() throws Exception
     {
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
-        InetAddress localhost = InetAddress.getByName("127.0.0.1");
+        InetAddressAndPort localhost = InetAddressAndPort.getByName("127.0.0.1");
         metadata.updateNormalToken(Util.token("A"), localhost);
         metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
         Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).truncateBlocking();
@@ -344,7 +344,7 @@ public class BatchlogManagerTest
     @Test
     public void testReplayWithNoPeers() throws Exception
     {
-        StorageService.instance.getTokenMetadata().removeEndpoint(InetAddress.getByName("127.0.0.1"));
+        StorageService.instance.getTokenMetadata().removeEndpoint(InetAddressAndPort.getByName("127.0.0.1"));
 
         long initialAllBatches = BatchlogManager.instance.countAllBatches();
         long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index b50a050..589afd5 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -77,6 +77,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.EncryptionOptions$ClientEncryptionOptions",
     "org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptions",
     "org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptions$InternodeEncryption",
+    "org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptions$OutgoingEncryptedPortSource",
     "org.apache.cassandra.config.YamlConfigurationLoader",
     "org.apache.cassandra.config.YamlConfigurationLoader$PropertiesChecker",
     "org.apache.cassandra.config.YamlConfigurationLoader$PropertiesChecker$1",
@@ -126,6 +127,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptionsCustomizer",
     "org.apache.cassandra.ConsoleAppenderBeanInfo",
     "org.apache.cassandra.ConsoleAppenderCustomizer",
+    "org.apache.cassandra.locator.InetAddressAndPort"
     };
 
     static final Set<String> checkedClasses = new HashSet<>(Arrays.asList(validClasses));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 6993a65..69d2fb5 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.index.SecondaryIndexManager;
 import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.functions.FunctionName;
@@ -144,9 +145,9 @@ public abstract class CQLTester
         // Register an EndpointSnitch which returns fixed values for test.
         DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
         {
-            @Override public String getRack(InetAddress endpoint) { return RACK1; }
-            @Override public String getDatacenter(InetAddress endpoint) { return DATA_CENTER; }
-            @Override public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) { return 0; }
+            @Override public String getRack(InetAddressAndPort endpoint) { return RACK1; }
+            @Override public String getDatacenter(InetAddressAndPort endpoint) { return DATA_CENTER; }
+            @Override public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; }
         });
 
         try
@@ -829,6 +830,11 @@ public abstract class CQLTester
         return sessionNet(protocolVersion).execute(formatQuery(query), values);
     }
 
+    protected com.datastax.driver.core.ResultSet executeNetWithPaging(ProtocolVersion version, String query, int pageSize) throws Throwable
+    {
+        return sessionNet(version).execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
+    }
+
     protected com.datastax.driver.core.ResultSet executeNetWithPaging(String query, int pageSize) throws Throwable
     {
         return sessionNet().execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index 0a314da..ce5de62 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -223,7 +223,6 @@ public class PreparedStatementsTest extends CQLTester
                                  .withClusterName("Test Cluster")
                                  .withPort(nativePort)
                                  .withoutJMXReporting()
-                                 .allowBetaProtocolVersion()
                                  .build())
         {
             try (Session newSession = newCluster.connect())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java b/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
index 26b9d65..665bc44 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
@@ -851,10 +851,10 @@ public class ViewComplexTest extends CQLTester
         for (String view : Arrays.asList("mv1", "mv2"))
         {
             // paging
-            assertEquals(1, executeNetWithPaging(String.format("SELECT k,a,b FROM %s limit 1", view), 1).all().size());
-            assertEquals(2, executeNetWithPaging(String.format("SELECT k,a,b FROM %s limit 2", view), 1).all().size());
-            assertEquals(2, executeNetWithPaging(String.format("SELECT k,a,b FROM %s", view), 1).all().size());
-            assertRowsNet(executeNetWithPaging(String.format("SELECT k,a,b FROM %s ", view), 1),
+            assertEquals(1, executeNetWithPaging(protocolVersion, String.format("SELECT k,a,b FROM %s limit 1", view), 1).all().size());
+            assertEquals(2, executeNetWithPaging(protocolVersion, String.format("SELECT k,a,b FROM %s limit 2", view), 1).all().size());
+            assertEquals(2, executeNetWithPaging(protocolVersion, String.format("SELECT k,a,b FROM %s", view), 1).all().size());
+            assertRowsNet(protocolVersion, executeNetWithPaging(protocolVersion, String.format("SELECT k,a,b FROM %s ", view), 1),
                           row(50, 50, 50),
                           row(100, 100, 100));
             // limit

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index ac261ca..dc90b4e 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.cql3.validation.operations;
 
-import java.net.InetAddress;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
@@ -26,6 +25,7 @@ import java.util.UUID;
 
 import org.junit.Test;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
@@ -514,13 +514,13 @@ public class CreateTest extends CQLTester
         DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
         {
             @Override
-            public String getRack(InetAddress endpoint) { return RACK1; }
+            public String getRack(InetAddressAndPort endpoint) { return RACK1; }
 
             @Override
-            public String getDatacenter(InetAddress endpoint) { return "us-east-1"; }
+            public String getDatacenter(InetAddressAndPort endpoint) { return "us-east-1"; }
 
             @Override
-            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) { return 0; }
+            public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; }
         });
 
         execute("CREATE KEYSPACE Foo WITH replication = { 'class' : 'NetworkTopologyStrategy', 'us-east-1' : 1 };");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 044a49e..a096c78 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
@@ -37,6 +36,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -89,13 +89,13 @@ public class CleanupTest
         DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch()
         {
             @Override
-            public String getRack(InetAddress endpoint)
+            public String getRack(InetAddressAndPort endpoint)
             {
                 return "RC1";
             }
 
             @Override
-            public String getDatacenter(InetAddress endpoint)
+            public String getDatacenter(InetAddressAndPort endpoint)
             {
                 return "DC1";
             }
@@ -166,8 +166,8 @@ public class CleanupTest
         byte[] tk1 = new byte[1], tk2 = new byte[1];
         tk1[0] = 2;
         tk2[0] = 1;
-        tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
-        tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
+        tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1"));
+        tmd.updateNormalToken(new BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2"));
 
         CompactionManager.instance.performCleanup(cfs, 2);
 
@@ -198,8 +198,8 @@ public class CleanupTest
         byte[] tk1 = new byte[1], tk2 = new byte[1];
         tk1[0] = 2;
         tk2[0] = 1;
-        tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
-        tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
+        tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1"));
+        tmd.updateNormalToken(new BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2"));
         CompactionManager.instance.performCleanup(cfs, 2);
 
         assertEquals(0, Util.getAll(Util.cmd(cfs).build()).size());
@@ -222,9 +222,9 @@ public class CleanupTest
 
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
         tmd.clearUnsafe();
-        tmd.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.0.0.1"));
+        tmd.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.0.0.1"));
         byte[] tk1 = {2};
-        tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
+        tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1"));
 
 
         Keyspace keyspace = Keyspace.open(KEYSPACE2);
@@ -270,8 +270,8 @@ public class CleanupTest
         byte[] tk1 = new byte[1], tk2 = new byte[1];
         tk1[0] = 2;
         tk2[0] = 1;
-        tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
-        tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
+        tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1"));
+        tmd.updateNormalToken(new BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2"));
 
         for(SSTableReader r: cfs.getLiveSSTables())
             CompactionManager.instance.forceUserDefinedCleanup(r.getFilename());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
index c56368f..6e2a714 100644
--- a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.db;
 
 import java.io.File;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.List;
 
@@ -30,6 +29,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -51,7 +51,7 @@ public class DiskBoundaryManagerTest extends CQLTester
     {
         BlacklistedDirectories.clearUnwritableUnsafe();
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
-        metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddress());
+        metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddressAndPort());
         createTable("create table %s (id int primary key, x text)");
         dirs = new Directories(getCurrentColumnFamilyStore().metadata(), Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
                                                                                           new Directories.DataDirectory(new File("/tmp/2")),
@@ -86,7 +86,7 @@ public class DiskBoundaryManagerTest extends CQLTester
     public void updateTokensTest() throws UnknownHostException
     {
         DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
-        StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddress.getByName("127.0.0.10"));
+        StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddressAndPort.getByName("127.0.0.10"));
         DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
         assertFalse(dbv1.equals(dbv2));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 64ac627..6bb0a1a 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -50,9 +50,11 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -312,7 +314,8 @@ public class ReadCommandTest
         }
     }
 
-    public void serializerTest() throws IOException
+    @Test
+    public void testSerializer() throws IOException
     {
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
 
@@ -324,10 +327,11 @@ public class ReadCommandTest
 
         ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("dd").build();
         int messagingVersion = MessagingService.current_version;
-        long size = ReadCommand.serializer.serializedSize(readCommand, messagingVersion);
-
         FakeOutputStream out = new FakeOutputStream();
-        ReadCommand.serializer.serialize(readCommand, new WrappedDataOutputStreamPlus(out), messagingVersion);
+        Tracing.instance.newSession(Tracing.TraceType.QUERY);
+        MessageOut<ReadCommand> messageOut = new MessageOut(MessagingService.Verb.READ, readCommand, ReadCommand.serializer);
+        long size = messageOut.serializedSize(messagingVersion);
+        messageOut.serialize(new WrappedDataOutputStreamPlus(out), messagingVersion);
         Assert.assertEquals(size, out.count);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index fee3f2c..5ca1eef 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.db;
 
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,6 +31,7 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.rows.*;
@@ -300,8 +300,8 @@ public class RowCacheTest
         byte[] tk1, tk2;
         tk1 = "key1000".getBytes();
         tk2 = "key1050".getBytes();
-        tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
-        tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
+        tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1"));
+        tmd.updateNormalToken(new BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2"));
         store.cleanupCache();
         assertEquals(50, CacheService.instance.rowCache.size());
         CacheService.instance.setRowCacheCapacityInMB(0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
new file mode 100644
index 0000000..1c051f5
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.junit.Assert.assertEquals;
+
+public class SystemKeyspaceMigrator40Test extends CQLTester
+{
+    @Test
+    public void testMigratePeers() throws Throwable
+    {
+        String insert = String.format("INSERT INTO %s ("
+                                      + "peer, "
+                                      + "data_center, "
+                                      + "host_id, "
+                                      + "preferred_ip, "
+                                      + "rack, "
+                                      + "release_version, "
+                                      + "rpc_address, "
+                                      + "schema_version, "
+                                      + "tokens) "
+                                      + " values ( ?, ?, ? , ? , ?, ?, ?, ?, ?)",
+                                      SystemKeyspaceMigrator40.legacyPeersName);
+        UUID hostId = UUIDGen.getTimeUUID();
+        UUID schemaVersion = UUIDGen.getTimeUUID();
+        execute(insert,
+                InetAddress.getByName("127.0.0.1"),
+                "dcFoo",
+                hostId,
+                InetAddress.getByName("127.0.0.2"),
+                "rackFoo", "4.0",
+                InetAddress.getByName("127.0.0.3"),
+                schemaVersion,
+                ImmutableSet.of("foobar"));
+        SystemKeyspaceMigrator40.migrate();
+
+        int rowCount = 0;
+        for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.peersName)))
+        {
+            rowCount++;
+            assertEquals(InetAddress.getByName("127.0.0.1"), row.getInetAddress("peer"));
+            assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("peer_port"));
+            assertEquals("dcFoo", row.getString("data_center"));
+            assertEquals(hostId, row.getUUID("host_id"));
+            assertEquals(InetAddress.getByName("127.0.0.2"), row.getInetAddress("preferred_ip"));
+            assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("preferred_port"));
+            assertEquals("rackFoo", row.getString("rack"));
+            assertEquals("4.0", row.getString("release_version"));
+            assertEquals(InetAddress.getByName("127.0.0.3"), row.getInetAddress("native_address"));
+            assertEquals(DatabaseDescriptor.getNativeTransportPort(), row.getInt("native_port"));
+            assertEquals(schemaVersion, row.getUUID("schema_version"));
+            assertEquals(ImmutableSet.of("foobar"), row.getSet("tokens", UTF8Type.instance));
+        }
+        assertEquals(1, rowCount);
+
+        //Test nulls/missing don't prevent the row from propagating
+        execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.legacyPeersName));
+        execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.peersName));
+
+        execute(String.format("INSERT INTO %s (peer) VALUES (?)", SystemKeyspaceMigrator40.legacyPeersName),
+                              InetAddress.getByName("127.0.0.1"));
+        SystemKeyspaceMigrator40.migrate();
+
+        rowCount = 0;
+        for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.peersName)))
+        {
+            rowCount++;
+        }
+        assertEquals(1, rowCount);
+    }
+
+    @Test
+    public void testMigratePeerEvents() throws Throwable
+    {
+        String insert = String.format("INSERT INTO %s ("
+                                      + "peer, "
+                                      + "hints_dropped) "
+                                      + " values ( ?, ? )",
+                                      SystemKeyspaceMigrator40.legacyPeerEventsName);
+        UUID uuid = UUIDGen.getTimeUUID();
+        execute(insert,
+                InetAddress.getByName("127.0.0.1"),
+                ImmutableMap.of(uuid, 42));
+        SystemKeyspaceMigrator40.migrate();
+
+        int rowCount = 0;
+        for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.peerEventsName)))
+        {
+            rowCount++;
+            assertEquals(InetAddress.getByName("127.0.0.1"), row.getInetAddress("peer"));
+            assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("peer_port"));
+            assertEquals(ImmutableMap.of(uuid, 42), row.getMap("hints_dropped", UUIDType.instance, Int32Type.instance));
+        }
+        assertEquals(1, rowCount);
+
+        //Test nulls/missing don't prevent the row from propagating
+        execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.legacyPeerEventsName));
+        execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.peerEventsName));
+
+        execute(String.format("INSERT INTO %s (peer) VALUES (?)", SystemKeyspaceMigrator40.legacyPeerEventsName),
+                InetAddress.getByName("127.0.0.1"));
+        SystemKeyspaceMigrator40.migrate();
+
+        rowCount = 0;
+        for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.peerEventsName)))
+        {
+            rowCount++;
+        }
+        assertEquals(1, rowCount);
+    }
+
+    @Test
+    public void testMigrateTransferredRanges() throws Throwable
+    {
+        String insert = String.format("INSERT INTO %s ("
+                                      + "operation, "
+                                      + "peer, "
+                                      + "keyspace_name, "
+                                      + "ranges) "
+                                      + " values ( ?, ?, ?, ? )",
+                                      SystemKeyspaceMigrator40.legacyTransferredRangesName);
+        execute(insert,
+                "foo",
+                InetAddress.getByName("127.0.0.1"),
+                "bar",
+                ImmutableSet.of(ByteBuffer.wrap(new byte[] { 42 })));
+        SystemKeyspaceMigrator40.migrate();
+
+        int rowCount = 0;
+        for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.transferredRangesName)))
+        {
+            rowCount++;
+            assertEquals("foo", row.getString("operation"));
+            assertEquals(InetAddress.getByName("127.0.0.1"), row.getInetAddress("peer"));
+            assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("peer_port"));
+            assertEquals("bar", row.getString("keyspace_name"));
+            assertEquals(ImmutableSet.of(ByteBuffer.wrap(new byte[] { 42 })), row.getSet("ranges", BytesType.instance));
+        }
+        assertEquals(1, rowCount);
+
+        //Test nulls/missing don't prevent the row from propagating
+        execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.legacyTransferredRangesName));
+        execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.transferredRangesName));
+
+        execute(String.format("INSERT INTO %s (operation, peer, keyspace_name) VALUES (?, ?, ?)", SystemKeyspaceMigrator40.legacyTransferredRangesName),
+                "foo",
+                InetAddress.getByName("127.0.0.1"),
+                "bar");
+        SystemKeyspaceMigrator40.migrate();
+
+        rowCount = 0;
+        for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.transferredRangesName)))
+        {
+            rowCount++;
+        }
+        assertEquals(1, rowCount);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
index d1b8ff5..3bc04c1 100644
--- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 
@@ -26,6 +25,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -53,7 +53,7 @@ public class SystemKeyspaceTest
     public void testLocalTokens()
     {
         // Remove all existing tokens
-        Collection<Token> current = SystemKeyspace.loadTokens().asMap().get(FBUtilities.getLocalAddress());
+        Collection<Token> current = SystemKeyspace.loadTokens().asMap().get(FBUtilities.getLocalAddressAndPort());
         if (current != null && !current.isEmpty())
             SystemKeyspace.updateTokens(current);
 
@@ -74,7 +74,7 @@ public class SystemKeyspaceTest
     public void testNonLocalToken() throws UnknownHostException
     {
         BytesToken token = new BytesToken(ByteBufferUtil.bytes("token3"));
-        InetAddress address = InetAddress.getByName("127.0.0.2");
+        InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.2");
         SystemKeyspace.updateTokens(address, Collections.<Token>singletonList(token));
         assert SystemKeyspace.loadTokens().get(address).contains(token);
         SystemKeyspace.removeEndpoint(address);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
index 0baad3b..3e38dfc 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.db.compaction;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -31,6 +30,7 @@ import org.junit.Ignore;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -67,7 +67,7 @@ public class AbstractPendingRepairTest extends AbstractRepairTest
         // cutoff messaging service
         MessagingService.instance().addMessageSink(new IMessageSink()
         {
-            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
             {
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 5f05fab..6e7e184 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -35,6 +34,7 @@ import org.junit.After;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
@@ -96,7 +96,7 @@ public class AntiCompactionTest
     private void registerParentRepairSession(UUID sessionID, Collection<Range<Token>> ranges, long repairedAt, UUID pendingRepair) throws IOException
     {
         ActiveRepairService.instance.registerParentRepairSession(sessionID,
-                                                                 InetAddress.getByName("10.0.0.1"),
+                                                                 InetAddressAndPort.getByName("10.0.0.1"),
                                                                  Lists.newArrayList(cfs), ranges,
                                                                  pendingRepair != null || repairedAt != UNREPAIRED_SSTABLE,
                                                                  repairedAt, true, PreviewKind.NONE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
index b9e3c17..8290adf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.db.compaction;
 
-import java.net.InetAddress;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Set;
@@ -33,6 +32,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
@@ -58,7 +58,7 @@ public class CompactionManagerGetSSTablesForValidationTest
     private String ks;
     private static final String tbl = "tbl";
     private ColumnFamilyStore cfs;
-    private static InetAddress coordinator;
+    private static InetAddressAndPort coordinator;
 
     private static Token MT;
 
@@ -73,7 +73,7 @@ public class CompactionManagerGetSSTablesForValidationTest
     public static void setupClass() throws Exception
     {
         SchemaLoader.prepareServer();
-        coordinator = InetAddress.getByName("10.0.0.1");
+        coordinator = InetAddressAndPort.getByName("10.0.0.1");
         MT = DatabaseDescriptor.getPartitioner().getMinimumToken();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 624f119..567984d 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -195,7 +195,7 @@ public class LeveledCompactionStrategyTest
         int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
         UUID parentRepSession = UUID.randomUUID();
         ActiveRepairService.instance.registerParentRepairSession(parentRepSession,
-                                                                 FBUtilities.getBroadcastAddress(),
+                                                                 FBUtilities.getBroadcastAddressAndPort(),
                                                                  Arrays.asList(cfs),
                                                                  Arrays.asList(range),
                                                                  false,
@@ -203,7 +203,7 @@ public class LeveledCompactionStrategyTest
                                                                  true,
                                                                  PreviewKind.NONE);
         RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range));
-        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore, PreviewKind.NONE);
+        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), gcBefore, PreviewKind.NONE);
         CompactionManager.instance.submitValidation(cfs, validator).get();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
index 658b87a..423ad28 100644
--- a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
+++ b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.db.view;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -28,6 +27,7 @@ import org.junit.Test;
 
 import junit.framework.Assert;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken;
@@ -59,12 +59,12 @@ public class ViewUtilsTest
         metadata.clearUnsafe();
 
         // DC1
-        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
-        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
 
         // DC2
-        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
-        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+        metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
 
         Map<String, String> replicationMap = new HashMap<>();
         replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName());
@@ -76,12 +76,12 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
         Schema.instance.load(meta);
 
-        Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
-                                                                       new StringToken("CA"),
-                                                                       new StringToken("BB"));
+        Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+                                                                                        new StringToken("CA"),
+                                                                                        new StringToken("BB"));
 
         Assert.assertTrue(naturalEndpoint.isPresent());
-        Assert.assertEquals(InetAddress.getByName("127.0.0.2"), naturalEndpoint.get());
+        Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.2"), naturalEndpoint.get());
     }
 
 
@@ -92,12 +92,12 @@ public class ViewUtilsTest
         metadata.clearUnsafe();
 
         // DC1
-        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
-        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
 
         // DC2
-        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
-        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+        metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
 
         Map<String, String> replicationMap = new HashMap<>();
         replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName());
@@ -109,12 +109,12 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
         Schema.instance.load(meta);
 
-        Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
-                                                                       new StringToken("CA"),
-                                                                       new StringToken("BB"));
+        Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+                                                                                        new StringToken("CA"),
+                                                                                        new StringToken("BB"));
 
         Assert.assertTrue(naturalEndpoint.isPresent());
-        Assert.assertEquals(InetAddress.getByName("127.0.0.1"), naturalEndpoint.get());
+        Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.1"), naturalEndpoint.get());
     }
 
     @Test
@@ -124,12 +124,12 @@ public class ViewUtilsTest
         metadata.clearUnsafe();
 
         // DC1
-        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
-        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
 
         // DC2
-        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
-        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+        metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
 
         Map<String, String> replicationMap = new HashMap<>();
         replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName());
@@ -141,9 +141,9 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
         Schema.instance.load(meta);
 
-        Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
-                                                                       new StringToken("AB"),
-                                                                       new StringToken("BB"));
+        Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+                                                                                        new StringToken("AB"),
+                                                                                        new StringToken("BB"));
 
         Assert.assertFalse(naturalEndpoint.isPresent());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index a1054bb..f11cb62 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.dht;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.HashSet;
@@ -41,6 +40,7 @@ import org.junit.runner.RunWith;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.tokenallocator.TokenAllocation;
@@ -96,32 +96,32 @@ public class BootStrapperTest
 
         generateFakeEndpoints(numOldNodes);
         Token myToken = tmd.partitioner.getRandomToken();
-        InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
+        InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1");
 
         assertEquals(numOldNodes, tmd.sortedTokens().size());
         RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false, 1);
         IFailureDetector mockFailureDetector = new IFailureDetector()
         {
-            public boolean isAlive(InetAddress ep)
+            public boolean isAlive(InetAddressAndPort ep)
             {
                 return true;
             }
 
-            public void interpret(InetAddress ep) { throw new UnsupportedOperationException(); }
-            public void report(InetAddress ep) { throw new UnsupportedOperationException(); }
+            public void interpret(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
+            public void report(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
             public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
             public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
-            public void remove(InetAddress ep) { throw new UnsupportedOperationException(); }
-            public void forceConviction(InetAddress ep) { throw new UnsupportedOperationException(); }
+            public void remove(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
+            public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
         };
         s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector));
         s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
 
-        Collection<Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = s.toFetch().get(keyspaceName);
+        Collection<Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch = s.toFetch().get(keyspaceName);
 
         // Check we get get RF new ranges in total
         Set<Range<Token>> ranges = new HashSet<>();
-        for (Map.Entry<InetAddress, Collection<Range<Token>>> e : toFetch)
+        for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> e : toFetch)
             ranges.addAll(e.getValue());
 
         assertEquals(replicationFactor, ranges.size());
@@ -151,7 +151,7 @@ public class BootStrapperTest
         for (int i = 1; i <= numOldNodes; i++)
         {
             // leave .1 for myEndpoint
-            InetAddress addr = InetAddress.getByName("127." + dc + "." + rack + "." + (i + 1));
+            InetAddressAndPort addr = InetAddressAndPort.getByName("127." + dc + "." + rack + "." + (i + 1));
             List<Token> tokens = Lists.newArrayListWithCapacity(numVNodes);
             for (int j = 0; j < numVNodes; ++j)
                 tokens.add(p.getRandomToken());
@@ -167,7 +167,7 @@ public class BootStrapperTest
         String ks = "BootStrapperTestKeyspace3";
         TokenMetadata tm = new TokenMetadata();
         generateFakeEndpoints(tm, 10, vn);
-        InetAddress addr = FBUtilities.getBroadcastAddress();
+        InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
         allocateTokensForNode(vn, ks, tm, addr);
     }
 
@@ -184,15 +184,15 @@ public class BootStrapperTest
             // Register peers with expected DC for NetworkTopologyStrategy.
             TokenMetadata metadata = StorageService.instance.getTokenMetadata();
             metadata.clearUnsafe();
-            metadata.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.1.0.99"));
-            metadata.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.15.0.99"));
+            metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.1.0.99"));
+            metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.15.0.99"));
 
             SchemaLoader.createKeyspace(ks, KeyspaceParams.nts(dc, replicas, "15", 15), SchemaLoader.standardCFMD(ks, "Standard1"));
             TokenMetadata tm = StorageService.instance.getTokenMetadata();
             tm.clearUnsafe();
             for (int i = 0; i < rackCount; ++i)
                 generateFakeEndpoints(tm, 10, vn, dc, Integer.toString(i));
-            InetAddress addr = InetAddress.getByName("127." + dc + ".0.99");
+            InetAddressAndPort addr = InetAddressAndPort.getByName("127." + dc + ".0.99");
             allocateTokensForNode(vn, ks, tm, addr);
             // Note: Not matching replication factor in second datacentre, but this should not affect us.
         } finally {
@@ -230,7 +230,7 @@ public class BootStrapperTest
         testAllocateTokensNetworkStrategy(1, 1);
     }
 
-    private void allocateTokensForNode(int vn, String ks, TokenMetadata tm, InetAddress addr)
+    private void allocateTokensForNode(int vn, String ks, TokenMetadata tm, InetAddressAndPort addr)
     {
         SummaryStatistics os = TokenAllocation.replicatedOwnershipStats(tm.cloneOnlyTokenMap(), Keyspace.open(ks).getReplicationStrategy(), addr);
         Collection<Token> tokens = BootStrapper.allocateTokens(tm, addr, ks, vn, 0);
@@ -260,14 +260,14 @@ public class BootStrapperTest
         TokenMetadata tm = new TokenMetadata();
         generateFakeEndpoints(tm, 10, vn);
         
-        InetAddress dcaddr = FBUtilities.getBroadcastAddress();
+        InetAddressAndPort dcaddr = FBUtilities.getBroadcastAddressAndPort();
         SummaryStatistics os3 = TokenAllocation.replicatedOwnershipStats(tm, Keyspace.open(ks3).getReplicationStrategy(), dcaddr);
         SummaryStatistics os2 = TokenAllocation.replicatedOwnershipStats(tm, Keyspace.open(ks2).getReplicationStrategy(), dcaddr);
         String cks = ks3;
         String nks = ks2;
         for (int i=11; i<=20; ++i)
         {
-            allocateTokensForNode(vn, cks, tm, InetAddress.getByName("127.0.0." + (i + 1)));
+            allocateTokensForNode(vn, cks, tm, InetAddressAndPort.getByName("127.0.0." + (i + 1)));
             String t = cks; cks = nks; nks = t;
         }
         


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org