You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:00 UTC

[03/19] cassandra git commit: Allow storage port to be configurable per node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java b/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java
index 4d9829f..93a1c20 100644
--- a/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java
+++ b/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
 
+import com.google.common.net.InetAddresses;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -39,6 +40,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.compression.Lz4FrameDecoder;
 import io.netty.handler.codec.compression.Lz4FrameEncoder;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage;
@@ -49,7 +51,7 @@ import static org.apache.cassandra.net.async.NettyFactory.Mode.MESSAGING;
 
 public class InboundHandshakeHandlerTest
 {
-    private static final InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0);
+    private static final InetAddressAndPort addr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0);
     private static final int MESSAGING_VERSION = MessagingService.current_version;
     private static final int VERSION_30 = MessagingService.VERSION_30;
 
@@ -84,7 +86,7 @@ public class InboundHandshakeHandlerTest
     {
         handler = new InboundHandshakeHandler(new TestAuthenticator(true));
         channel = new EmbeddedChannel(handler);
-        boolean result = handler.handleAuthenticate(addr, channel.pipeline().firstContext());
+        boolean result = handler.handleAuthenticate(new InetSocketAddress(addr.address, addr.port), channel.pipeline().firstContext());
         Assert.assertTrue(result);
         Assert.assertTrue(channel.isOpen());
     }
@@ -92,7 +94,7 @@ public class InboundHandshakeHandlerTest
     @Test
     public void handleAuthenticate_Bad()
     {
-        boolean result = handler.handleAuthenticate(addr, channel.pipeline().firstContext());
+        boolean result = handler.handleAuthenticate(new InetSocketAddress(addr.address, addr.port), channel.pipeline().firstContext());
         Assert.assertFalse(result);
         Assert.assertFalse(channel.isOpen());
         Assert.assertFalse(channel.isActive());
@@ -178,7 +180,7 @@ public class InboundHandshakeHandlerTest
         if (buf.refCnt() > 0)
             buf.release();
 
-        buf = new ThirdHandshakeMessage(MESSAGING_VERSION, addr.getAddress()).encode(PooledByteBufAllocator.DEFAULT);
+        buf = new ThirdHandshakeMessage(MESSAGING_VERSION, addr).encode(PooledByteBufAllocator.DEFAULT);
         state = handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf);
 
         Assert.assertEquals(State.HANDSHAKE_COMPLETE, state);
@@ -203,7 +205,7 @@ public class InboundHandshakeHandlerTest
     {
         buf = Unpooled.buffer(32, 32);
         buf.writeInt(MESSAGING_VERSION + 1);
-        CompactEndpointSerializationHelper.serialize(addr.getAddress(), new ByteBufOutputStream(buf));
+        CompactEndpointSerializationHelper.instance.serialize(addr, new ByteBufDataOutputPlus(buf), MESSAGING_VERSION + 1);
         State state = handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf);
         Assert.assertEquals(State.HANDSHAKE_FAIL, state);
         Assert.assertFalse(channel.isOpen());
@@ -215,7 +217,7 @@ public class InboundHandshakeHandlerTest
     {
         buf = Unpooled.buffer(32, 32);
         buf.writeInt(MESSAGING_VERSION);
-        CompactEndpointSerializationHelper.serialize(addr.getAddress(), new ByteBufOutputStream(buf));
+        CompactEndpointSerializationHelper.instance.serialize(addr, new ByteBufDataOutputPlus(buf), MESSAGING_VERSION);
         State state = handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf);
         Assert.assertEquals(State.HANDSHAKE_COMPLETE, state);
         Assert.assertTrue(channel.isOpen());
@@ -228,7 +230,7 @@ public class InboundHandshakeHandlerTest
         ChannelPipeline pipeline = channel.pipeline();
         Assert.assertNotNull(pipeline.get(InboundHandshakeHandler.class));
 
-        handler.setupMessagingPipeline(pipeline, addr.getAddress(), false, MESSAGING_VERSION);
+        handler.setupMessagingPipeline(pipeline, addr, false, MESSAGING_VERSION);
         Assert.assertNotNull(pipeline.get(MessageInHandler.class));
         Assert.assertNull(pipeline.get(Lz4FrameDecoder.class));
         Assert.assertNull(pipeline.get(Lz4FrameEncoder.class));
@@ -241,7 +243,7 @@ public class InboundHandshakeHandlerTest
         ChannelPipeline pipeline = channel.pipeline();
         Assert.assertNotNull(pipeline.get(InboundHandshakeHandler.class));
 
-        handler.setupMessagingPipeline(pipeline, addr.getAddress(), true, MESSAGING_VERSION);
+        handler.setupMessagingPipeline(pipeline, addr, true, MESSAGING_VERSION);
         Assert.assertNotNull(pipeline.get(MessageInHandler.class));
         Assert.assertNotNull(pipeline.get(Lz4FrameDecoder.class));
         Assert.assertNull(pipeline.get(Lz4FrameEncoder.class));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java b/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java
index bb82d2c..43cb964 100644
--- a/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java
+++ b/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java
@@ -20,40 +20,36 @@ package org.apache.cassandra.net.async;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.function.BiConsumer;
 
-import com.google.common.base.Charsets;
+import com.google.common.net.InetAddresses;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
 import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.concurrent.Future;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.net.async.MessageInHandler.MessageHeader;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class MessageInHandlerTest
 {
-    private static final InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0);
+    private static final InetAddressAndPort addr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0);
     private static final int MSG_VERSION = MessagingService.current_version;
 
     private static final int MSG_ID = 42;
@@ -81,7 +77,7 @@ public class MessageInHandlerTest
         buf.writeInt(-1);
         buf.writerIndex(len);
 
-        MessageInHandler handler = new MessageInHandler(addr.getAddress(), MSG_VERSION, null);
+        MessageInHandler handler = new MessageInHandler(addr, MSG_VERSION, null);
         EmbeddedChannel channel = new EmbeddedChannel(handler);
         Assert.assertTrue(channel.isOpen());
         channel.writeInbound(buf);
@@ -98,22 +94,25 @@ public class MessageInHandlerTest
     @Test
     public void decode_HappyPath_WithParameters() throws Exception
     {
-        Map<String, byte[]> parameters = new HashMap<>();
-        parameters.put("p1", "val1".getBytes(Charsets.UTF_8));
-        parameters.put("p2", "val2".getBytes(Charsets.UTF_8));
+        UUID uuid = UUIDGen.getTimeUUID();
+        Map<ParameterType, Object> parameters = new HashMap<>();
+        parameters.put(ParameterType.FAILURE_REASON, (short)42);
+        parameters.put(ParameterType.TRACE_SESSION, uuid);
         MessageInWrapper result = decode_HappyPath(parameters);
         Assert.assertEquals(2, result.messageIn.parameters.size());
+        Assert.assertEquals((short)42, result.messageIn.parameters.get(ParameterType.FAILURE_REASON));
+        Assert.assertEquals(uuid, result.messageIn.parameters.get(ParameterType.TRACE_SESSION));
     }
 
