You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:10 UTC

[13/19] cassandra git commit: Allow storage port to be configurable per node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java
index 7844c9b..d891043 100644
--- a/src/java/org/apache/cassandra/net/async/NettyFactory.java
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.auth.IInternodeAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.NativeTransportService;
@@ -184,9 +185,9 @@ public final class NettyFactory
      * Create a {@link Channel} that listens on the {@code localAddr}. This method will block while trying to bind to the address,
      * but it does not make a remote call.
      */
-    public Channel createInboundChannel(InetSocketAddress localAddr, InboundInitializer initializer, int receiveBufferSize) throws ConfigurationException
+    public Channel createInboundChannel(InetAddressAndPort localAddr, InboundInitializer initializer, int receiveBufferSize) throws ConfigurationException
     {
-        String nic = FBUtilities.getNetworkInterface(localAddr.getAddress());
+        String nic = FBUtilities.getNetworkInterface(localAddr.address);
         logger.info("Starting Messaging Service on {} {}, encryption: {}",
                     localAddr, nic == null ? "" : String.format(" (%s)", nic), encryptionLogStatement(initializer.encryptionOptions));
         Class<? extends ServerChannel> transport = useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
@@ -202,7 +203,7 @@ public final class NettyFactory
         if (receiveBufferSize > 0)
             bootstrap.childOption(ChannelOption.SO_RCVBUF, receiveBufferSize);
 
-        ChannelFuture channelFuture = bootstrap.bind(localAddr);
+        ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(localAddr.address, localAddr.port));
 
         if (!channelFuture.awaitUninterruptibly().isSuccess())
         {
@@ -333,8 +334,9 @@ public final class NettyFactory
                               .option(ChannelOption.TCP_NODELAY, params.tcpNoDelay)
                               .option(ChannelOption.WRITE_BUFFER_WATER_MARK, params.waterMark)
                               .handler(new OutboundInitializer(params));
-        bootstrap.localAddress(params.connectionId.local(), 0);
-        bootstrap.remoteAddress(params.connectionId.connectionAddress());
+        bootstrap.localAddress(params.connectionId.local().address, 0);
+        InetAddressAndPort remoteAddress = params.connectionId.connectionAddress();
+        bootstrap.remoteAddress(new InetSocketAddress(remoteAddress.address, remoteAddress.port));
         return bootstrap;
     }
 