-    private MessageInWrapper decode_HappyPath(Map<String, byte[]> parameters) throws Exception
+    private MessageInWrapper decode_HappyPath(Map<ParameterType, Object> parameters) throws Exception
     {
         MessageOut msgOut = new MessageOut(MessagingService.Verb.ECHO);
-        for (Map.Entry<String, byte[]> param : parameters.entrySet())
+        for (Map.Entry<ParameterType, Object> param : parameters.entrySet())
             msgOut = msgOut.withParameter(param.getKey(), param.getValue());
         serialize(msgOut);
 
         MessageInWrapper wrapper = new MessageInWrapper();
-        MessageInHandler handler = new MessageInHandler(addr.getAddress(), MSG_VERSION, wrapper.messageConsumer);
+        MessageInHandler handler = new MessageInHandler(addr, MSG_VERSION, wrapper.messageConsumer);
         List<Object> out = new ArrayList<>();
         handler.decode(null, buf, out);
 
@@ -140,7 +139,7 @@ public class MessageInHandlerTest
     public void decode_WithHalfReceivedParameters() throws Exception
     {
         MessageOut msgOut = new MessageOut(MessagingService.Verb.ECHO);
-        msgOut = msgOut.withParameter("p3", "val1".getBytes(Charsets.UTF_8));
+        msgOut = msgOut.withParameter(ParameterType.FAILURE_REASON, (short)42);
 
         serialize(msgOut);
 
@@ -150,7 +149,7 @@ public class MessageInHandlerTest
         buf.writerIndex(originalWriterIndex - 6);
 
         MessageInWrapper wrapper = new MessageInWrapper();
-        MessageInHandler handler = new MessageInHandler(addr.getAddress(), MSG_VERSION, wrapper.messageConsumer);
+        MessageInHandler handler = new MessageInHandler(addr, MSG_VERSION, wrapper.messageConsumer);
         List<Object> out = new ArrayList<>();
         handler.decode(null, buf, out);
 
@@ -221,7 +220,7 @@ public class MessageInHandlerTest
     @Test
     public void exceptionHandled()
     {
-        MessageInHandler handler = new MessageInHandler(addr.getAddress(), MSG_VERSION, null);
+        MessageInHandler handler = new MessageInHandler(addr, MSG_VERSION, null);
         EmbeddedChannel channel = new EmbeddedChannel(handler);
         Assert.assertTrue(channel.isOpen());
         handler.exceptionCaught(channel.pipeline().firstContext(), new EOFException());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java b/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java
index 566dfdb..86112ae 100644
--- a/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java
+++ b/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java
@@ -44,8 +44,10 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -65,15 +67,15 @@ public class MessageOutHandlerTest
     }
 
     @Before
-    public void setup()
+    public void setup() throws Exception
     {
         setup(MessageOutHandler.AUTO_FLUSH_THRESHOLD);
     }
 
-    private void setup(int flushThreshold)
+    private void setup(int flushThreshold) throws Exception
     {
-        OutboundConnectionIdentifier connectionId = OutboundConnectionIdentifier.small(new InetSocketAddress("127.0.0.1", 0),
-                                                                                       new InetSocketAddress("127.0.0.2", 0));
+        OutboundConnectionIdentifier connectionId = OutboundConnectionIdentifier.small(InetAddressAndPort.getByNameOverrideDefaults("127.0.0.1", 0),
+                                                                                       InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", 0));
         OutboundMessagingConnection omc = new NonSendingOutboundMessagingConnection(connectionId, null, Optional.empty());
         channel = new EmbeddedChannel();
         channelWriter = ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty());
@@ -91,7 +93,7 @@ public class MessageOutHandlerTest
     }
 
     @Test
-    public void write_WithFlush() throws ExecutionException, InterruptedException, TimeoutException
+    public void write_WithFlush() throws Exception
     {
         setup(1);
         MessageOut message = new MessageOut(MessagingService.Verb.ECHO);
@@ -217,7 +219,7 @@ public class MessageOutHandlerTest
     public void captureTracingInfo_ForceException()
     {
         MessageOut message = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
-                             .withParameter(Tracing.TRACE_HEADER, new byte[9]);
+                             .withParameter(ParameterType.TRACE_SESSION, new byte[9]);
         handler.captureTracingInfo(new QueuedMessage(message, 42));
     }
 
@@ -226,7 +228,7 @@ public class MessageOutHandlerTest
     {
         UUID uuid = UUID.randomUUID();
         MessageOut message = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
-                             .withParameter(Tracing.TRACE_HEADER, UUIDGen.decompose(uuid));
+                             .withParameter(ParameterType.TRACE_SESSION, uuid);
         handler.captureTracingInfo(new QueuedMessage(message, 42));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
index 0550490..4607d5c 100644
--- a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
+++ b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.net.async;
 
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Optional;
 
@@ -47,6 +46,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.NettyFactory.InboundInitializer;
 import org.apache.cassandra.net.async.NettyFactory.OutboundInitializer;
@@ -56,8 +56,8 @@ import org.apache.cassandra.utils.NativeLibrary;
 
 public class NettyFactoryTest
 {
-    private static final InetSocketAddress LOCAL_ADDR = new InetSocketAddress("127.0.0.1", 9876);
-    private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 9876);
+    private static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9876);
+    private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 9876);
     private static final int receiveBufferSize = 1 << 16;
     private static final IInternodeAuthenticator AUTHENTICATOR = new AllowAllInternodeAuthenticator();
     private static final boolean EPOLL_AVAILABLE = NativeTransportService.useEpoll();
@@ -129,7 +129,6 @@ public class NettyFactoryTest
         Channel inboundChannel = null;
         try
         {
-            InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 9876);
             InboundInitializer inboundInitializer = new InboundInitializer(AUTHENTICATOR, null, channelGroup);
             inboundChannel = NettyFactory.instance.createInboundChannel(LOCAL_ADDR, inboundInitializer, receiveBufferSize);
             NettyFactory.instance.createInboundChannel(LOCAL_ADDR, inboundInitializer, receiveBufferSize);
@@ -144,7 +143,7 @@ public class NettyFactoryTest
     @Test(expected = ConfigurationException.class)
     public void createServerChannel_UnbindableAddress()
     {
-        InetSocketAddress addr = new InetSocketAddress("1.1.1.1", 9876);
+        InetAddressAndPort addr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("1.1.1.1"), 9876);
         InboundInitializer inboundInitializer = new InboundInitializer(AUTHENTICATOR, null, channelGroup);
         NettyFactory.instance.createInboundChannel(addr, inboundInitializer, receiveBufferSize);
     }
@@ -162,10 +161,10 @@ public class NettyFactoryTest
         Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
         serverEncryptionOptions.enable_legacy_ssl_storage_port = false;
 
-        InetAddress originalBroadcastAddr = FBUtilities.getBroadcastAddress();
+        InetAddressAndPort originalBroadcastAddr = FBUtilities.getBroadcastAddressAndPort();
         try
         {
-            FBUtilities.setBroadcastInetAddress(InetAddresses.increment(FBUtilities.getLocalAddress()));
+            FBUtilities.setBroadcastInetAddress(InetAddresses.increment(FBUtilities.getLocalAddressAndPort().address));
             DatabaseDescriptor.setListenOnBroadcastAddress(true);
 
             serverEncryptionOptions.enabled = false;
@@ -178,7 +177,7 @@ public class NettyFactoryTest
         }
         finally
         {
-            FBUtilities.setBroadcastInetAddress(originalBroadcastAddr);
+            FBUtilities.setBroadcastInetAddress(originalBroadcastAddr.address);
             DatabaseDescriptor.setListenOnBroadcastAddress(false);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java b/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java
index f8bfab1..be71fd4 100644
--- a/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java
+++ b/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 
+import com.google.common.net.InetAddresses;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -38,6 +39,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.compression.Lz4FrameDecoder;
 import io.netty.handler.codec.compression.Lz4FrameEncoder;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.HandshakeProtocol.SecondHandshakeMessage;
 import org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult;
@@ -47,8 +49,8 @@ import static org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeR
 public class OutboundHandshakeHandlerTest
 {
     private static final int MESSAGING_VERSION = MessagingService.current_version;
-    private static final InetSocketAddress localAddr = new InetSocketAddress("127.0.0.1", 0);
-    private static final InetSocketAddress remoteAddr = new InetSocketAddress("127.0.0.2", 0);
+    private static final InetAddressAndPort localAddr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0);
+    private static final InetAddressAndPort remoteAddr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0);
     private static final String HANDLER_NAME = "clientHandshakeHandler";
 
     private EmbeddedChannel channel;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
index 641c28c..bf6e066 100644
--- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
+++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.net.ssl.SSLHandshakeException;
 
+import com.google.common.net.InetAddresses;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -45,6 +46,7 @@ import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.AbstractEndpointSnitch;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MessagingServiceTest;
@@ -59,9 +61,9 @@ import static org.apache.cassandra.net.async.OutboundMessagingConnection.State.R
 
 public class OutboundMessagingConnectionTest
 {
-    private static final InetSocketAddress LOCAL_ADDR = new InetSocketAddress("127.0.0.1", 9998);
-    private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 9999);
-    private static final InetSocketAddress RECONNECT_ADDR = new InetSocketAddress("127.0.0.3", 9999);
+    private static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9998);
+    private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 9999);
+    private static final InetAddressAndPort RECONNECT_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.3"), 9999);
     private static final int MESSAGING_VERSION = MessagingService.current_version;
 
     private OutboundConnectionIdentifier connectionId;
@@ -131,47 +133,47 @@ public class OutboundMessagingConnectionTest
     public void shouldCompressConnection_None()
     {
         DatabaseDescriptor.setInternodeCompression(Config.InternodeCompression.none);
-        Assert.assertFalse(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR.getAddress(), REMOTE_ADDR.getAddress()));
+        Assert.assertFalse(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR, REMOTE_ADDR));
     }
 
     @Test
     public void shouldCompressConnection_All()
     {
         DatabaseDescriptor.setInternodeCompression(Config.InternodeCompression.all);
-        Assert.assertTrue(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR.getAddress(), REMOTE_ADDR.getAddress()));
+        Assert.assertTrue(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR, REMOTE_ADDR));
     }
 
     @Test
     public void shouldCompressConnection_SameDc()
     {
         TestSnitch snitch = new TestSnitch();
-        snitch.add(LOCAL_ADDR.getAddress(), "dc1");
-        snitch.add(REMOTE_ADDR.getAddress(), "dc1");
+        snitch.add(LOCAL_ADDR, "dc1");
+        snitch.add(REMOTE_ADDR, "dc1");
         DatabaseDescriptor.setEndpointSnitch(snitch);
         DatabaseDescriptor.setInternodeCompression(Config.InternodeCompression.dc);
-        Assert.assertFalse(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR.getAddress(), REMOTE_ADDR.getAddress()));
+        Assert.assertFalse(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR, REMOTE_ADDR));
     }
 
     private static class TestSnitch extends AbstractEndpointSnitch
     {
-        private Map<InetAddress, String> nodeToDc = new HashMap<>();
+        private Map<InetAddressAndPort, String> nodeToDc = new HashMap<>();
 
-        void add(InetAddress node, String dc)
+        void add(InetAddressAndPort node, String dc)
         {
             nodeToDc.put(node, dc);
         }
 
-        public String getRack(InetAddress endpoint)
+        public String getRack(InetAddressAndPort endpoint)
         {
             return null;
         }
 
-        public String getDatacenter(InetAddress endpoint)
+        public String getDatacenter(InetAddressAndPort endpoint)
         {
             return nodeToDc.get(endpoint);
         }
 
-        public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+        public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
         {
             return 0;
         }
@@ -181,11 +183,11 @@ public class OutboundMessagingConnectionTest
     public void shouldCompressConnection_DifferentDc()
     {
         TestSnitch snitch = new TestSnitch();
-        snitch.add(LOCAL_ADDR.getAddress(), "dc1");
-        snitch.add(REMOTE_ADDR.getAddress(), "dc2");
+        snitch.add(LOCAL_ADDR, "dc1");
+        snitch.add(REMOTE_ADDR, "dc2");
         DatabaseDescriptor.setEndpointSnitch(snitch);
         DatabaseDescriptor.setInternodeCompression(Config.InternodeCompression.dc);
-        Assert.assertTrue(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR.getAddress(), REMOTE_ADDR.getAddress()));
+        Assert.assertTrue(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR, REMOTE_ADDR));
     }
 
     @Test
@@ -247,7 +249,7 @@ public class OutboundMessagingConnectionTest
 
         MessageOut messageOut = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK);
         OutboundMessagingPool pool = new OutboundMessagingPool(REMOTE_ADDR, LOCAL_ADDR, null,
-                                                               new MessagingServiceTest.MockBackPressureStrategy(null).newState(REMOTE_ADDR.getAddress()), auth);
+                                                               new MessagingServiceTest.MockBackPressureStrategy(null).newState(REMOTE_ADDR), auth);
         omc = pool.getConnection(messageOut);
         Assert.assertSame(State.NOT_READY, omc.getState());
         Assert.assertFalse(omc.connect());
@@ -371,7 +373,7 @@ public class OutboundMessagingConnectionTest
         Assert.assertFalse(channelWriter.isClosed());
         Assert.assertEquals(channelWriter, omc.getChannelWriter());
         Assert.assertEquals(READY, omc.getState());
-        Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR.getAddress()));
+        Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR));
         Assert.assertNull(omc.getConnectionTimeoutFuture());
         Assert.assertTrue(connectionTimeoutFuture.isCancelled());
     }
@@ -391,7 +393,7 @@ public class OutboundMessagingConnectionTest
         Assert.assertTrue(channelWriter.isClosed());
         Assert.assertNull(omc.getChannelWriter());
         Assert.assertEquals(CLOSED, omc.getState());
-        Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR.getAddress()));
+        Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR));
         Assert.assertNull(omc.getConnectionTimeoutFuture());
         Assert.assertTrue(connectionTimeoutFuture.isCancelled());
     }
@@ -408,7 +410,7 @@ public class OutboundMessagingConnectionTest
         omc.finishHandshake(result);
         Assert.assertNotNull(omc.getChannelWriter());
         Assert.assertEquals(CREATING_CHANNEL, omc.getState());
-        Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR.getAddress()));
+        Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR));
         Assert.assertEquals(count, omc.backlogSize());
     }
 
@@ -423,7 +425,7 @@ public class OutboundMessagingConnectionTest
         HandshakeResult result = HandshakeResult.failed();
         omc.finishHandshake(result);
         Assert.assertEquals(NOT_READY, omc.getState());
-        Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR.getAddress()));
+        Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR));
         Assert.assertEquals(0, omc.backlogSize());
     }
 
@@ -512,8 +514,8 @@ public class OutboundMessagingConnectionTest
         OutboundConnectionIdentifier connectionId = omc.getConnectionId();
         omc.maybeUpdateConnectionId();
         Assert.assertNotEquals(connectionId, omc.getConnectionId());