@@ -362,7 +364,8 @@ public final class NettyFactory
             {
                 SslContext sslContext = SSLFactory.getSslContext(params.encryptionOptions, true, false);
                 // for some reason channel.remoteAddress() will return null
-                InetSocketAddress peer = params.encryptionOptions.require_endpoint_verification ? params.connectionId.remoteAddress() : null;
+                InetAddressAndPort address = params.connectionId.remote();
+                InetSocketAddress peer = params.encryptionOptions.require_endpoint_verification ? new InetSocketAddress(address.address, address.port) : null;
                 SslHandler sslHandler = newSslHandler(channel, sslContext, peer);
                 logger.trace("creating outbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
                 pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
index 6b2ff0d..f3cb554 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.net.async;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
+
 /**
  * Identifies an outbound messaging connection.
  *
@@ -38,24 +40,24 @@ public class OutboundConnectionIdentifier
     /**
      * Memoization of the local node's broadcast address.
      */
-    private final InetSocketAddress localAddr;
+    private final InetAddressAndPort localAddr;
 
     /**
      * The address by which the remote is identified. This may be different from {@link #remoteConnectionAddr} for
      * something like EC2 public IP address which need to be used for communication between EC2 regions.
      */
-    private final InetSocketAddress remoteAddr;
+    private final InetAddressAndPort remoteAddr;
 
     /**
      * The address to which we're connecting to the node (often the same as {@link #remoteAddr} but not always).
      */
-    private final InetSocketAddress remoteConnectionAddr;
+    private final InetAddressAndPort remoteConnectionAddr;
 
     private final ConnectionType connectionType;
 
-    private OutboundConnectionIdentifier(InetSocketAddress localAddr,
-                                         InetSocketAddress remoteAddr,
-                                         InetSocketAddress remoteConnectionAddr,
+    private OutboundConnectionIdentifier(InetAddressAndPort localAddr,
+                                         InetAddressAndPort remoteAddr,
+                                         InetAddressAndPort remoteConnectionAddr,
                                          ConnectionType connectionType)
     {
         this.localAddr = localAddr;
@@ -64,8 +66,8 @@ public class OutboundConnectionIdentifier
         this.connectionType = connectionType;
     }
 
-    private OutboundConnectionIdentifier(InetSocketAddress localAddr,
-                                         InetSocketAddress remoteAddr,
+    private OutboundConnectionIdentifier(InetAddressAndPort localAddr,
+                                         InetAddressAndPort remoteAddr,
                                          ConnectionType connectionType)
     {
         this(localAddr, remoteAddr, remoteAddr, connectionType);
@@ -75,7 +77,7 @@ public class OutboundConnectionIdentifier
      * Creates an identifier for a small message connection and using the remote "identifying" address as its connection
      * address.
      */
-    public static OutboundConnectionIdentifier small(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+    public static OutboundConnectionIdentifier small(InetAddressAndPort localAddr, InetAddressAndPort remoteAddr)
     {
         return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.SMALL_MESSAGE);
     }
@@ -84,7 +86,7 @@ public class OutboundConnectionIdentifier
      * Creates an identifier for a large message connection and using the remote "identifying" address as its connection
      * address.
      */
-    public static OutboundConnectionIdentifier large(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+    public static OutboundConnectionIdentifier large(InetAddressAndPort localAddr, InetAddressAndPort remoteAddr)
     {
         return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.LARGE_MESSAGE);
     }
@@ -93,7 +95,7 @@ public class OutboundConnectionIdentifier
      * Creates an identifier for a gossip connection and using the remote "identifying" address as its connection
      * address.
      */
-    public static OutboundConnectionIdentifier gossip(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+    public static OutboundConnectionIdentifier gossip(InetAddressAndPort localAddr, InetAddressAndPort remoteAddr)
     {
         return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.GOSSIP);
     }
@@ -102,7 +104,7 @@ public class OutboundConnectionIdentifier
      * Creates an identifier for a gossip connection and using the remote "identifying" address as its connection
      * address.
      */
-    public static OutboundConnectionIdentifier stream(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+    public static OutboundConnectionIdentifier stream(InetAddressAndPort localAddr, InetAddressAndPort remoteAddr)
     {
         return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.STREAM);
     }
@@ -115,45 +117,37 @@ public class OutboundConnectionIdentifier
      * @return a newly created connection identifier that differs from this one only by using {@code remoteConnectionAddr}
      * as connection address to the remote.
      */
-    public OutboundConnectionIdentifier withNewConnectionAddress(InetSocketAddress remoteConnectionAddr)
+    public OutboundConnectionIdentifier withNewConnectionAddress(InetAddressAndPort remoteConnectionAddr)
     {
         return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteConnectionAddr, connectionType);
     }
 
     public OutboundConnectionIdentifier withNewConnectionPort(int port)
     {
-        return new OutboundConnectionIdentifier(localAddr, new InetSocketAddress(remoteAddr.getAddress(), port),
-                                                new InetSocketAddress(remoteConnectionAddr.getAddress(), port), connectionType);
+        return new OutboundConnectionIdentifier(localAddr, InetAddressAndPort.getByAddressOverrideDefaults(remoteAddr.address, port),
+                                                InetAddressAndPort.getByAddressOverrideDefaults(remoteConnectionAddr.address, port), connectionType);
     }
 
     /**
      * The local node address.
      */
-    public InetAddress local()
+    public InetAddressAndPort local()
     {
-        return localAddr.getAddress();
+        return localAddr;
     }
 
     /**
      * The remote node identifying address (the one to use for anything else than connecting to the node).
      */
-    public InetSocketAddress remoteAddress()
+    public  InetAddressAndPort remote()
     {
         return remoteAddr;
     }
 
     /**
-     * The remote node identifying address (the one to use for anything else than connecting to the node).
-     */
-    public  InetAddress remote()
-    {
-        return remoteAddr.getAddress();
-    }
-
-    /**
      * The remote node connection address (the one to use to actually connect to the remote, and only that).
      */
-    public InetSocketAddress connectionAddress()
+    public InetAddressAndPort connectionAddress()
     {
         return remoteConnectionAddr;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
index 4522ba4..28775ef 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.NettyFactory.Mode;
@@ -258,12 +259,12 @@ public class OutboundMessagingConnection
         logger.debug("connection attempt {} to {}", connectAttemptCount, connectionId);
 
 
-        InetSocketAddress remote = connectionId.remoteAddress();
-        if (!authenticator.authenticate(remote.getAddress(), remote.getPort()))
+        InetAddressAndPort remote = connectionId.remote();
+        if (!authenticator.authenticate(remote.address, remote.port))
         {
             logger.warn("Internode auth failed connecting to {}", connectionId);
             //Remove the connection pool and other thread so messages aren't queued
-            MessagingService.instance().destroyConnectionPool(remote.getAddress());
+            MessagingService.instance().destroyConnectionPool(remote);
 
             // don't update the state field as destroyConnectionPool() *should* call OMC.close()
             // on all the connections in the OMP for the remoteAddress
@@ -284,7 +285,7 @@ public class OutboundMessagingConnection
     }
 
     @VisibleForTesting
-    static boolean shouldCompressConnection(InetAddress localHost, InetAddress remoteHost)
+    static boolean shouldCompressConnection(InetAddressAndPort localHost, InetAddressAndPort remoteHost)
     {
         return (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.all)
                || ((DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc) && !isLocalDC(localHost, remoteHost));
@@ -355,7 +356,7 @@ public class OutboundMessagingConnection
         return null;
     }
 
-    static boolean isLocalDC(InetAddress localHost, InetAddress remoteHost)
+    static boolean isLocalDC(InetAddressAndPort localHost, InetAddressAndPort remoteHost)
     {
         String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(remoteHost);
         String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(localHost);
@@ -585,7 +586,7 @@ public class OutboundMessagingConnection
      * Any outstanding messages in the existing channel will still be sent to the previous address (we won't/can't move them from
      * one channel to another).
      */
-    void reconnectWithNewIp(InetSocketAddress newAddr)
+    void reconnectWithNewIp(InetAddressAndPort newAddr)
     {
         State currentState = state.get();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
index 0086da8..c701229 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.net.BackPressureState;
 import org.apache.cassandra.net.MessageOut;
@@ -56,14 +57,14 @@ public class OutboundMessagingPool
      * An override address on which to communicate with the peer. Typically used for something like EC2 public IP addresses
      * which need to be used for communication between EC2 regions.
      */
-    private InetSocketAddress preferredRemoteAddr;
+    private InetAddressAndPort preferredRemoteAddr;
 
-    public OutboundMessagingPool(InetSocketAddress remoteAddr, InetSocketAddress localAddr, ServerEncryptionOptions encryptionOptions,
+    public OutboundMessagingPool(InetAddressAndPort remoteAddr, InetAddressAndPort localAddr, ServerEncryptionOptions encryptionOptions,
                                  BackPressureState backPressureState, IInternodeAuthenticator authenticator)
     {
         preferredRemoteAddr = remoteAddr;
         this.backPressureState = backPressureState;
-        metrics = new ConnectionMetrics(localAddr.getAddress(), this);
+        metrics = new ConnectionMetrics(localAddr, this);
 
 
         smallMessageChannel = new OutboundMessagingConnection(OutboundConnectionIdentifier.small(localAddr, preferredRemoteAddr),
@@ -76,10 +77,10 @@ public class OutboundMessagingPool
                                                         encryptionOptions, Optional.empty(), authenticator);
     }
 
-    private static Optional<CoalescingStrategy> coalescingStrategy(InetSocketAddress remoteAddr)
+    private static Optional<CoalescingStrategy> coalescingStrategy(InetAddressAndPort remoteAddr)
     {
         String strategyName = DatabaseDescriptor.getOtcCoalescingStrategy();
-        String displayName = remoteAddr.getAddress().getHostAddress();
+        String displayName = remoteAddr.toString();
         return CoalescingStrategies.newCoalescingStrategy(strategyName,
                                                           DatabaseDescriptor.getOtcCoalescingWindow(),
                                                           OutboundMessagingConnection.logger,
@@ -117,7 +118,7 @@ public class OutboundMessagingPool
      *
      * @param addr IP Address to use (and prefer) going forward for connecting to the peer
      */
-    public void reconnectWithNewIp(InetSocketAddress addr)
+    public void reconnectWithNewIp(InetAddressAndPort addr)
     {
         preferredRemoteAddr = addr;
         gossipChannel.reconnectWithNewIp(addr);
@@ -166,7 +167,7 @@ public class OutboundMessagingPool
         return metrics.timeouts.getCount();
     }
 
-    public InetSocketAddress getPreferredRemoteAddr()
+    public InetAddressAndPort getPreferredRemoteAddr()
     {
         return preferredRemoteAddr;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
index 5464520..2c4fae4 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
@@ -18,13 +18,13 @@
 
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.List;
 import java.util.UUID;
 
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamEvent;
@@ -41,15 +41,15 @@ public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements Strea
     private final UUID pendingRepair;
     private final TraceState state = Tracing.instance.get();
 
-    public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, UUID pendingRepair, PreviewKind previewKind)
+    public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, UUID pendingRepair, PreviewKind previewKind)
     {
-        super(desc, FBUtilities.getBroadcastAddress(), fetchFrom, rangesToFetch, previewKind);
+        super(desc, FBUtilities.getBroadcastAddressAndPort(), fetchFrom, rangesToFetch, previewKind);
         this.pendingRepair = pendingRepair;
     }
 
     public void startSync(List<Range<Token>> rangesToFetch)
     {
-        InetAddress preferred = SystemKeyspace.getPreferredIP(fetchFrom);
+        InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(fetchFrom);
         StreamPlan plan = new StreamPlan(StreamOperation.REPAIR,
                                          1, false,
                                          false,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index d70975d..e24d854 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -18,12 +18,12 @@
 
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.List;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -33,14 +33,14 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements CompletableRemoteSyncTask
 {
-    public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddress fetchNode, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
+    public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort fetchNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
     {
         super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind);
     }
 
     public void startSync(List<Range<Token>> rangesToFetch)
     {
-        InetAddress local = FBUtilities.getBroadcastAddress();
+        InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
         AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, fetchingNode, fetchFrom, rangesToFetch, previewKind);
         String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom);
         Tracing.traceRepair(message);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
index fe00058..4d38e8a 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -30,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.tracing.Tracing;
 
@@ -37,15 +37,15 @@ public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implem
 {
     private static Logger logger = LoggerFactory.getLogger(AsymmetricSyncTask.class);
     protected final RepairJobDesc desc;
-    protected final InetAddress fetchFrom;
+    protected final InetAddressAndPort fetchFrom;
     protected final List<Range<Token>> rangesToFetch;
-    protected final InetAddress fetchingNode;
+    protected final InetAddressAndPort fetchingNode;
     protected final PreviewKind previewKind;
     private long startTime = Long.MIN_VALUE;
     protected volatile SyncStat stat;
 
 
-    public AsymmetricSyncTask(RepairJobDesc desc, InetAddress fetchingNode, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
+    public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
     {
         this.desc = desc;
         this.fetchFrom = fetchFrom;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 8545b22..60d571b 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.List;
 import java.util.UUID;
 
@@ -28,7 +27,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamEvent;
@@ -39,8 +38,6 @@ import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
-import org.apache.cassandra.utils.MerkleTrees;
 
 /**
  * LocalSyncTask performs streaming between local(coordinator) node and remote replica.
@@ -62,7 +59,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
     }
 
     @VisibleForTesting
-    StreamPlan createStreamPlan(InetAddress dst, InetAddress preferred, List<Range<Token>> differences)
+    StreamPlan createStreamPlan(InetAddressAndPort dst, InetAddressAndPort preferred, List<Range<Token>> differences)
     {
         StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind)
                           .listeners(this)
@@ -84,10 +81,10 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
     @Override
     protected void startSync(List<Range<Token>> differences)
     {
-        InetAddress local = FBUtilities.getBroadcastAddress();
+        InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
         // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
-        InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
-        InetAddress preferred = SystemKeyspace.getPreferredIP(dst);
+        InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
+        InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(dst);
 
         String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst);
         logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/NodePair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/NodePair.java b/src/java/org/apache/cassandra/repair/NodePair.java
index a73c61a..bfb237e 100644
--- a/src/java/org/apache/cassandra/repair/NodePair.java
+++ b/src/java/org/apache/cassandra/repair/NodePair.java
@@ -18,13 +18,13 @@
 package org.apache.cassandra.repair;
 
 import java.io.IOException;
-import java.net.InetAddress;
 
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 
 /**
@@ -36,10 +36,10 @@ public class NodePair
 {
     public static IVersionedSerializer<NodePair> serializer = new NodePairSerializer();
 
-    public final InetAddress endpoint1;
-    public final InetAddress endpoint2;
+    public final InetAddressAndPort endpoint1;
+    public final InetAddressAndPort endpoint2;
 
-    public NodePair(InetAddress endpoint1, InetAddress endpoint2)
+    public NodePair(InetAddressAndPort endpoint1, InetAddressAndPort endpoint2)
     {
         this.endpoint1 = endpoint1;
         this.endpoint2 = endpoint2;
@@ -56,6 +56,12 @@ public class NodePair
     }
 
     @Override
+    public String toString()
+    {
+        return endpoint1.toString() + " - " + endpoint2.toString();
+    }
+
+    @Override
     public int hashCode()
     {
         return Objects.hashCode(endpoint1, endpoint2);
@@ -65,20 +71,21 @@ public class NodePair
     {
         public void serialize(NodePair nodePair, DataOutputPlus out, int version) throws IOException
         {
-            CompactEndpointSerializationHelper.serialize(nodePair.endpoint1, out);
-            CompactEndpointSerializationHelper.serialize(nodePair.endpoint2, out);
+            CompactEndpointSerializationHelper.instance.serialize(nodePair.endpoint1, out, version);
+            CompactEndpointSerializationHelper.instance.serialize(nodePair.endpoint2, out, version);
         }
 
         public NodePair deserialize(DataInputPlus in, int version) throws IOException
         {
-            InetAddress ep1 = CompactEndpointSerializationHelper.deserialize(in);
-            InetAddress ep2 = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddressAndPort ep1 = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+            InetAddressAndPort ep2 = CompactEndpointSerializationHelper.instance.deserialize(in, version);
             return new NodePair(ep1, ep2);
         }
 
         public long serializedSize(NodePair nodePair, int version)
         {
-            return 2 * CompactEndpointSerializationHelper.serializedSize(nodePair.endpoint1);
+            return CompactEndpointSerializationHelper.instance.serializedSize(nodePair.endpoint1, version)
+                 + CompactEndpointSerializationHelper.instance.serializedSize(nodePair.endpoint2, version);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
index 93feb72..0a47f73 100644
--- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -26,6 +25,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -51,7 +51,7 @@ public class RemoteSyncTask extends SyncTask implements CompletableRemoteSyncTas
     @Override
     protected void startSync(List<Range<Token>> differences)
     {
-        InetAddress local = FBUtilities.getBroadcastAddress();
+        InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
         SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind);
         String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst);
         logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 7b8eb92..48973d2 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -33,6 +32,7 @@ import org.apache.cassandra.repair.asymmetric.DifferenceHolder;
 import org.apache.cassandra.repair.asymmetric.HostDifferences;
 import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter;
 import org.apache.cassandra.repair.asymmetric.ReduceHelper;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -83,14 +83,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         Keyspace ks = Keyspace.open(desc.keyspace);
         ColumnFamilyStore cfs = ks.getColumnFamilyStore(desc.columnFamily);
         cfs.metric.repairsStarted.inc();
-        List<InetAddress> allEndpoints = new ArrayList<>(session.endpoints);
-        allEndpoints.add(FBUtilities.getBroadcastAddress());
+        List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.endpoints);
+        allEndpoints.add(FBUtilities.getBroadcastAddressAndPort());
 
         ListenableFuture<List<TreeResponse>> validations;
         // Create a snapshot at all nodes unless we're using pure parallel repairs
         if (parallelismDegree != RepairParallelism.PARALLEL)
         {
-            ListenableFuture<List<InetAddress>> allSnapshotTasks;
+            ListenableFuture<List<InetAddressAndPort>> allSnapshotTasks;
             if (isIncremental)
             {
                 // consistent repair does it's own "snapshotting"
@@ -99,8 +99,8 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
             else
             {
                 // Request snapshot to all replica
-                List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size());
-                for (InetAddress endpoint : allEndpoints)
+                List<ListenableFuture<InetAddressAndPort>> snapshotTasks = new ArrayList<>(allEndpoints.size());
+                for (InetAddressAndPort endpoint : allEndpoints)
                 {
                     SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint);
                     snapshotTasks.add(snapshotTask);
@@ -110,9 +110,9 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
             }
 
             // When all snapshot complete, send validation requests
-            validations = Futures.transformAsync(allSnapshotTasks, new AsyncFunction<List<InetAddress>, List<TreeResponse>>()
+            validations = Futures.transformAsync(allSnapshotTasks, new AsyncFunction<List<InetAddressAndPort>, List<TreeResponse>>()
             {
-                public ListenableFuture<List<TreeResponse>> apply(List<InetAddress> endpoints)
+                public ListenableFuture<List<TreeResponse>> apply(List<InetAddressAndPort> endpoints)
                 {
                     if (parallelismDegree == RepairParallelism.SEQUENTIAL)
                         return sendSequentialValidationRequest(endpoints);
@@ -164,7 +164,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
     {
         return trees ->
         {
-            InetAddress local = FBUtilities.getLocalAddress();
+            InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
 
             List<SyncTask> syncTasks = new ArrayList<>();
             // We need to difference all trees one against another
@@ -198,7 +198,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
     {
         return trees ->
         {
-            InetAddress local = FBUtilities.getLocalAddress();
+            InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
 
             List<AsymmetricSyncTask> syncTasks = new ArrayList<>();
             // We need to difference all trees one against another
@@ -210,16 +210,16 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
                                                               .filter(node -> getDC(streaming)
                                                                               .equals(getDC(node)))
                                                               .collect(Collectors.toSet());
-            ImmutableMap<InetAddress, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter);
+            ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter);
 
             for (int i = 0; i < trees.size(); i++)
             {
-                InetAddress address = trees.get(i).endpoint;
+                InetAddressAndPort address = trees.get(i).endpoint;
                 HostDifferences streamsFor = reducedDifferences.get(address);
                 if (streamsFor != null)
                 {
                     assert streamsFor.get(address).isEmpty() : "We should not fetch ranges from ourselves";
-                    for (InetAddress fetchFrom : streamsFor.hosts())
+                    for (InetAddressAndPort fetchFrom : streamsFor.hosts())
                     {
                         List<Range<Token>> toFetch = streamsFor.get(fetchFrom);
                         logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom);
@@ -246,7 +246,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         };
     }
 
-    private String getDC(InetAddress address)
+    private String getDC(InetAddressAndPort address)
     {
         return DatabaseDescriptor.getEndpointSnitch().getDatacenter(address);
     }
@@ -257,14 +257,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
      * @param endpoints Endpoint addresses to send validation request
      * @return Future that can get all {@link TreeResponse} from replica, if all validation succeed.
      */
-    private ListenableFuture<List<TreeResponse>> sendValidationRequest(Collection<InetAddress> endpoints)
+    private ListenableFuture<List<TreeResponse>> sendValidationRequest(Collection<InetAddressAndPort> endpoints)
     {
         String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints);
         logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
         Tracing.traceRepair(message);
         int nowInSec = FBUtilities.nowInSeconds();
         List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
-        for (InetAddress endpoint : endpoints)
+        for (InetAddressAndPort endpoint : endpoints)
         {
             ValidationTask task = new ValidationTask(desc, endpoint, nowInSec, previewKind);
             tasks.add(task);
@@ -277,7 +277,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
     /**
      * Creates {@link ValidationTask} and submit them to task executor so that tasks run sequentially.
      */
-    private ListenableFuture<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddress> endpoints)
+    private ListenableFuture<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddressAndPort> endpoints)
     {
         String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints);
         logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
@@ -285,8 +285,8 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         int nowInSec = FBUtilities.nowInSeconds();
         List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
 
-        Queue<InetAddress> requests = new LinkedList<>(endpoints);
-        InetAddress address = requests.poll();
+        Queue<InetAddressAndPort> requests = new LinkedList<>(endpoints);
+        InetAddressAndPort address = requests.poll();
         ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, previewKind);
         logger.info("Validating {}", address);
         session.waitForValidation(Pair.create(desc, address), firstTask);
@@ -294,7 +294,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         ValidationTask currentTask = firstTask;
         while (requests.size() > 0)
         {
-            final InetAddress nextAddress = requests.poll();
+            final InetAddressAndPort nextAddress = requests.poll();
             final ValidationTask nextTask = new ValidationTask(desc, nextAddress, nowInSec, previewKind);
             tasks.add(nextTask);
             Futures.addCallback(currentTask, new FutureCallback<TreeResponse>()
@@ -319,7 +319,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
     /**
      * Creates {@link ValidationTask} and submit them to task executor so that tasks run sequentially within each dc.
      */
-    private ListenableFuture<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddress> endpoints)
+    private ListenableFuture<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddressAndPort> endpoints)
     {
         String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints);
         logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
@@ -327,11 +327,11 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         int nowInSec = FBUtilities.nowInSeconds();
         List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
 
-        Map<String, Queue<InetAddress>> requestsByDatacenter = new HashMap<>();
-        for (InetAddress endpoint : endpoints)
+        Map<String, Queue<InetAddressAndPort>> requestsByDatacenter = new HashMap<>();
+        for (InetAddressAndPort endpoint : endpoints)
         {
             String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint);
-            Queue<InetAddress> queue = requestsByDatacenter.get(dc);
+            Queue<InetAddressAndPort> queue = requestsByDatacenter.get(dc);
             if (queue == null)
             {
                 queue = new LinkedList<>();
@@ -340,10 +340,10 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
             queue.add(endpoint);
         }
 
-        for (Map.Entry<String, Queue<InetAddress>> entry : requestsByDatacenter.entrySet())
+        for (Map.Entry<String, Queue<InetAddressAndPort>> entry : requestsByDatacenter.entrySet())
         {
-            Queue<InetAddress> requests = entry.getValue();
-            InetAddress address = requests.poll();
+            Queue<InetAddressAndPort> requests = entry.getValue();
+            InetAddressAndPort address = requests.poll();
             ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, previewKind);
             logger.info("Validating {}", address);
             session.waitForValidation(Pair.create(desc, address), firstTask);
@@ -351,7 +351,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
             ValidationTask currentTask = firstTask;
             while (requests.size() > 0)
             {
-                final InetAddress nextAddress = requests.poll();
+                final InetAddressAndPort nextAddress = requests.poll();
                 final ValidationTask nextTask = new ValidationTask(desc, nextAddress, nowInSec, previewKind);
                 tasks.add(nextTask);
                 Futures.addCallback(currentTask, new FutureCallback<TreeResponse>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index c26d4d1..4c0a564 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.*;
 
 import com.google.common.base.Predicate;
@@ -28,10 +27,12 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
 import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -225,11 +226,11 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
         }
     }
 
-    private void logErrorAndSendFailureResponse(String errorMessage, InetAddress to, int id)
+    private void logErrorAndSendFailureResponse(String errorMessage, InetAddressAndPort to, int id)
     {
         logger.error(errorMessage);
         MessageOut reply = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
-                               .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
+                               .withParameter(ParameterType.FAILURE_RESPONSE, MessagingService.ONE_BYTE);
         MessagingService.instance().sendReply(reply, id, to);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 1c9778b..5121874 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -60,6 +60,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.repair.consistent.CoordinatorSession;
 import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.QueryState;
@@ -141,10 +142,10 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
     @VisibleForTesting
     static class CommonRange
     {
-        public final Set<InetAddress> endpoints;
+        public final Set<InetAddressAndPort> endpoints;
         public final Collection<Range<Token>> ranges;
 
-        public CommonRange(Set<InetAddress> endpoints, Collection<Range<Token>> ranges)
+        public CommonRange(Set<InetAddressAndPort> endpoints, Collection<Range<Token>> ranges)
         {
             Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty());
             Preconditions.checkArgument(ranges != null && !ranges.isEmpty());
@@ -232,7 +233,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
             traceState = null;
         }
 
-        final Set<InetAddress> allNeighbors = new HashSet<>();
+        final Set<InetAddressAndPort> allNeighbors = new HashSet<>();
         List<CommonRange> commonRanges = new ArrayList<>();
 
         //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent
@@ -243,9 +244,9 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         {
             for (Range<Token> range : options.getRanges())
             {
-                Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
-                                                                              options.getDataCenters(),
-                                                                              options.getHosts());
+                Set<InetAddressAndPort> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
+                                                                                     options.getDataCenters(),
+                                                                                     options.getHosts());
 
                 addRangeToNeighbors(commonRanges, range, neighbors);
                 allNeighbors.addAll(neighbors);
@@ -286,7 +287,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
 
         try (Timer.Context ctx = Keyspace.open(keyspace).metric.repairPrepareTime.time())
         {
-            ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores);
+            ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, columnFamilyStores);
             progress.incrementAndGet();
         }
         catch (Throwable t)
@@ -362,7 +363,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
      * removes dead nodes from common ranges, and exludes ranges left without any participants
      */
     @VisibleForTesting
-    static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddress> liveEndpoints, boolean force)
+    static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddressAndPort> liveEndpoints, boolean force)
     {
         if (!force)
         {
@@ -374,7 +375,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
 
             for (CommonRange commonRange: commonRanges)
             {
-                Set<InetAddress> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
+                Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
 
                 // this node is implicitly a participant in this repair, so a single endpoint is ok here
                 if (!endpoints.isEmpty())
@@ -391,15 +392,15 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                                    long startTime,
                                    boolean forceRepair,
                                    TraceState traceState,
-                                   Set<InetAddress> allNeighbors,
+                                   Set<InetAddressAndPort> allNeighbors,
                                    List<CommonRange> commonRanges,
                                    String... cfnames)
     {
         // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
-        Predicate<InetAddress> isAlive = FailureDetector.instance::isAlive;
-        Set<InetAddress> allParticipants = ImmutableSet.<InetAddress>builder()
+        Predicate<InetAddressAndPort> isAlive = FailureDetector.instance::isAlive;
+        Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
                                            .addAll(forceRepair ? Iterables.filter(allNeighbors, isAlive) : allNeighbors)
-                                           .add(FBUtilities.getBroadcastAddress())
+                                           .add(FBUtilities.getBroadcastAddressAndPort())
                                            .build();
 
         List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair);
@@ -673,7 +674,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                                                ImmutableList.of(failureMessage, completionMessage));
     }
 
-    private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors)
+    private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, Set<InetAddressAndPort> neighbors)
     {
         for (int i = 0; i < neighborRangeList.size(); i++)
         {
@@ -708,7 +709,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                 SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
 
                 ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
-                InetAddress source = FBUtilities.getBroadcastAddress();
+                InetAddressAndPort source = FBUtilities.getBroadcastAddressAndPort();
 
                 HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() };
                 int si = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 609ec56..91d767d 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.repair;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -34,6 +33,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
 import org.apache.cassandra.tracing.Tracing;
@@ -95,14 +95,14 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
 
     /** Range to repair */
     public final Collection<Range<Token>> ranges;
-    public final Set<InetAddress> endpoints;
+    public final Set<InetAddressAndPort> endpoints;
     public final boolean isIncremental;
     public final PreviewKind previewKind;
 
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
     // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
-    private final ConcurrentMap<Pair<RepairJobDesc, InetAddress>, ValidationTask> validating = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Pair<RepairJobDesc, InetAddressAndPort>, ValidationTask> validating = new ConcurrentHashMap<>();
     // Remote syncing jobs wait response in syncingTasks map
     private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
 
@@ -130,7 +130,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
                          Collection<Range<Token>> ranges,
                          String keyspace,
                          RepairParallelism parallelismDegree,
-                         Set<InetAddress> endpoints,
+                         Set<InetAddressAndPort> endpoints,
                          boolean isIncremental,
                          boolean pullRepair,
                          boolean force,
@@ -152,8 +152,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         if (force)
         {
             logger.debug("force flag set, removing dead endpoints");
-            final Set<InetAddress> removeCandidates = new HashSet<>();
-            for (final InetAddress endpoint : endpoints)
+            final Set<InetAddressAndPort> removeCandidates = new HashSet<>();
+            for (final InetAddressAndPort endpoint : endpoints)
             {
                 if (!FailureDetector.instance.isAlive(endpoint))
                 {
@@ -189,7 +189,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         return ranges;
     }
 
-    public void waitForValidation(Pair<RepairJobDesc, InetAddress> key, ValidationTask task)
+    public void waitForValidation(Pair<RepairJobDesc, InetAddressAndPort> key, ValidationTask task)
     {
         validating.put(key, task);
     }
@@ -207,7 +207,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
      * @param endpoint endpoint that sent merkle tree
      * @param trees calculated merkle trees, or null if validation failed
      */
-    public void validationComplete(RepairJobDesc desc, InetAddress endpoint, MerkleTrees trees)
+    public void validationComplete(RepairJobDesc desc, InetAddressAndPort endpoint, MerkleTrees trees)
     {
         ValidationTask task = validating.remove(Pair.create(desc, endpoint));
         if (task == null)
@@ -245,8 +245,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     private String repairedNodes()
     {
         StringBuilder sb = new StringBuilder();
-        sb.append(FBUtilities.getBroadcastAddress());
-        for (InetAddress ep : endpoints)
+        sb.append(FBUtilities.getBroadcastAddressAndPort());
+        for (InetAddressAndPort ep : endpoints)
             sb.append(", ").append(ep);
         return sb.toString();
     }
@@ -285,7 +285,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         }
 
         // Checking all nodes are live
-        for (InetAddress endpoint : endpoints)
+        for (InetAddressAndPort endpoint : endpoints)
         {
             if (!FailureDetector.instance.isAlive(endpoint) && !skippedReplicas)
             {
@@ -353,23 +353,23 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         terminate();
     }
 
-    public void onJoin(InetAddress endpoint, EndpointState epState) {}
-    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
-    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
-    public void onAlive(InetAddress endpoint, EndpointState state) {}
-    public void onDead(InetAddress endpoint, EndpointState state) {}
+    public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
+    public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+    public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
+    public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
+    public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
 
-    public void onRemove(InetAddress endpoint)
+    public void onRemove(InetAddressAndPort endpoint)
     {
         convict(endpoint, Double.MAX_VALUE);
     }
 
-    public void onRestart(InetAddress endpoint, EndpointState epState)
+    public void onRestart(InetAddressAndPort endpoint, EndpointState epState)
     {
         convict(endpoint, Double.MAX_VALUE);
     }
 
-    public void convict(InetAddress endpoint, double phi)
+    public void convict(InetAddressAndPort endpoint, double phi)
     {
         if (!endpoints.contains(endpoint))
             return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index 2b267a7..acc5186 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -17,13 +17,13 @@
  */
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.AbstractFuture;
 
 import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
@@ -32,12 +32,12 @@ import org.apache.cassandra.repair.messages.SnapshotMessage;
 /**
  * SnapshotTask is a task that sends snapshot request.
  */
-public class SnapshotTask extends AbstractFuture<InetAddress> implements RunnableFuture<InetAddress>
+public class SnapshotTask extends AbstractFuture<InetAddressAndPort> implements RunnableFuture<InetAddressAndPort>
 {
     private final RepairJobDesc desc;
-    private final InetAddress endpoint;
+    private final InetAddressAndPort endpoint;
 
-    public SnapshotTask(RepairJobDesc desc, InetAddress endpoint)
+    public SnapshotTask(RepairJobDesc desc, InetAddressAndPort endpoint)
     {
         this.desc = desc;
         this.endpoint = endpoint;
@@ -74,7 +74,7 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
 
         public boolean isLatencyForSnitch() { return false; }
 
-        public void onFailure(InetAddress from, RequestFailureReason failureReason)
+        public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
         {
             //listener.failedSnapshot();
             task.setException(new RuntimeException("Could not create snapshot at " + from));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index a1b7459..59fee0b 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.util.UUID;
 import java.util.Collections;
 import java.util.Collection;
@@ -29,6 +28,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncComplete;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -48,14 +49,14 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
 
     private final RepairJobDesc desc;
     private final boolean asymmetric;
-    private final InetAddress initiator;
-    private final InetAddress src;
-    private final InetAddress dst;
+    private final InetAddressAndPort initiator;
+    private final InetAddressAndPort src;
+    private final InetAddressAndPort dst;
     private final Collection<Range<Token>> ranges;
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
-    public StreamingRepairTask(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges,  UUID pendingRepair, PreviewKind previewKind, boolean asymmetric)
+    public StreamingRepairTask(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort src, InetAddressAndPort dst, Collection<Range<Token>> ranges,  UUID pendingRepair, PreviewKind previewKind, boolean asymmetric)
     {
         this.desc = desc;
         this.initiator = initiator;
@@ -69,14 +70,14 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
 
     public void run()
     {
-        InetAddress dest = dst;
-        InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
+        InetAddressAndPort dest = dst;
+        InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(dest);
         logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, ranges.size(), dst);
         createStreamPlan(dest, preferred).execute();
     }
 
     @VisibleForTesting
-    StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred)
+    StreamPlan createStreamPlan(InetAddressAndPort dest, InetAddressAndPort preferred)
     {
         StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind)
                .listeners(this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index 3770621..b46ae5e 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.repair;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -45,6 +44,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -74,48 +74,50 @@ public final class SystemDistributedKeyspace
 
     private static final TableMetadata RepairHistory =
         parse(REPAIR_HISTORY,
-              "Repair history",
-              "CREATE TABLE %s ("
-              + "keyspace_name text,"
-              + "columnfamily_name text,"
-              + "id timeuuid,"
-              + "parent_id timeuuid,"
-              + "range_begin text,"
-              + "range_end text,"
-              + "coordinator inet,"
-              + "participants set<inet>,"
-              + "exception_message text,"
-              + "exception_stacktrace text,"
-              + "status text,"
-              + "started_at timestamp,"
-              + "finished_at timestamp,"
-              + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))");
+                "Repair history",
+                "CREATE TABLE %s ("
+                     + "keyspace_name text,"
+                     + "columnfamily_name text,"
+                     + "id timeuuid,"
+                     + "parent_id timeuuid,"
+                     + "range_begin text,"
+                     + "range_end text,"
+                     + "coordinator inet,"
+                     + "coordinator_port int,"
+                     + "participants set<inet>,"
+                     + "participants_v2 set<text>,"
+                     + "exception_message text,"
+                     + "exception_stacktrace text,"
+                     + "status text,"
+                     + "started_at timestamp,"
+                     + "finished_at timestamp,"
+                     + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))");
 
     private static final TableMetadata ParentRepairHistory =
         parse(PARENT_REPAIR_HISTORY,
-              "Repair history",
-              "CREATE TABLE %s ("
-              + "parent_id timeuuid,"
-              + "keyspace_name text,"
-              + "columnfamily_names set<text>,"
-              + "started_at timestamp,"
-              + "finished_at timestamp,"
-              + "exception_message text,"
-              + "exception_stacktrace text,"
-              + "requested_ranges set<text>,"
-              + "successful_ranges set<text>,"
-              + "options map<text, text>,"
-              + "PRIMARY KEY (parent_id))");
+                "Repair history",
+                "CREATE TABLE %s ("
+                     + "parent_id timeuuid,"
+                     + "keyspace_name text,"
+                     + "columnfamily_names set<text>,"
+                     + "started_at timestamp,"
+                     + "finished_at timestamp,"
+                     + "exception_message text,"
+                     + "exception_stacktrace text,"
+                     + "requested_ranges set<text>,"
+                     + "successful_ranges set<text>,"
+                     + "options map<text, text>,"
+                     + "PRIMARY KEY (parent_id))");
 
     private static final TableMetadata ViewBuildStatus =
         parse(VIEW_BUILD_STATUS,
-              "Materialized View build status",
-              "CREATE TABLE %s ("
-              + "keyspace_name text,"
-              + "view_name text,"
-              + "host_id uuid,"
-              + "status text,"
-              + "PRIMARY KEY ((keyspace_name, view_name), host_id))");
+            "Materialized View build status",
+            "CREATE TABLE %s ("
+                     + "keyspace_name text,"
+                     + "view_name text,"
+                     + "host_id uuid,"
+                     + "status text,"
+                     + "PRIMARY KEY ((keyspace_name, view_name), host_id))");
 
     private static TableMetadata parse(String table, String description, String cql)
     {
@@ -184,17 +186,21 @@ public final class SystemDistributedKeyspace
         processSilent(fmtQuery);
     }
 
-    public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddress> endpoints)
+    public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddressAndPort> endpoints)
     {
-        String coordinator = FBUtilities.getBroadcastAddress().getHostAddress();
-        Set<String> participants = Sets.newHashSet(coordinator);
+        InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort();
+        Set<String> participants = Sets.newHashSet();
+        Set<String> participants_v2 = Sets.newHashSet();
 
-        for (InetAddress endpoint : endpoints)
-            participants.add(endpoint.getHostAddress());
+        for (InetAddressAndPort endpoint : endpoints)
+        {
+            participants.add(endpoint.getHostAddress(false));
+            participants_v2.add(endpoint.toString());
+        }
 
         String query =
-                "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, participants, status, started_at) " +
-                        "VALUES (   '%s',          '%s',              %s, %s,        '%s',        '%s',      '%s',        { '%s' },     '%s',   toTimestamp(now()))";
+                "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, coordinator_port, participants, participants_v2, status, started_at) " +
+                        "VALUES (   '%s',          '%s',              %s, %s,        '%s',        '%s',      '%s',        %d,               { '%s' },     { '%s' },        '%s',   toTimestamp(now()))";
 
         for (String cfname : cfnames)
         {
@@ -207,8 +213,10 @@ public final class SystemDistributedKeyspace
                                               parent_id.toString(),
                                               range.left.toString(),
                                               range.right.toString(),
-                                              coordinator,
+                                              coordinator.getHostAddress(false),
+                                              coordinator.port,
                                               Joiner.on("', '").join(participants),
+                                              Joiner.on("', '").join(participants_v2),
                                               RepairState.STARTED.toString());
                 processSilent(fmtQry);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/TreeResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/TreeResponse.java b/src/java/org/apache/cassandra/repair/TreeResponse.java
index c898b36..8571caa 100644
--- a/src/java/org/apache/cassandra/repair/TreeResponse.java
+++ b/src/java/org/apache/cassandra/repair/TreeResponse.java
@@ -17,8 +17,7 @@
  */
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
-
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.MerkleTrees;
 
 /**
@@ -26,10 +25,10 @@ import org.apache.cassandra.utils.MerkleTrees;
  */
 public class TreeResponse
 {
-    public final InetAddress endpoint;
+    public final InetAddressAndPort endpoint;
     public final MerkleTrees trees;
 
-    public TreeResponse(InetAddress endpoint, MerkleTrees trees)
+    public TreeResponse(InetAddressAndPort endpoint, MerkleTrees trees)
     {
         this.endpoint = endpoint;
         this.trees = trees;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/ValidationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java
index 175709f..fc500cf 100644
--- a/src/java/org/apache/cassandra/repair/ValidationTask.java
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@ -17,11 +17,10 @@
  */
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
-
 import com.google.common.util.concurrent.AbstractFuture;
 
 import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationRequest;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -34,11 +33,11 @@ import org.apache.cassandra.utils.MerkleTrees;
 public class ValidationTask extends AbstractFuture<TreeResponse> implements Runnable
 {
     private final RepairJobDesc desc;
-    private final InetAddress endpoint;
+    private final InetAddressAndPort endpoint;
     private final int nowInSec;
     private final PreviewKind previewKind;
 
-    public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int nowInSec, PreviewKind previewKind)
+    public ValidationTask(RepairJobDesc desc, InetAddressAndPort endpoint, int nowInSec, PreviewKind previewKind)
     {
         this.desc = desc;
         this.endpoint = endpoint;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 9803638..4c2856d 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair;
 
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -41,6 +40,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -64,7 +64,7 @@ public class Validator implements Runnable
     private static final Logger logger = LoggerFactory.getLogger(Validator.class);
 
     public final RepairJobDesc desc;
-    public final InetAddress initiator;
+    public final InetAddressAndPort initiator;
     public final int nowInSec;
     private final boolean evenTreeDistribution;
     public final boolean isIncremental;
@@ -81,17 +81,17 @@ public class Validator implements Runnable
 
     private final PreviewKind previewKind;
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, PreviewKind previewKind)
+    public Validator(RepairJobDesc desc, InetAddressAndPort initiator, int nowInSec, PreviewKind previewKind)
     {
         this(desc, initiator, nowInSec, false, false, previewKind);
     }
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean isIncremental, PreviewKind previewKind)
+    public Validator(RepairJobDesc desc, InetAddressAndPort initiator, int nowInSec, boolean isIncremental, PreviewKind previewKind)
     {
         this(desc, initiator, nowInSec, false, isIncremental, previewKind);
     }
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind)
+    public Validator(RepairJobDesc desc, InetAddressAndPort initiator, int nowInSec, boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind)
     {
         this.desc = desc;
         this.initiator = initiator;
@@ -352,7 +352,7 @@ public class Validator implements Runnable
     public void run()
     {
         // respond to the request that triggered this validation
-        if (!initiator.equals(FBUtilities.getBroadcastAddress()))
+        if (!initiator.equals(FBUtilities.getBroadcastAddressAndPort()))
         {
             logger.info("{} Sending completed merkle tree to {} for {}.{}", previewKind.logPrefix(desc.sessionId), initiator, desc.keyspace, desc.columnFamily);
             Tracing.traceRepair("Sending completed merkle tree to {} for {}.{}", initiator, desc.keyspace, desc.columnFamily);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
index eb99977..c9b7ed7 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.net.InetAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.TreeResponse;
 import org.apache.cassandra.utils.MerkleTrees;
 
@@ -36,11 +36,11 @@ import org.apache.cassandra.utils.MerkleTrees;
  */
 public class DifferenceHolder
 {
-    private final ImmutableMap<InetAddress, HostDifferences> differences;
+    private final ImmutableMap<InetAddressAndPort, HostDifferences> differences;
 
     public DifferenceHolder(List<TreeResponse> trees)
     {
-        ImmutableMap.Builder<InetAddress, HostDifferences> diffBuilder = ImmutableMap.builder();
+        ImmutableMap.Builder<InetAddressAndPort, HostDifferences> diffBuilder = ImmutableMap.builder();
         for (int i = 0; i < trees.size() - 1; ++i)
         {
             TreeResponse r1 = trees.get(i);
@@ -58,9 +58,9 @@ public class DifferenceHolder
     }
 
     @VisibleForTesting
-    DifferenceHolder(Map<InetAddress, HostDifferences> differences)
+    DifferenceHolder(Map<InetAddressAndPort, HostDifferences> differences)
     {
-        ImmutableMap.Builder<InetAddress, HostDifferences> diffBuilder = ImmutableMap.builder();
+        ImmutableMap.Builder<InetAddressAndPort, HostDifferences> diffBuilder = ImmutableMap.builder();
         diffBuilder.putAll(differences);
         this.differences = diffBuilder.build();
     }
@@ -68,12 +68,12 @@ public class DifferenceHolder
     /**
      * differences only holds one 'side' of the difference - if A and B mismatch, only A will be a key in the map
      */
-    public Set<InetAddress> keyHosts()
+    public Set<InetAddressAndPort> keyHosts()
     {
         return differences.keySet();
     }
 
-    public HostDifferences get(InetAddress hostWithDifference)
+    public HostDifferences get(InetAddressAndPort hostWithDifference)
     {
         return differences.get(hostWithDifference);
     }
@@ -85,7 +85,7 @@ public class DifferenceHolder
                '}';
     }
 
-    public boolean hasDifferenceBetween(InetAddress node1, InetAddress node2, Range<Token> range)
+    public boolean hasDifferenceBetween(InetAddressAndPort node1, InetAddressAndPort node2, Range<Token> range)
     {
         HostDifferences diffsNode1 = differences.get(node1);
         if (diffsNode1 != null && diffsNode1.hasDifferencesFor(node2, range))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
index 6cbe987..ab294b9 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -28,23 +27,24 @@ import java.util.Set;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * Tracks the differences for a single host
  */
 public class HostDifferences
 {
-    private final Map<InetAddress, List<Range<Token>>> perHostDifferences = new HashMap<>();
+    private final Map<InetAddressAndPort, List<Range<Token>>> perHostDifferences = new HashMap<>();
 
     /**
      * Adds a set of differences between the node this instance is tracking and endpoint
      */
-    public void add(InetAddress endpoint, List<Range<Token>> difference)
+    public void add(InetAddressAndPort endpoint, List<Range<Token>> difference)
     {
         perHostDifferences.put(endpoint, difference);
     }
 
-    public void addSingleRange(InetAddress remoteNode, Range<Token> rangeToFetch)
+    public void addSingleRange(InetAddressAndPort remoteNode, Range<Token> rangeToFetch)
     {
         perHostDifferences.computeIfAbsent(remoteNode, (x) -> new ArrayList<>()).add(rangeToFetch);
     }
@@ -52,7 +52,7 @@ public class HostDifferences
     /**
      * Does this instance have differences for range with node2?
      */
-    public boolean hasDifferencesFor(InetAddress node2, Range<Token> range)
+    public boolean hasDifferencesFor(InetAddressAndPort node2, Range<Token> range)
     {
         List<Range<Token>> differences = get(node2);
         for (Range<Token> diff : differences)
@@ -64,12 +64,12 @@ public class HostDifferences
         return false;
     }
 
-    public Set<InetAddress> hosts()
+    public Set<InetAddressAndPort> hosts()
     {
         return perHostDifferences.keySet();
     }
 
-    public List<Range<Token>> get(InetAddress differingHost)
+    public List<Range<Token>> get(InetAddressAndPort differingHost)
     {
         return perHostDifferences.getOrDefault(differingHost, Collections.emptyList());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
index b41ddd8..450336f 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * Tracks incoming streams for a single host
@@ -60,7 +60,7 @@ public class IncomingRepairStreamTracker
      * @param range the range we need to stream from streamFromNode
      * @param streamFromNode the node we should stream from
      */
-    public void addIncomingRangeFrom(Range<Token> range, InetAddress streamFromNode)
+    public void addIncomingRangeFrom(Range<Token> range, InetAddressAndPort streamFromNode)
     {
         logger.trace("adding incoming range {} from {}", range, streamFromNode);
         Set<Range<Token>> newInput = RangeDenormalizer.denormalize(range, incoming);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java b/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
index 90788dc..e8ca85d 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
@@ -18,10 +18,11 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.net.InetAddress;
 import java.util.Set;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
+
 public interface PreferedNodeFilter
 {
-    public Set<InetAddress> apply(InetAddress streamingNode, Set<InetAddress> toStream);
+    public Set<InetAddressAndPort> apply(InetAddressAndPort streamingNode, Set<InetAddressAndPort> toStream);
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org