-        Assert.assertEquals(new InetSocketAddress(REMOTE_ADDR.getAddress(), DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().remoteAddress());
-        Assert.assertEquals(new InetSocketAddress(REMOTE_ADDR.getAddress(), DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().connectionAddress());
+        Assert.assertEquals(InetAddressAndPort.getByAddressOverrideDefaults(REMOTE_ADDR.address, DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().remote());
+        Assert.assertEquals(InetAddressAndPort.getByAddressOverrideDefaults(REMOTE_ADDR.address, DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().connectionAddress());
         Assert.assertEquals(peerVersion, omc.getTargetVersion());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java
index 655cd15..ecd8697 100644
--- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java
+++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java
@@ -18,10 +18,10 @@
 
 package org.apache.cassandra.net.async;
 
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.net.InetAddresses;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -35,6 +35,7 @@ import org.apache.cassandra.gms.GossipDigestSyn;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.BackPressureState;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -42,9 +43,9 @@ import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionTyp
 
 public class OutboundMessagingPoolTest
 {
-    private static final InetSocketAddress LOCAL_ADDR = new InetSocketAddress("127.0.0.1", 9476);
-    private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 9476);
-    private static final InetSocketAddress RECONNECT_ADDR = new InetSocketAddress("127.0.0.3", 9476);
+    private static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9476);
+    private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 9476);
+    private static final InetAddressAndPort RECONNECT_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.3"), 9476);
     private static final List<ConnectionType> INTERNODE_MESSAGING_CONN_TYPES = new ArrayList<ConnectionType>()
             {{ add(ConnectionType.GOSSIP); add(ConnectionType.LARGE_MESSAGE); add(ConnectionType.SMALL_MESSAGE); }};
 
@@ -59,7 +60,7 @@ public class OutboundMessagingPoolTest
     @Before
     public void setup()
     {
-        BackPressureState backPressureState = DatabaseDescriptor.getBackPressureStrategy().newState(REMOTE_ADDR.getAddress());
+        BackPressureState backPressureState = DatabaseDescriptor.getBackPressureStrategy().newState(REMOTE_ADDR);
         pool = new OutboundMessagingPool(REMOTE_ADDR, LOCAL_ADDR, null, backPressureState, new AllowAllInternodeAuthenticator());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
index 21c51c6..970e648 100644
--- a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
+++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Set;
 import java.util.UUID;
@@ -33,6 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -41,19 +41,19 @@ import org.apache.cassandra.utils.UUIDGen;
 @Ignore
 public abstract class AbstractRepairTest
 {
-    protected static final InetAddress COORDINATOR;
-    protected static final InetAddress PARTICIPANT1;
-    protected static final InetAddress PARTICIPANT2;
-    protected static final InetAddress PARTICIPANT3;
+    protected static final InetAddressAndPort COORDINATOR;
+    protected static final InetAddressAndPort PARTICIPANT1;
+    protected static final InetAddressAndPort PARTICIPANT2;
+    protected static final InetAddressAndPort PARTICIPANT3;
 
     static
     {
         try
         {
-            COORDINATOR = InetAddress.getByName("10.0.0.1");
-            PARTICIPANT1 = InetAddress.getByName("10.0.0.1");
-            PARTICIPANT2 = InetAddress.getByName("10.0.0.2");
-            PARTICIPANT3 = InetAddress.getByName("10.0.0.3");
+            COORDINATOR = InetAddressAndPort.getByName("10.0.0.1");
+            PARTICIPANT1 = InetAddressAndPort.getByName("10.0.0.1");
+            PARTICIPANT2 = InetAddressAndPort.getByName("10.0.0.2");
+            PARTICIPANT3 = InetAddressAndPort.getByName("10.0.0.3");
         }
         catch (UnknownHostException e)
         {
@@ -64,7 +64,7 @@ public abstract class AbstractRepairTest
         DatabaseDescriptor.daemonInitialization();
     }
 
-    protected static final Set<InetAddress> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3);
+    protected static final Set<InetAddressAndPort> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3);
 
     protected static Token t(int v)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 7f3dbff..95046bd 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -36,6 +35,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
@@ -78,8 +78,8 @@ public class LocalSyncTaskTest extends AbstractRepairTest
     @Test
     public void testNoDifference() throws Throwable
     {
-        final InetAddress ep1 = InetAddress.getByName("127.0.0.1");
-        final InetAddress ep2 = InetAddress.getByName("127.0.0.1");
+        final InetAddressAndPort ep1 = InetAddressAndPort.getByName("127.0.0.1");
+        final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.1");
 
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
         RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
@@ -106,7 +106,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
 
-        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(),
+        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(),
                                                                  Arrays.asList(cfs), Arrays.asList(range), false,
                                                                  ActiveRepairService.UNREPAIRED_SSTABLE, false,
                                                                  PreviewKind.NONE);
@@ -128,8 +128,8 @@ public class LocalSyncTaskTest extends AbstractRepairTest
 
         // difference the trees
         // note: we reuse the same endpoint which is bogus in theory but fine here
-        TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1);
-        TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2);
+        TreeResponse r1 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1);
+        TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2);
         LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
         task.run();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
index db76f73..2044106 100644
--- a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.RepairRunnable.CommonRange;
 
 import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges;
@@ -53,7 +54,7 @@ public class RepairRunnableTest extends AbstractRepairTest
     {
         CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2));
         CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3));
-        Set<InetAddress> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded
+        Set<InetAddressAndPort> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded
 
         List<CommonRange> initial = Lists.newArrayList(cr1, cr2);
         List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 984218d..54f0511 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.repair;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.UUID;
@@ -35,7 +34,7 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.UUIDGen;
@@ -54,7 +53,7 @@ public class RepairSessionTest
     @Test
     public void testConviction() throws Exception
     {
-        InetAddress remote = InetAddress.getByName("127.0.0.2");
+        InetAddressAndPort remote = InetAddressAndPort.getByName("127.0.0.2");
         Gossiper.instance.initializeNodeUnsafe(remote, UUID.randomUUID(), 1);
 
         // Set up RepairSession
@@ -62,7 +61,7 @@ public class RepairSessionTest
         UUID sessionId = UUID.randomUUID();
         IPartitioner p = Murmur3Partitioner.instance;
         Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100)));
-        Set<InetAddress> endpoints = Sets.newHashSet(remote);
+        Set<InetAddressAndPort> endpoints = Sets.newHashSet(remote);
         RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange),
                                                   "Keyspace1", RepairParallelism.SEQUENTIAL,
                                                   endpoints, false, false, false,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index b45edc1..322772a 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -34,6 +33,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.BufferDecoratedKey;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -95,7 +95,7 @@ public class ValidatorTest
 
         final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
 
-        InetAddress remote = InetAddress.getByName("127.0.0.2");
+        InetAddressAndPort remote = InetAddressAndPort.getByName("127.0.0.2");
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
 
@@ -134,7 +134,7 @@ public class ValidatorTest
 
         final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
 
-        InetAddress remote = InetAddress.getByName("127.0.0.2");
+        InetAddressAndPort remote = InetAddressAndPort.getByName("127.0.0.2");
 
         Validator validator = new Validator(desc, remote, 0, PreviewKind.NONE);
         validator.fail();
@@ -189,12 +189,12 @@ public class ValidatorTest
                                                cfs.getTableName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
                                                                                                                 sstable.last.getToken())));
 
-        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
+        ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddressAndPort(),
                                                                  Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
                                                                  false, PreviewKind.NONE);
 
         final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
-        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true, false, PreviewKind.NONE);
+        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), 0, true, false, PreviewKind.NONE);
         CompactionManager.instance.submitValidation(cfs, validator);
 
         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
@@ -218,7 +218,7 @@ public class ValidatorTest
         final CompletableFuture<MessageOut> future = new CompletableFuture<>();
         MessagingService.instance().addMessageSink(new IMessageSink()
         {
-            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
             {
                 future.complete(message);
                 return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
index 52a43e6..9693010 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Iterator;
 
@@ -30,6 +29,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.TreeResponse;
 import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.MerkleTrees;
@@ -43,8 +43,8 @@ public class DifferenceHolderTest
     @Test
     public void testFromEmptyMerkleTrees() throws UnknownHostException
     {
-        InetAddress a1 = InetAddress.getByName("127.0.0.1");
-        InetAddress a2 = InetAddress.getByName("127.0.0.2");
+        InetAddressAndPort a1 = InetAddressAndPort.getByName("127.0.0.1");
+        InetAddressAndPort a2 = InetAddressAndPort.getByName("127.0.0.2");
 
         MerkleTrees mt1 = new MerkleTrees(Murmur3Partitioner.instance);
         MerkleTrees mt2 = new MerkleTrees(Murmur3Partitioner.instance);
@@ -64,8 +64,8 @@ public class DifferenceHolderTest
         IPartitioner partitioner = Murmur3Partitioner.instance;
         Range<Token> fullRange = new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken());
         int maxsize = 16;
-        InetAddress a1 = InetAddress.getByName("127.0.0.1");
-        InetAddress a2 = InetAddress.getByName("127.0.0.2");
+        InetAddressAndPort a1 = InetAddressAndPort.getByName("127.0.0.1");
+        InetAddressAndPort a2 = InetAddressAndPort.getByName("127.0.0.2");
         // merkle tree building stolen from MerkleTreesTest:
         MerkleTrees mt1 = new MerkleTrees(partitioner);
         MerkleTrees mt2 = new MerkleTrees(partitioner);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
index 19c42fb..6c64b1a 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
@@ -19,11 +19,9 @@
 package org.apache.cassandra.repair.asymmetric;
 
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -33,13 +31,13 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 import static junit.framework.TestCase.fail;
 import static org.junit.Assert.assertEquals;
@@ -47,24 +45,24 @@ import static org.junit.Assert.assertTrue;
 
 public class ReduceHelperTest
 {
-    private static final InetAddress[] addresses;
-    private static final InetAddress A;
-    private static final InetAddress B;
-    private static final InetAddress C;
-    private static final InetAddress D;
-    private static final InetAddress E;
+    private static final InetAddressAndPort[] addresses;
+    private static final InetAddressAndPort A;
+    private static final InetAddressAndPort B;
+    private static final InetAddressAndPort C;
+    private static final InetAddressAndPort D;
+    private static final InetAddressAndPort E;
 
     static
     {
         try
         {
-            A = InetAddress.getByName("127.0.0.0");
-            B = InetAddress.getByName("127.0.0.1");
-            C = InetAddress.getByName("127.0.0.2");
-            D = InetAddress.getByName("127.0.0.3");
-            E = InetAddress.getByName("127.0.0.4");
+            A = InetAddressAndPort.getByName("127.0.0.0");
+            B = InetAddressAndPort.getByName("127.0.0.1");
+            C = InetAddressAndPort.getByName("127.0.0.2");
+            D = InetAddressAndPort.getByName("127.0.0.3");
+            E = InetAddressAndPort.getByName("127.0.0.4");
             // for diff creation in loops:
-            addresses = new InetAddress[]{ A, B, C, D, E };
+            addresses = new InetAddressAndPort[]{ A, B, C, D, E };
         }
         catch (UnknownHostException e)
         {
@@ -89,7 +87,7 @@ public class ReduceHelperTest
         C             x   x
         D                 =
          */
-        Map<InetAddress, HostDifferences> differences = new HashMap<>();
+        Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>();
         for (int i = 0; i < 4; i++)
         {
             HostDifferences hostDiffs = new HostDifferences();
@@ -105,7 +103,7 @@ public class ReduceHelperTest
 
         }
         DifferenceHolder differenceHolder = new DifferenceHolder(differences);
-        Map<InetAddress, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
+        Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
 
         assertEquals(set(set(C), set(E,D)), streams(tracker.get(A)));
         assertEquals(set(set(C), set(E,D)), streams(tracker.get(B)));
@@ -113,7 +111,7 @@ public class ReduceHelperTest
         assertEquals(set(set(A,B), set(C)), streams(tracker.get(D)));
         assertEquals(set(set(A,B), set(C)), streams(tracker.get(E)));
 
-        ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x,y) -> y);
+        ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
 
         HostDifferences n0 = reduced.get(A);
         assertEquals(0, n0.get(A).size());
@@ -163,7 +161,7 @@ public class ReduceHelperTest
         C             x   x
         D                 =
          */
-        Map<InetAddress, HostDifferences> differences = new HashMap<>();
+        Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>();
         for (int i = 0; i < 4; i++)
         {
             HostDifferences hostDifferences = new HostDifferences();
@@ -179,7 +177,7 @@ public class ReduceHelperTest
         }
 
         DifferenceHolder differenceHolder = new DifferenceHolder(differences);
-        Map<InetAddress, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
+        Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
         assertEquals(set(set(C), set(E, D)), streams(tracker.get(A)));
         assertEquals(set(set(C), set(E, D)), streams(tracker.get(B)));
         assertEquals(set(set(A, B), set(E, D)), streams(tracker.get(C)));
@@ -187,7 +185,7 @@ public class ReduceHelperTest
         assertEquals(set(set(A, B), set(C)), streams(tracker.get(E)));
 
         // if there is an option, never stream from node 1:
-        ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x,y) -> Sets.difference(y, set(B)));
+        ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x,y) -> Sets.difference(y, set(B)));
 
         HostDifferences n0 = reduced.get(A);
         assertEquals(0, n0.get(A).size());
@@ -223,7 +221,7 @@ public class ReduceHelperTest
         assertEquals(0, n4.get(E).size());
     }
 
-    private Iterable<Set<InetAddress>> streams(IncomingRepairStreamTracker incomingRepairStreamTracker)
+    private Iterable<Set<InetAddressAndPort>> streams(IncomingRepairStreamTracker incomingRepairStreamTracker)
     {
         return incomingRepairStreamTracker.getIncoming().values().iterator().next().allStreams();
     }
@@ -248,12 +246,12 @@ public class ReduceHelperTest
          B streams (0, 50] from {C}, (50, 100] from {A, C}
          C streams (0, 50] from {A, B}, (50, 100] from B
          */
-        Map<InetAddress, HostDifferences> differences = new HashMap<>();
+        Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>();
         addDifference(A, differences, B, list(range(50, 100)));
         addDifference(A, differences, C, list(range(0, 50)));
         addDifference(B, differences, C, list(range(0, 100)));
         DifferenceHolder differenceHolder = new DifferenceHolder(differences);
-        Map<InetAddress, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
+        Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
         assertEquals(set(set(C)), tracker.get(A).getIncoming().get(range(0, 50)).allStreams());
         assertEquals(set(set(B)), tracker.get(A).getIncoming().get(range(50, 100)).allStreams());
         assertEquals(set(set(C)), tracker.get(B).getIncoming().get(range(0, 50)).allStreams());
@@ -261,7 +259,7 @@ public class ReduceHelperTest
         assertEquals(set(set(A,B)), tracker.get(C).getIncoming().get(range(0, 50)).allStreams());
         assertEquals(set(set(B)), tracker.get(C).getIncoming().get(range(50, 100)).allStreams());
 
-        ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
+        ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
 
         HostDifferences n0 = reduced.get(A);
 
@@ -270,7 +268,7 @@ public class ReduceHelperTest
 
         HostDifferences n1 = reduced.get(B);
         assertEquals(0, n1.get(B).size());
-        if (n1.get(A) != null)
+        if (!n1.get(A).isEmpty())
         {
             assertTrue(n1.get(C).equals(list(range(0, 50))));
             assertTrue(n1.get(A).equals(list(range(50, 100))));
@@ -281,7 +279,7 @@ public class ReduceHelperTest
         }
         HostDifferences n2 = reduced.get(C);
         assertEquals(0, n2.get(C).size());
-        if (n2.get(A) != null)
+        if (!n2.get(A).isEmpty())
         {
             assertTrue(n2.get(A).equals(list(range(0,50))));
             assertTrue(n2.get(B).equals(list(range(50, 100))));
@@ -312,13 +310,13 @@ public class ReduceHelperTest
          B == C on (5, 10], (40, 45]
          */
 
-        Map<InetAddress, HostDifferences> differences = new HashMap<>();
+        Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>();
         addDifference(A, differences, B, list(range(5, 45)));
         addDifference(A, differences, C, list(range(0, 10), range(40,50)));
         addDifference(B, differences, C, list(range(0, 5), range(10,40), range(45,50)));
 
         DifferenceHolder differenceHolder = new DifferenceHolder(differences);
-        Map<InetAddress, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
+        Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
 
         Map<Range<Token>, StreamFromOptions> ranges = tracker.get(A).getIncoming();
         assertEquals(5, ranges.size());
@@ -344,21 +342,21 @@ public class ReduceHelperTest
         assertEquals(set(set(B)), ranges.get(range(10, 40)).allStreams());
         assertEquals(set(set(A)), ranges.get(range(40, 45)).allStreams());
         assertEquals(set(set(A,B)), ranges.get(range(45, 50)).allStreams());
-        ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
+        ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
 
         assertNoOverlap(A, reduced.get(A), list(range(0, 50)));
         assertNoOverlap(B, reduced.get(B), list(range(0, 50)));
         assertNoOverlap(C, reduced.get(C), list(range(0, 50)));
     }
 
-    private void assertNoOverlap(InetAddress incomingNode, HostDifferences node, List<Range<Token>> expectedAfterNormalize)
+    private void assertNoOverlap(InetAddressAndPort incomingNode, HostDifferences node, List<Range<Token>> expectedAfterNormalize)
     {
         Set<Range<Token>> allRanges = new HashSet<>();
-        Set<InetAddress> remoteNodes = Sets.newHashSet(A,B,C);
+        Set<InetAddressAndPort> remoteNodes = Sets.newHashSet(A,B,C);
         remoteNodes.remove(incomingNode);
-        Iterator<InetAddress> iter = remoteNodes.iterator();
+        Iterator<InetAddressAndPort> iter = remoteNodes.iterator();
         allRanges.addAll(node.get(iter.next()));
-        InetAddress i = iter.next();
+        InetAddressAndPort i = iter.next();
         for (Range<Token> r : node.get(i))
         {
             for (Range<Token> existing : allRanges)
@@ -379,14 +377,14 @@ public class ReduceHelperTest
         return ranges;
     }
 
-    private static Set<InetAddress> set(InetAddress ... elem)
+    private static Set<InetAddressAndPort> set(InetAddressAndPort ... elem)
     {
         return Sets.newHashSet(elem);
     }
     @SafeVarargs
-    private static Set<Set<InetAddress>> set(Set<InetAddress> ... elem)
+    private static Set<Set<InetAddressAndPort>> set(Set<InetAddressAndPort> ... elem)
     {
-        Set<Set<InetAddress>> ret = Sets.newHashSet();
+        Set<Set<InetAddressAndPort>> ret = Sets.newHashSet();
         ret.addAll(Arrays.asList(elem));
         return ret;
     }
@@ -418,7 +416,7 @@ public class ReduceHelperTest
         assertTrue(r1.size() > 0 ^ r2.size() > 0);
     }
 
-    private void addDifference(InetAddress host1, Map<InetAddress, HostDifferences> differences, InetAddress host2, List<Range<Token>> ranges)
+    private void addDifference(InetAddressAndPort host1, Map<InetAddressAndPort, HostDifferences> differences, InetAddressAndPort host2, List<Range<Token>> ranges)
     {
         differences.computeIfAbsent(host1, (x) -> new HostDifferences()).add(host2, ranges);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java
index 3ba3cfe..e2a7700 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -30,6 +29,7 @@ import org.junit.Test;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 import static junit.framework.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -41,16 +41,16 @@ public class StreamFromOptionsTest
     public void addAllDiffingTest() throws UnknownHostException
     {
         StreamFromOptions sfo = new StreamFromOptions(new MockDiffs(true), range(0, 10));
-        Set<InetAddress> toAdd = new HashSet<>();
-        toAdd.add(InetAddress.getByName("127.0.0.1"));
-        toAdd.add(InetAddress.getByName("127.0.0.2"));
-        toAdd.add(InetAddress.getByName("127.0.0.3"));
+        Set<InetAddressAndPort> toAdd = new HashSet<>();
+        toAdd.add(InetAddressAndPort.getByName("127.0.0.1"));
+        toAdd.add(InetAddressAndPort.getByName("127.0.0.2"));
+        toAdd.add(InetAddressAndPort.getByName("127.0.0.3"));
         toAdd.forEach(sfo::add);
 
         // if all added have differences, each set will contain a single host
         assertEquals(3, Iterables.size(sfo.allStreams()));
-        Set<InetAddress> allStreams = new HashSet<>();
-        for (Set<InetAddress> streams : sfo.allStreams())
+        Set<InetAddressAndPort> allStreams = new HashSet<>();
+        for (Set<InetAddressAndPort> streams : sfo.allStreams())
         {
             assertEquals(1, streams.size());
             allStreams.addAll(streams);
@@ -62,10 +62,10 @@ public class StreamFromOptionsTest
     public void addAllMatchingTest() throws UnknownHostException
     {
         StreamFromOptions sfo = new StreamFromOptions(new MockDiffs(false), range(0, 10));
-        Set<InetAddress> toAdd = new HashSet<>();
-        toAdd.add(InetAddress.getByName("127.0.0.1"));
-        toAdd.add(InetAddress.getByName("127.0.0.2"));
-        toAdd.add(InetAddress.getByName("127.0.0.3"));
+        Set<InetAddressAndPort> toAdd = new HashSet<>();
+        toAdd.add(InetAddressAndPort.getByName("127.0.0.1"));
+        toAdd.add(InetAddressAndPort.getByName("127.0.0.2"));
+        toAdd.add(InetAddressAndPort.getByName("127.0.0.3"));
         toAdd.forEach(sfo::add);
 
         // if all added match, the set will contain all hosts
@@ -83,10 +83,10 @@ public class StreamFromOptionsTest
     private void splitTestHelper(boolean diffing) throws UnknownHostException
     {
         StreamFromOptions sfo = new StreamFromOptions(new MockDiffs(diffing), range(0, 10));
-        Set<InetAddress> toAdd = new HashSet<>();
-        toAdd.add(InetAddress.getByName("127.0.0.1"));
-        toAdd.add(InetAddress.getByName("127.0.0.2"));
-        toAdd.add(InetAddress.getByName("127.0.0.3"));
+        Set<InetAddressAndPort> toAdd = new HashSet<>();
+        toAdd.add(InetAddressAndPort.getByName("127.0.0.1"));
+        toAdd.add(InetAddressAndPort.getByName("127.0.0.2"));
+        toAdd.add(InetAddressAndPort.getByName("127.0.0.3"));
         toAdd.forEach(sfo::add);
         StreamFromOptions sfo1 = sfo.copy(range(0, 5));
         StreamFromOptions sfo2 = sfo.copy(range(5, 10));
@@ -95,8 +95,8 @@ public class StreamFromOptionsTest
         assertEquals(range(5, 10), sfo2.range);
         assertTrue(Iterables.elementsEqual(sfo1.allStreams(), sfo2.allStreams()));
         // verify the backing set is not shared between the copies:
-        sfo1.add(InetAddress.getByName("127.0.0.4"));
-        sfo2.add(InetAddress.getByName("127.0.0.5"));
+        sfo1.add(InetAddressAndPort.getByName("127.0.0.4"));
+        sfo2.add(InetAddressAndPort.getByName("127.0.0.5"));
         assertFalse(Iterables.elementsEqual(sfo1.allStreams(), sfo2.allStreams()));
     }
 
@@ -116,7 +116,7 @@ public class StreamFromOptionsTest
         }
 
         @Override
-        public boolean hasDifferenceBetween(InetAddress node1, InetAddress node2, Range<Token> range)
+        public boolean hasDifferenceBetween(InetAddressAndPort node1, InetAddressAndPort node2, Range<Token> range)
         {
             return hasDifference;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
index 367fea9..4570328 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.consistent;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Set;
 import java.util.UUID;
@@ -33,6 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -41,19 +41,19 @@ import org.apache.cassandra.utils.UUIDGen;
 @Ignore
 public abstract class AbstractConsistentSessionTest
 {
-    protected static final InetAddress COORDINATOR;
-    protected static final InetAddress PARTICIPANT1;
-    protected static final InetAddress PARTICIPANT2;
-    protected static final InetAddress PARTICIPANT3;
+    protected static final InetAddressAndPort COORDINATOR;
+    protected static final InetAddressAndPort PARTICIPANT1;
+    protected static final InetAddressAndPort PARTICIPANT2;
+    protected static final InetAddressAndPort PARTICIPANT3;
 
     static
     {
         try
         {
-            COORDINATOR = InetAddress.getByName("10.0.0.1");
-            PARTICIPANT1 = InetAddress.getByName("10.0.0.1");
-            PARTICIPANT2 = InetAddress.getByName("10.0.0.2");
-            PARTICIPANT3 = InetAddress.getByName("10.0.0.3");
+            COORDINATOR = InetAddressAndPort.getByName("10.0.0.1");
+            PARTICIPANT1 = InetAddressAndPort.getByName("10.0.0.1");
+            PARTICIPANT2 = InetAddressAndPort.getByName("10.0.0.2");
+            PARTICIPANT3 = InetAddressAndPort.getByName("10.0.0.3");
         }
         catch (UnknownHostException e)
         {
@@ -64,7 +64,7 @@ public abstract class AbstractConsistentSessionTest
         DatabaseDescriptor.daemonInitialization();
     }
 
-    protected static final Set<InetAddress> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3);
+    protected static final Set<InetAddressAndPort> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3);
 
     protected static Token t(int v)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index fb312c3..5d054d3 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.consistent;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -35,6 +34,7 @@ import com.google.common.util.concurrent.SettableFuture;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.repair.RepairSessionResult;
 import org.apache.cassandra.repair.messages.FailSession;
@@ -77,7 +77,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         return new RepairSessionResult(coordinator.sessionID, "ks", coordinator.ranges, null, false);
     }
 
-    private static void assertMessageSent(InstrumentedCoordinatorSession coordinator, InetAddress participant, RepairMessage expected)
+    private static void assertMessageSent(InstrumentedCoordinatorSession coordinator, InetAddressAndPort participant, RepairMessage expected)
     {
         Assert.assertTrue(coordinator.sentMessages.containsKey(participant));
         Assert.assertEquals(1, coordinator.sentMessages.get(participant).size());
@@ -91,9 +91,9 @@ public class CoordinatorSessionTest extends AbstractRepairTest
             super(builder);
         }
 
-        Map<InetAddress, List<RepairMessage>> sentMessages = new HashMap<>();
+        Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new HashMap<>();
 
-        protected void sendMessage(InetAddress destination, RepairMessage message)
+        protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
         {
             if (!sentMessages.containsKey(destination))
             {
@@ -189,7 +189,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
 
         coordinator.fail();
         Assert.assertEquals(FAILED, coordinator.getState());
-        for (InetAddress participant : PARTICIPANTS)
+        for (InetAddressAndPort participant : PARTICIPANTS)
         {
             assertMessageSent(coordinator, participant, new FailSession(coordinator.sessionID));
         }
@@ -221,7 +221,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         Assert.assertTrue(coordinator.sentMessages.isEmpty());
         ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures);
 
-        for (InetAddress participant : PARTICIPANTS)
+        for (InetAddressAndPort participant : PARTICIPANTS)
         {
 
             RepairMessage expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
@@ -254,7 +254,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         repairFuture.set(results);
 
         // propose messages should have been sent once all repair sessions completed successfully
-        for (InetAddress participant : PARTICIPANTS)
+        for (InetAddressAndPort participant : PARTICIPANTS)
         {
             RepairMessage expected = new FinalizePropose(coordinator.sessionID);
             assertMessageSent(coordinator, participant, expected);
@@ -277,7 +277,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         Assert.assertTrue(coordinator.finalizeCommitCalled);
 
         Assert.assertEquals(ConsistentSession.State.FINALIZED, coordinator.getState());
-        for (InetAddress participant : PARTICIPANTS)
+        for (InetAddressAndPort participant : PARTICIPANTS)
         {
             RepairMessage expected = new FinalizeCommit(coordinator.sessionID);
             assertMessageSent(coordinator, participant, expected);
@@ -304,7 +304,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         Assert.assertFalse(repairSubmitted.get());
         Assert.assertTrue(coordinator.sentMessages.isEmpty());
         ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures);
-        for (InetAddress participant : PARTICIPANTS)
+        for (InetAddressAndPort participant : PARTICIPANTS)
         {
             PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
             assertMessageSent(coordinator, participant, expected);
@@ -339,7 +339,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         Assert.assertTrue(coordinator.failCalled);
 
         // all participants should have been notified of session failure
-        for (InetAddress participant : PARTICIPANTS)
+        for (InetAddressAndPort participant : PARTICIPANTS)
         {
             RepairMessage expected = new FailSession(coordinator.sessionID);
             assertMessageSent(coordinator, participant, expected);
@@ -366,7 +366,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         Assert.assertFalse(repairSubmitted.get());
         Assert.assertTrue(coordinator.sentMessages.isEmpty());
         ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures);
-        for (InetAddress participant : PARTICIPANTS)
+        for (InetAddressAndPort participant : PARTICIPANTS)
         {
             PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
             assertMessageSent(coordinator, participant, expected);
@@ -394,7 +394,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         Assert.assertFalse(repairSubmitted.get());
 
         // all participants should have been notified of session failure
-        for (InetAddress participant : PARTICIPANTS)
+        for (InetAddressAndPort participant : PARTICIPANTS)
         {
             RepairMessage expected = new FailSession(coordinator.sessionID);
             assertMessageSent(coordinator, participant, expected);
@@ -422,7 +422,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         Assert.assertTrue(coordinator.sentMessages.isEmpty());
         ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures);
 
-        for (InetAddress participant : PARTICIPANTS)
+        for (InetAddressAndPort participant : PARTICIPANTS)
         {
 
             RepairMessage expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
@@ -455,7 +455,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         repairFuture.set(results);
 
         // propose messages should have been sent once all repair sessions completed successfully
-        for (InetAddress participant : PARTICIPANTS)
+        for (InetAddressAndPort participant : PARTICIPANTS)
         {
             RepairMessage expected = new FinalizePropose(coordinator.sessionID);
             assertMessageSent(coordinator, participant, expected);
@@ -481,7 +481,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         Assert.assertEquals(ConsistentSession.State.FAILED, coordinator.getState());
 
         // failure messages should have been sent to all participants
-        for (InetAddress participant : PARTICIPANTS)
+        for (InetAddressAndPort participant : PARTICIPANTS)
         {
             RepairMessage expected = new FailSession(coordinator.sessionID);
             assertMessageSent(coordinator, participant, expected);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
index b40e185..9bf4270 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.consistent;
 
-import java.net.InetAddress;
 import java.util.Set;
 import java.util.UUID;
 
@@ -29,6 +28,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.AbstractRepairTest;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Schema;
@@ -55,9 +55,9 @@ public class CoordinatorSessionsTest extends AbstractRepairTest
         }
 
         int prepareResponseCalls = 0;
-        InetAddress preparePeer = null;
+        InetAddressAndPort preparePeer = null;
         boolean prepareSuccess = false;
-        public synchronized void handlePrepareResponse(InetAddress participant, boolean success)
+        public synchronized void handlePrepareResponse(InetAddressAndPort participant, boolean success)
         {
             prepareResponseCalls++;
             preparePeer = participant;
@@ -65,9 +65,9 @@ public class CoordinatorSessionsTest extends AbstractRepairTest
         }
 
         int finalizePromiseCalls = 0;
-        InetAddress promisePeer = null;
+        InetAddressAndPort promisePeer = null;
         boolean promiseSuccess = false;
-        public synchronized void handleFinalizePromise(InetAddress participant, boolean success)
+        public synchronized void handleFinalizePromise(InetAddressAndPort participant, boolean success)
         {
             finalizePromiseCalls++;
             promisePeer = participant;
@@ -93,7 +93,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest
             return (InstrumentedCoordinatorSession) super.getSession(sessionId);
         }
 
-        public InstrumentedCoordinatorSession registerSession(UUID sessionId, Set<InetAddress> peers)
+        public InstrumentedCoordinatorSession registerSession(UUID sessionId, Set<InetAddressAndPort> peers)
         {
             return (InstrumentedCoordinatorSession) super.registerSession(sessionId, peers);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
index 6808efe..3ea888d 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
@@ -18,10 +18,10 @@
 
 package org.apache.cassandra.repair.consistent;
 
-import java.net.InetAddress;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.service.ActiveRepairService;
 
 /**
@@ -36,7 +36,7 @@ public class LocalSessionAccessor
         ARS.consistent.local.start();
     }
 
-    public static void prepareUnsafe(UUID sessionID, InetAddress coordinator, Set<InetAddress> peers)
+    public static void prepareUnsafe(UUID sessionID, InetAddressAndPort coordinator, Set<InetAddressAndPort> peers)
     {
         ActiveRepairService.ParentRepairSession prs = ARS.getParentRepairSession(sessionID);
         assert prs != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index 6e6d222..5fa43a9 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.consistent;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -42,6 +41,7 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.repair.AbstractRepairTest;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -106,20 +106,20 @@ public class LocalSessionTest extends AbstractRepairTest
         }
     }
 
-    private static void assertNoMessagesSent(InstrumentedLocalSessions sessions, InetAddress to)
+    private static void assertNoMessagesSent(InstrumentedLocalSessions sessions, InetAddressAndPort to)
     {
         Assert.assertNull(sessions.sentMessages.get(to));
     }
 
-    private static void assertMessagesSent(InstrumentedLocalSessions sessions, InetAddress to, RepairMessage... expected)
+    private static void assertMessagesSent(InstrumentedLocalSessions sessions, InetAddressAndPort to, RepairMessage... expected)
     {
         Assert.assertEquals(Lists.newArrayList(expected), sessions.sentMessages.get(to));
     }
 
     static class InstrumentedLocalSessions extends LocalSessions
     {
-        Map<InetAddress, List<RepairMessage>> sentMessages = new HashMap<>();
-        protected void sendMessage(InetAddress destination, RepairMessage message)
+        Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new HashMap<>();
+        protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
         {
             if (!sentMessages.containsKey(destination))
             {
@@ -159,12 +159,13 @@ public class LocalSessionTest extends AbstractRepairTest
             return getSession(sessionID);
         }
 
-        protected InetAddress getBroadcastAddress()
+        @Override
+        protected InetAddressAndPort getBroadcastAddressAndPort()
         {
             return PARTICIPANT1;
         }
 
-        protected boolean isAlive(InetAddress address)
+        protected boolean isAlive(InetAddressAndPort address)
         {
             return true;
         }
@@ -811,7 +812,7 @@ public class LocalSessionTest extends AbstractRepairTest
         sessions.start();
         Assert.assertNotNull(sessions.getSession(session.sessionID));
 
-        QueryProcessor.instance.executeInternal("DELETE participants FROM system.repairs WHERE parent_id=?", session.sessionID);
+        QueryProcessor.instance.executeInternal("DELETE participants, participants_wp FROM system.repairs WHERE parent_id=?", session.sessionID);
 
         sessions = new LocalSessions();
         sessions.start();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
index 5aeab3e..213cdd3 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.consistent;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -43,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -148,7 +148,7 @@ public class PendingAntiCompactionTest
 
         // create a session so the anti compaction can fine it
         UUID sessionID = UUIDGen.getTimeUUID();
-        ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddress.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true, PreviewKind.NONE);
+        ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddressAndPort.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true, PreviewKind.NONE);
 
         PendingAntiCompaction pac;
         ExecutorService executor = Executors.newSingleThreadExecutor();
@@ -352,7 +352,7 @@ public class PendingAntiCompactionTest
         PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
         UUID sessionID = UUIDGen.getTimeUUID();
         ActiveRepairService.instance.registerParentRepairSession(sessionID,
-                                                                 InetAddress.getByName("127.0.0.1"),
+                                                                 InetAddressAndPort.getByName("127.0.0.1"),
                                                                  Lists.newArrayList(cfs),
                                                                  FULL_RANGE,
                                                                  true,0,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org