You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:09 UTC

[12/19] cassandra git commit: Allow storage port to be configurable per node

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
index ce05e93..c7d45bf 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,6 +30,7 @@ import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * Basic idea is that we track incoming ranges instead of blindly just exchanging the ranges that mismatch between two nodes
@@ -47,19 +47,19 @@ public class ReduceHelper
     /**
      * Reduces the differences provided by the merkle trees to a minimum set of differences
      */
-    public static ImmutableMap<InetAddress, HostDifferences> reduce(DifferenceHolder differences, PreferedNodeFilter filter)
+    public static ImmutableMap<InetAddressAndPort, HostDifferences> reduce(DifferenceHolder differences, PreferedNodeFilter filter)
     {
-        Map<InetAddress, IncomingRepairStreamTracker> trackers = createIncomingRepairStreamTrackers(differences);
-        Map<InetAddress, Integer> outgoingStreamCounts = new HashMap<>();
-        ImmutableMap.Builder<InetAddress, HostDifferences> mapBuilder = ImmutableMap.builder();
-        for (Map.Entry<InetAddress, IncomingRepairStreamTracker> trackerEntry : trackers.entrySet())
+        Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers = createIncomingRepairStreamTrackers(differences);
+        Map<InetAddressAndPort, Integer> outgoingStreamCounts = new HashMap<>();
+        ImmutableMap.Builder<InetAddressAndPort, HostDifferences> mapBuilder = ImmutableMap.builder();
+        for (Map.Entry<InetAddressAndPort, IncomingRepairStreamTracker> trackerEntry : trackers.entrySet())
         {
             IncomingRepairStreamTracker tracker = trackerEntry.getValue();
             HostDifferences rangesToFetch = new HostDifferences();
             for (Map.Entry<Range<Token>, StreamFromOptions> entry : tracker.getIncoming().entrySet())
             {
                 Range<Token> rangeToFetch = entry.getKey();
-                for (InetAddress remoteNode : pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), outgoingStreamCounts, filter))
+                for (InetAddressAndPort remoteNode : pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), outgoingStreamCounts, filter))
                     rangesToFetch.addSingleRange(remoteNode, rangeToFetch);
             }
             mapBuilder.put(trackerEntry.getKey(), rangesToFetch);
@@ -69,14 +69,14 @@ public class ReduceHelper
     }
 
     @VisibleForTesting
-    static Map<InetAddress, IncomingRepairStreamTracker> createIncomingRepairStreamTrackers(DifferenceHolder differences)
+    static Map<InetAddressAndPort, IncomingRepairStreamTracker> createIncomingRepairStreamTrackers(DifferenceHolder differences)
     {
-        Map<InetAddress, IncomingRepairStreamTracker> trackers = new HashMap<>();
+        Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers = new HashMap<>();
 
-        for (InetAddress hostWithDifference : differences.keyHosts())
+        for (InetAddressAndPort hostWithDifference : differences.keyHosts())
         {
             HostDifferences hostDifferences = differences.get(hostWithDifference);
-            for (InetAddress differingHost : hostDifferences.hosts())
+            for (InetAddressAndPort differingHost : hostDifferences.hosts())
             {
                 List<Range<Token>> differingRanges = hostDifferences.get(differingHost);
                 // hostWithDifference has mismatching ranges differingRanges with differingHost:
@@ -93,24 +93,24 @@ public class ReduceHelper
     }
 
     private static IncomingRepairStreamTracker getTracker(DifferenceHolder differences,
-                                                          Map<InetAddress, IncomingRepairStreamTracker> trackers,
-                                                          InetAddress host)
+                                                          Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers,
+                                                          InetAddressAndPort host)
     {
         return trackers.computeIfAbsent(host, (h) -> new IncomingRepairStreamTracker(differences));
     }
 
     // greedily pick the nodes doing the least amount of streaming
-    private static Collection<InetAddress> pickLeastStreaming(InetAddress streamingNode,
+    private static Collection<InetAddressAndPort> pickLeastStreaming(InetAddressAndPort streamingNode,
                                                               StreamFromOptions toStreamFrom,
-                                                              Map<InetAddress, Integer> outgoingStreamCounts,
+                                                              Map<InetAddressAndPort, Integer> outgoingStreamCounts,
                                                               PreferedNodeFilter filter)
     {
-        Set<InetAddress> retSet = new HashSet<>();
-        for (Set<InetAddress> toStream : toStreamFrom.allStreams())
+        Set<InetAddressAndPort> retSet = new HashSet<>();
+        for (Set<InetAddressAndPort> toStream : toStreamFrom.allStreams())
         {
-            InetAddress candidate = null;
-            Set<InetAddress> prefered = filter.apply(streamingNode, toStream);
-            for (InetAddress node : prefered)
+            InetAddressAndPort candidate = null;
+            Set<InetAddressAndPort> prefered = filter.apply(streamingNode, toStream);
+            for (InetAddressAndPort node : prefered)
             {
                 if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0))
                 {
@@ -120,7 +120,7 @@ public class ReduceHelper
             // ok, found no prefered hosts, try all of them
             if (candidate == null)
             {
-                for (InetAddress node : toStream)
+                for (InetAddressAndPort node : toStream)
                 {
                     if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0))
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
index 4516f23..6070983 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.asymmetric;
 
-import java.net.InetAddress;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -28,6 +27,7 @@ import com.google.common.collect.Sets;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
 
 /**
  * Keeps track of where a node needs to stream a given range from.
@@ -53,18 +53,18 @@ public class StreamFromOptions
     /**
      * Contains the hosts to stream from - if two nodes are in the same inner set, they are identical for the range we are handling
      */
-    private final Set<Set<InetAddress>> streamOptions = new HashSet<>();
+    private final Set<Set<InetAddressAndPort>> streamOptions = new HashSet<>();
 
     public StreamFromOptions(DifferenceHolder differences, Range<Token> range)
     {
         this(differences, range, Collections.emptySet());
     }
 
-    private StreamFromOptions(DifferenceHolder differences, Range<Token> range, Set<Set<InetAddress>> existing)
+    private StreamFromOptions(DifferenceHolder differences, Range<Token> range, Set<Set<InetAddressAndPort>> existing)
     {
         this.differences = differences;
         this.range = range;
-        for (Set<InetAddress> addresses : existing)
+        for (Set<InetAddressAndPort> addresses : existing)
             this.streamOptions.add(Sets.newHashSet(addresses));
     }
 
@@ -75,11 +75,11 @@ public class StreamFromOptions
      * range we are tracking, then just add it to the set with the identical remote nodes. Otherwise create a new group
      * of nodes containing this new node.
      */
-    public void add(InetAddress streamFromNode)
+    public void add(InetAddressAndPort streamFromNode)
     {
-        for (Set<InetAddress> options : streamOptions)
+        for (Set<InetAddressAndPort> options : streamOptions)
         {
-            InetAddress first = options.iterator().next();
+            InetAddressAndPort first = options.iterator().next();
             if (!differences.hasDifferenceBetween(first, streamFromNode, range))
             {
                 options.add(streamFromNode);
@@ -94,7 +94,7 @@ public class StreamFromOptions
         return new StreamFromOptions(differences, withRange, streamOptions);
     }
 
-    public Iterable<Set<InetAddress>> allStreams()
+    public Iterable<Set<InetAddressAndPort>> allStreams()
     {
         return streamOptions;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index c137346..78057e2 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Iterables;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.messages.FailSession;
 import org.apache.cassandra.repair.messages.FinalizeCommit;
 import org.apache.cassandra.repair.messages.FinalizePromise;
@@ -55,7 +56,7 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
  * There are 4 stages to a consistent incremental repair.
  *
  * <h1>Repair prepare</h1>
- *  First, the normal {@link ActiveRepairService#prepareForRepair(UUID, InetAddress, Set, RepairOption, List)} stuff
+ *  First, the normal {@link ActiveRepairService#prepareForRepair(UUID, InetAddressAndPort, Set, RepairOption, List)} stuff
  *  happens, which sends out {@link PrepareMessage} and creates a {@link ActiveRepairService.ParentRepairSession}
  *  on the coordinator and each of the neighbors.
  *
@@ -68,7 +69,7 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
  *  coordinator indicating success or failure. If the pending anti-compaction fails, the local session state is set
  *  to {@code FAILED}.
  *  <p/>
- *  (see {@link LocalSessions#handlePrepareMessage(InetAddress, PrepareConsistentRequest)}
+ *  (see {@link LocalSessions#handlePrepareMessage(InetAddressAndPort, PrepareConsistentRequest)}
  *  <p/>
  *  Once the coordinator recieves positive {@code PrepareConsistentResponse} messages from all the participants, the
  *  coordinator begins the normal repair process.
@@ -99,8 +100,8 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
  *  & {@link CoordinatorSession#finalizeCommit()}
  *  <p/>
  *
- *  On the local session side, see {@link LocalSessions#handleFinalizeProposeMessage(InetAddress, FinalizePropose)}
- *  & {@link LocalSessions#handleFinalizeCommitMessage(InetAddress, FinalizeCommit)}
+ *  On the local session side, see {@link LocalSessions#handleFinalizeProposeMessage(InetAddressAndPort, FinalizePropose)}
+ *  & {@link LocalSessions#handleFinalizeCommitMessage(InetAddressAndPort, FinalizeCommit)}
  *
  * <h1>Failure</h1>
  *  If there are any failures or problems during the process above, the session will be failed. When a session is failed,
@@ -187,11 +188,11 @@ public abstract class ConsistentSession
 
     private volatile State state;
     public final UUID sessionID;
-    public final InetAddress coordinator;
+    public final InetAddressAndPort coordinator;
     public final ImmutableSet<TableId> tableIds;
     public final long repairedAt;
     public final ImmutableSet<Range<Token>> ranges;
-    public final ImmutableSet<InetAddress> participants;
+    public final ImmutableSet<InetAddressAndPort> participants;
 
     ConsistentSession(AbstractBuilder builder)
     {
@@ -260,11 +261,11 @@ public abstract class ConsistentSession
     {
         private State state;
         private UUID sessionID;
-        private InetAddress coordinator;
+        private InetAddressAndPort coordinator;
         private Set<TableId> ids;
         private long repairedAt;
         private Collection<Range<Token>> ranges;
-        private Set<InetAddress> participants;
+        private Set<InetAddressAndPort> participants;
 
         void withState(State state)
         {
@@ -276,7 +277,7 @@ public abstract class ConsistentSession
             this.sessionID = sessionID;
         }
 
-        void withCoordinator(InetAddress coordinator)
+        void withCoordinator(InetAddressAndPort coordinator)
         {
             this.coordinator = coordinator;
         }
@@ -301,7 +302,7 @@ public abstract class ConsistentSession
             this.ranges = ranges;
         }
 
-        void withParticipants(Set<InetAddress> peers)
+        void withParticipants(Set<InetAddressAndPort> peers)
         {
             this.participants = peers;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index ba0025f..f52a28d 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.consistent;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +38,7 @@ import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.RepairSessionResult;
@@ -58,7 +58,7 @@ public class CoordinatorSession extends ConsistentSession
 {
     private static final Logger logger = LoggerFactory.getLogger(CoordinatorSession.class);
 
-    private final Map<InetAddress, State> participantStates = new HashMap<>();
+    private final Map<InetAddressAndPort, State> participantStates = new HashMap<>();
     private final SettableFuture<Boolean> prepareFuture = SettableFuture.create();
     private final SettableFuture<Boolean> finalizeProposeFuture = SettableFuture.create();
 
@@ -69,7 +69,7 @@ public class CoordinatorSession extends ConsistentSession
     public CoordinatorSession(Builder builder)
     {
         super(builder);
-        for (InetAddress participant : participants)
+        for (InetAddressAndPort participant : participants)
         {
             participantStates.put(participant, State.PREPARING);
         }
@@ -95,7 +95,7 @@ public class CoordinatorSession extends ConsistentSession
         super.setState(state);
     }
 
-    public synchronized void setParticipantState(InetAddress participant, State state)
+    public synchronized void setParticipantState(InetAddressAndPort participant, State state)
     {
         logger.trace("Setting participant {} to state {} for repair {}", participant, state, sessionID);
         Preconditions.checkArgument(participantStates.containsKey(participant),
@@ -115,7 +115,7 @@ public class CoordinatorSession extends ConsistentSession
 
     synchronized void setAll(State state)
     {
-        for (InetAddress participant : participants)
+        for (InetAddressAndPort participant : participants)
         {
             setParticipantState(participant, state);
         }
@@ -131,7 +131,7 @@ public class CoordinatorSession extends ConsistentSession
         return getState() == State.FAILED || Iterables.any(participantStates.values(), v -> v == State.FAILED);
     }
 
-    protected void sendMessage(InetAddress destination, RepairMessage message)
+    protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
     {
         logger.trace("Sending {} to {}", message, destination);
         MessageOut<RepairMessage> messageOut = new MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, message, RepairMessage.serializer);
@@ -144,14 +144,14 @@ public class CoordinatorSession extends ConsistentSession
 
         logger.debug("Beginning prepare phase of incremental repair session {}", sessionID);
         PrepareConsistentRequest message = new PrepareConsistentRequest(sessionID, coordinator, participants);
-        for (final InetAddress participant : participants)
+        for (final InetAddressAndPort participant : participants)
         {
             sendMessage(participant, message);
         }
         return prepareFuture;
     }
 
-    public synchronized void handlePrepareResponse(InetAddress participant, boolean success)
+    public synchronized void handlePrepareResponse(InetAddressAndPort participant, boolean success)
     {
         if (getState() == State.FAILED)
         {
@@ -185,14 +185,14 @@ public class CoordinatorSession extends ConsistentSession
         Preconditions.checkArgument(allStates(State.REPAIRING));
         logger.debug("Proposing finalization of repair session {}", sessionID);
         FinalizePropose message = new FinalizePropose(sessionID);
-        for (final InetAddress participant : participants)
+        for (final InetAddressAndPort participant : participants)
         {
             sendMessage(participant, message);
         }
         return finalizeProposeFuture;
     }
 
-    public synchronized void handleFinalizePromise(InetAddress participant, boolean success)
+    public synchronized void handleFinalizePromise(InetAddressAndPort participant, boolean success)
     {
         if (getState() == State.FAILED)
         {
@@ -221,7 +221,7 @@ public class CoordinatorSession extends ConsistentSession
         Preconditions.checkArgument(allStates(State.FINALIZE_PROMISED));
         logger.debug("Committing finalization of repair session {}", sessionID);
         FinalizeCommit message = new FinalizeCommit(sessionID);
-        for (final InetAddress participant : participants)
+        for (final InetAddressAndPort participant : participants)
         {
             sendMessage(participant, message);
         }
@@ -233,7 +233,7 @@ public class CoordinatorSession extends ConsistentSession
     {
         logger.info("Incremental repair session {} failed", sessionID);
         FailSession message = new FailSession(sessionID);
-        for (final InetAddress participant : participants)
+        for (final InetAddressAndPort participant : participants)
         {
             if (participantStates.get(participant) != State.FAILED)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
index 211e0c1..bb84d0a 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.consistent;
 
-import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -26,6 +25,7 @@ import java.util.UUID;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.messages.FailSession;
 import org.apache.cassandra.repair.messages.FinalizePromise;
 import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
@@ -43,7 +43,7 @@ public class CoordinatorSessions
         return new CoordinatorSession(builder);
     }
 
-    public synchronized CoordinatorSession registerSession(UUID sessionId, Set<InetAddress> participants)
+    public synchronized CoordinatorSession registerSession(UUID sessionId, Set<InetAddressAndPort> participants)
     {
         Preconditions.checkArgument(!sessions.containsKey(sessionId), "A coordinator already exists for session %s", sessionId);
         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
index 903aeb5..98b883a 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
@@ -21,10 +21,12 @@ package org.apache.cassandra.repair.consistent;
 import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -40,6 +42,7 @@ public class LocalSessionInfo
     public static final String LAST_UPDATE = "LAST_UPDATE";
     public static final String COORDINATOR = "COORDINATOR";
     public static final String PARTICIPANTS = "PARTICIPANTS";
+    public static final String PARTICIPANTS_WP = "PARTICIPANTS_WP";
     public static final String TABLES = "TABLES";
 
 
@@ -59,7 +62,8 @@ public class LocalSessionInfo
         m.put(STARTED, Integer.toString(session.getStartedAt()));
         m.put(LAST_UPDATE, Integer.toString(session.getLastUpdate()));
         m.put(COORDINATOR, session.coordinator.toString());
-        m.put(PARTICIPANTS, Joiner.on(',').join(Iterables.transform(session.participants, InetAddress::toString)));
+        m.put(PARTICIPANTS, Joiner.on(',').join(Iterables.transform(session.participants.stream().map(peer -> peer.address).collect(Collectors.toList()), InetAddress::getHostAddress)));
+        m.put(PARTICIPANTS_WP, Joiner.on(',').join(Iterables.transform(session.participants, InetAddressAndPort::toString)));
         m.put(TABLES, Joiner.on(',').join(Iterables.transform(session.tableIds, LocalSessionInfo::tableString)));
 
         return m;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 4ef2c2c..e62f6fd 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair.consistent;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.Date;
@@ -33,6 +34,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -49,13 +51,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.InetAddressType;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
@@ -141,13 +148,13 @@ public class LocalSessions
     }
 
     @VisibleForTesting
-    protected InetAddress getBroadcastAddress()
+    protected InetAddressAndPort getBroadcastAddressAndPort()
     {
-        return FBUtilities.getBroadcastAddress();
+        return FBUtilities.getBroadcastAddressAndPort();
     }
 
     @VisibleForTesting
-    protected boolean isAlive(InetAddress address)
+    protected boolean isAlive(InetAddressAndPort address)
     {
         return FailureDetector.instance.isAlive(address);
     }
@@ -177,14 +184,14 @@ public class LocalSessions
         logger.info("Cancelling local repair session {}", sessionID);
         LocalSession session = getSession(sessionID);
         Preconditions.checkArgument(session != null, "Session {} does not exist", sessionID);
-        Preconditions.checkArgument(force || session.coordinator.equals(getBroadcastAddress()),
+        Preconditions.checkArgument(force || session.coordinator.equals(getBroadcastAddressAndPort()),
                                     "Cancel session %s from it's coordinator (%s) or use --force",
                                     sessionID, session.coordinator);
 
         setStateAndSave(session, FAILED);
-        for (InetAddress participant : session.participants)
+        for (InetAddressAndPort participant : session.participants)
         {
-            if (!participant.equals(getBroadcastAddress()))
+            if (!participant.equals(getBroadcastAddressAndPort()))
                 sendMessage(participant, new FailSession(sessionID));
         }
     }
@@ -335,10 +342,12 @@ public class LocalSessions
                        "repaired_at, " +
                        "state, " +
                        "coordinator, " +
+                       "coordinator_port, " +
                        "participants, " +
+                       "participants_wp," +
                        "ranges, " +
                        "cfids) " +
-                       "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+                       "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
         QueryProcessor.executeInternal(String.format(query, keyspace, table),
                                        session.sessionID,
@@ -346,8 +355,10 @@ public class LocalSessions
                                        Date.from(Instant.ofEpochSecond(session.getLastUpdate())),
                                        Date.from(Instant.ofEpochMilli(session.repairedAt)),
                                        session.getState().ordinal(),
-                                       session.coordinator,
-                                       session.participants,
+                                       session.coordinator.address,
+                                       session.coordinator.port,
+                                       session.participants.stream().map(participant -> participant.address).collect(Collectors.toSet()),
+                                       session.participants.stream().map(participant -> participant.toString()).collect(Collectors.toSet()),
                                        serializeRanges(session.ranges),
                                        tableIdToUuid(session.tableIds));
     }
@@ -362,12 +373,27 @@ public class LocalSessions
         LocalSession.Builder builder = LocalSession.builder();
         builder.withState(ConsistentSession.State.valueOf(row.getInt("state")));
         builder.withSessionID(row.getUUID("parent_id"));
-        builder.withCoordinator(row.getInetAddress("coordinator"));
+        InetAddressAndPort coordinator = InetAddressAndPort.getByAddressOverrideDefaults(
+            row.getInetAddress("coordinator"),
+            row.getInt("coordinator_port"));
+        builder.withCoordinator(coordinator);
         builder.withTableIds(uuidToTableId(row.getSet("cfids", UUIDType.instance)));
         builder.withRepairedAt(row.getTimestamp("repaired_at").getTime());
         builder.withRanges(deserializeRanges(row.getSet("ranges", BytesType.instance)));
-        builder.withParticipants(row.getSet("participants", InetAddressType.instance));
-
+        //There is no cross version streaming and thus no cross version repair so assume that
+        //any valid repair session has the participants_wp column and any that doesn't is malformed
+        Set<String> participants = row.getSet("participants_wp", UTF8Type.instance);
+        builder.withParticipants(participants.stream().map(participant ->
+                                                             {
+                                                                 try
+                                                                 {
+                                                                     return InetAddressAndPort.getByName(participant);
+                                                                 }
+                                                                 catch (UnknownHostException e)
+                                                                 {
+                                                                     throw new RuntimeException(e);
+                                                                 }
+                                                             }).collect(Collectors.toSet()));
         builder.withStartedAt(dateToSeconds(row.getTimestamp("started_at")));
         builder.withLastUpdate(dateToSeconds(row.getTimestamp("last_update")));
 
@@ -440,7 +466,7 @@ public class LocalSessions
     }
 
     @VisibleForTesting
-    LocalSession createSessionUnsafe(UUID sessionId, ActiveRepairService.ParentRepairSession prs, Set<InetAddress> peers)
+    LocalSession createSessionUnsafe(UUID sessionId, ActiveRepairService.ParentRepairSession prs, Set<InetAddressAndPort> peers)
     {
         LocalSession.Builder builder = LocalSession.builder();
         builder.withState(ConsistentSession.State.PREPARING);
@@ -464,7 +490,7 @@ public class LocalSessions
         return ActiveRepairService.instance.getParentRepairSession(sessionID);
     }
 
-    protected void sendMessage(InetAddress destination, RepairMessage message)
+    protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
     {
         logger.trace("sending {} to {}", message, destination);
         MessageOut<RepairMessage> messageOut = new MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, message, RepairMessage.serializer);
@@ -536,12 +562,12 @@ public class LocalSessions
      * successfully. If the pending anti compaction fails, a failure message is sent to the coordinator,
      * cancelling the session.
      */
-    public void handlePrepareMessage(InetAddress from, PrepareConsistentRequest request)
+    public void handlePrepareMessage(InetAddressAndPort from, PrepareConsistentRequest request)
     {
         logger.trace("received {} from {}", request, from);
         UUID sessionID = request.parentSession;
-        InetAddress coordinator = request.coordinator;
-        Set<InetAddress> peers = request.participants;
+        InetAddressAndPort coordinator = request.coordinator;
+        Set<InetAddressAndPort> peers = request.participants;
 
         ActiveRepairService.ParentRepairSession parentSession;
         try
@@ -568,7 +594,7 @@ public class LocalSessions
             {
                 logger.debug("Prepare phase for incremental repair session {} completed", sessionID);
                 setStateAndSave(session, PREPARED);
-                sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddress(), true));
+                sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), true));
                 executor.shutdown();
             }
 
@@ -587,7 +613,7 @@ public class LocalSessions
                 {
                     logger.error("Prepare phase for incremental repair session {} failed", sessionID, t);
                 }
-                sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddress(), false));
+                sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
                 failSession(sessionID, false);
                 executor.shutdown();
             }
@@ -604,7 +630,7 @@ public class LocalSessions
         }
     }
 
-    public void handleFinalizeProposeMessage(InetAddress from, FinalizePropose propose)
+    public void handleFinalizeProposeMessage(InetAddressAndPort from, FinalizePropose propose)
     {
         logger.trace("received {} from {}", propose, from);
         UUID sessionID = propose.sessionID;
@@ -629,7 +655,7 @@ public class LocalSessions
              */
             syncTable();
 
-            sendMessage(from, new FinalizePromise(sessionID, getBroadcastAddress(), true));
+            sendMessage(from, new FinalizePromise(sessionID, getBroadcastAddressAndPort(), true));
             logger.debug("Received FinalizePropose message for incremental repair session {}, responded with FinalizePromise", sessionID);
         }
         catch (IllegalArgumentException e)
@@ -659,7 +685,7 @@ public class LocalSessions
      * as part of the compaction process, and avoids having to worry about in progress compactions interfering with the
      * promotion.
      */
-    public void handleFinalizeCommitMessage(InetAddress from, FinalizeCommit commit)
+    public void handleFinalizeCommitMessage(InetAddressAndPort from, FinalizeCommit commit)
     {
         logger.trace("received {} from {}", commit, from);
         UUID sessionID = commit.sessionID;
@@ -674,7 +700,7 @@ public class LocalSessions
         logger.info("Finalized local repair session {}", sessionID);
     }
 
-    public void handleFailSessionMessage(InetAddress from, FailSession msg)
+    public void handleFailSessionMessage(InetAddressAndPort from, FailSession msg)
     {
         logger.trace("received {} from {}", msg, from);
         failSession(msg.sessionID, false);
@@ -684,16 +710,16 @@ public class LocalSessions
     {
         logger.debug("Attempting to learn the outcome of unfinished local incremental repair session {}", session.sessionID);
         StatusRequest request = new StatusRequest(session.sessionID);
-        for (InetAddress participant : session.participants)
+        for (InetAddressAndPort participant : session.participants)
         {
-            if (!getBroadcastAddress().equals(participant) && isAlive(participant))
+            if (!getBroadcastAddressAndPort().equals(participant) && isAlive(participant))
             {
                 sendMessage(participant, request);
             }
         }
     }
 
-    public void handleStatusRequest(InetAddress from, StatusRequest request)
+    public void handleStatusRequest(InetAddressAndPort from, StatusRequest request)
     {
         logger.trace("received {} from {}", request, from);
         UUID sessionID = request.sessionID;
@@ -710,7 +736,7 @@ public class LocalSessions
        }
     }
 
-    public void handleStatusResponse(InetAddress from, StatusResponse response)
+    public void handleStatusResponse(InetAddressAndPort from, StatusResponse response)
     {
         logger.trace("received {} from {}", response, from);
         UUID sessionID = response.sessionID;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
index 015b558..156fde7 100644
--- a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
+++ b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.repair.consistent;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -27,6 +26,7 @@ import java.util.Objects;
 
 import com.google.common.collect.Lists;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.RepairResult;
 import org.apache.cassandra.repair.RepairSessionResult;
 import org.apache.cassandra.repair.SyncStat;
@@ -42,14 +42,14 @@ public class SyncStatSummary
 
     private static class Session
     {
-        final InetAddress src;
-        final InetAddress dst;
+        final InetAddressAndPort src;
+        final InetAddressAndPort dst;
 
         int files = 0;
         long bytes = 0;
         long ranges = 0;
 
-        Session(InetAddress src, InetAddress dst)
+        Session(InetAddressAndPort src, InetAddressAndPort dst)
         {
             this.src = src;
             this.dst = dst;
@@ -84,7 +84,7 @@ public class SyncStatSummary
         int ranges = -1;
         boolean totalsCalculated = false;
 
-        final Map<Pair<InetAddress, InetAddress>, Session> sessions = new HashMap<>();
+        final Map<Pair<InetAddressAndPort, InetAddressAndPort>, Session> sessions = new HashMap<>();
 
         Table(String keyspace, String table)
         {
@@ -92,9 +92,9 @@ public class SyncStatSummary
             this.table = table;
         }
 
-        Session getOrCreate(InetAddress from, InetAddress to)
+        Session getOrCreate(InetAddressAndPort from, InetAddressAndPort to)
         {
-            Pair<InetAddress, InetAddress> k = Pair.create(from, to);
+            Pair<InetAddressAndPort, InetAddressAndPort> k = Pair.create(from, to);
             if (!sessions.containsKey(k))
             {
                 sessions.put(k, new Session(from, to));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
index b75ad7f..6d76269 100644
--- a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.RepairJobDesc;
@@ -39,13 +40,13 @@ public class AsymmetricSyncRequest extends RepairMessage
 {
     public static MessageSerializer serializer = new SyncRequestSerializer();
 
-    public final InetAddress initiator;
-    public final InetAddress fetchingNode;
-    public final InetAddress fetchFrom;
+    public final InetAddressAndPort initiator;
+    public final InetAddressAndPort fetchingNode;
+    public final InetAddressAndPort fetchFrom;
     public final Collection<Range<Token>> ranges;
     public final PreviewKind previewKind;
 
-    public AsymmetricSyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress fetchingNode, InetAddress fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind)
+    public AsymmetricSyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind)
     {
         super(Type.ASYMMETRIC_SYNC_REQUEST, desc);
         this.initiator = initiator;
@@ -80,9 +81,9 @@ public class AsymmetricSyncRequest extends RepairMessage
         public void serialize(AsymmetricSyncRequest message, DataOutputPlus out, int version) throws IOException
         {
             RepairJobDesc.serializer.serialize(message.desc, out, version);
-            CompactEndpointSerializationHelper.serialize(message.initiator, out);
-            CompactEndpointSerializationHelper.serialize(message.fetchingNode, out);
-            CompactEndpointSerializationHelper.serialize(message.fetchFrom, out);
+            CompactEndpointSerializationHelper.instance.serialize(message.initiator, out, version);
+            CompactEndpointSerializationHelper.instance.serialize(message.fetchingNode, out, version);
+            CompactEndpointSerializationHelper.instance.serialize(message.fetchFrom, out, version);
             out.writeInt(message.ranges.size());
             for (Range<Token> range : message.ranges)
             {
@@ -95,9 +96,9 @@ public class AsymmetricSyncRequest extends RepairMessage
         public AsymmetricSyncRequest deserialize(DataInputPlus in, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
-            InetAddress owner = CompactEndpointSerializationHelper.deserialize(in);
-            InetAddress src = CompactEndpointSerializationHelper.deserialize(in);
-            InetAddress dst = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddressAndPort owner = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+            InetAddressAndPort src = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+            InetAddressAndPort dst = CompactEndpointSerializationHelper.instance.deserialize(in, version);
             int rangesCount = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>(rangesCount);
             for (int i = 0; i < rangesCount; ++i)
@@ -109,7 +110,9 @@ public class AsymmetricSyncRequest extends RepairMessage
         public long serializedSize(AsymmetricSyncRequest message, int version)
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
-            size += 3 * CompactEndpointSerializationHelper.serializedSize(message.initiator);
+            size += CompactEndpointSerializationHelper.instance.serializedSize(message.initiator, version);
+            size += CompactEndpointSerializationHelper.instance.serializedSize(message.fetchingNode, version);
+            size += CompactEndpointSerializationHelper.instance.serializedSize(message.fetchFrom, version);
             size += TypeSizes.sizeof(message.ranges.size());
             for (Range<Token> range : message.ranges)
                 size += AbstractBounds.tokenSerializer.serializedSize(range, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
index 6c28347..449748a 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
@@ -19,24 +19,22 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.InetAddressSerializer;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 public class FinalizePromise extends RepairMessage
 {
     public final UUID sessionID;
-    public final InetAddress participant;
+    public final InetAddressAndPort participant;
     public final boolean promised;
 
-    public FinalizePromise(UUID sessionID, InetAddress participant, boolean promised)
+    public FinalizePromise(UUID sessionID, InetAddressAndPort participant, boolean promised)
     {
         super(Type.FINALIZE_PROMISE, null);
         assert sessionID != null;
@@ -68,26 +66,24 @@ public class FinalizePromise extends RepairMessage
 
     public static MessageSerializer serializer = new MessageSerializer<FinalizePromise>()
     {
-        private TypeSerializer<InetAddress> inetSerializer = InetAddressSerializer.instance;
-
         public void serialize(FinalizePromise msg, DataOutputPlus out, int version) throws IOException
         {
             UUIDSerializer.serializer.serialize(msg.sessionID, out, version);
-            ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(msg.participant), out);
+            CompactEndpointSerializationHelper.instance.serialize(msg.participant, out, version);
             out.writeBoolean(msg.promised);
         }
 
         public FinalizePromise deserialize(DataInputPlus in, int version) throws IOException
         {
             return new FinalizePromise(UUIDSerializer.serializer.deserialize(in, version),
-                                       inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)),
+                                       CompactEndpointSerializationHelper.instance.deserialize(in, version),
                                        in.readBoolean());
         }
 
         public long serializedSize(FinalizePromise msg, int version)
         {
             long size = UUIDSerializer.serializer.serializedSize(msg.sessionID, version);
-            size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(msg.participant));
+            size += CompactEndpointSerializationHelper.instance.serializedSize(msg.participant, version);
             size += TypeSizes.sizeof(msg.promised);
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
index 57056ef..9aae256 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -29,18 +28,17 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.InetAddressSerializer;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 public class PrepareConsistentRequest extends RepairMessage
 {
     public final UUID parentSession;
-    public final InetAddress coordinator;
-    public final Set<InetAddress> participants;
+    public final InetAddressAndPort coordinator;
+    public final Set<InetAddressAndPort> participants;
 
-    public PrepareConsistentRequest(UUID parentSession, InetAddress coordinator, Set<InetAddress> participants)
+    public PrepareConsistentRequest(UUID parentSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> participants)
     {
         super(Type.CONSISTENT_REQUEST, null);
         assert parentSession != null;
@@ -82,28 +80,27 @@ public class PrepareConsistentRequest extends RepairMessage
 
     public static MessageSerializer serializer = new MessageSerializer<PrepareConsistentRequest>()
     {
-        private TypeSerializer<InetAddress> inetSerializer = InetAddressSerializer.instance;
 
         public void serialize(PrepareConsistentRequest request, DataOutputPlus out, int version) throws IOException
         {
             UUIDSerializer.serializer.serialize(request.parentSession, out, version);
-            ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(request.coordinator), out);
+            CompactEndpointSerializationHelper.instance.serialize(request.coordinator, out, version);
             out.writeInt(request.participants.size());
-            for (InetAddress peer : request.participants)
+            for (InetAddressAndPort peer : request.participants)
             {
-                ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(peer), out);
+                CompactEndpointSerializationHelper.instance.serialize(peer, out, version);
             }
         }
 
         public PrepareConsistentRequest deserialize(DataInputPlus in, int version) throws IOException
         {
             UUID sessionId = UUIDSerializer.serializer.deserialize(in, version);
-            InetAddress coordinator = inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in));
+            InetAddressAndPort coordinator = CompactEndpointSerializationHelper.instance.deserialize(in, version);
             int numPeers = in.readInt();
-            Set<InetAddress> peers = new HashSet<>(numPeers);
+            Set<InetAddressAndPort> peers = new HashSet<>(numPeers);
             for (int i = 0; i < numPeers; i++)
             {
-                InetAddress peer = inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in));
+                InetAddressAndPort peer = CompactEndpointSerializationHelper.instance.deserialize(in, version);
                 peers.add(peer);
             }
             return new PrepareConsistentRequest(sessionId, coordinator, peers);
@@ -112,11 +109,11 @@ public class PrepareConsistentRequest extends RepairMessage
         public long serializedSize(PrepareConsistentRequest request, int version)
         {
             long size = UUIDSerializer.serializer.serializedSize(request.parentSession, version);
-            size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(request.coordinator));
+            size += CompactEndpointSerializationHelper.instance.serializedSize(request.coordinator, version);
             size += TypeSizes.sizeof(request.participants.size());
-            for (InetAddress peer : request.participants)
+            for (InetAddressAndPort peer : request.participants)
             {
-                size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(peer));
+                size += CompactEndpointSerializationHelper.instance.serializedSize(peer, version);
             }
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
index cf4410a..630f18e 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
@@ -19,24 +19,22 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.InetAddressSerializer;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 public class PrepareConsistentResponse extends RepairMessage
 {
     public final UUID parentSession;
-    public final InetAddress participant;
+    public final InetAddressAndPort participant;
     public final boolean success;
 
-    public PrepareConsistentResponse(UUID parentSession, InetAddress participant, boolean success)
+    public PrepareConsistentResponse(UUID parentSession, InetAddressAndPort participant, boolean success)
     {
         super(Type.CONSISTENT_RESPONSE, null);
         assert parentSession != null;
@@ -68,25 +66,24 @@ public class PrepareConsistentResponse extends RepairMessage
 
     public static MessageSerializer serializer = new MessageSerializer<PrepareConsistentResponse>()
     {
-        private TypeSerializer<InetAddress> inetSerializer = InetAddressSerializer.instance;
         public void serialize(PrepareConsistentResponse response, DataOutputPlus out, int version) throws IOException
         {
             UUIDSerializer.serializer.serialize(response.parentSession, out, version);
-            ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(response.participant), out);
+            CompactEndpointSerializationHelper.instance.serialize(response.participant, out, version);
             out.writeBoolean(response.success);
         }
 
         public PrepareConsistentResponse deserialize(DataInputPlus in, int version) throws IOException
         {
             return new PrepareConsistentResponse(UUIDSerializer.serializer.deserialize(in, version),
-                                                 inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)),
+                                                 CompactEndpointSerializationHelper.instance.deserialize(in, version),
                                                  in.readBoolean());
         }
 
         public long serializedSize(PrepareConsistentResponse response, int version)
         {
             long size = UUIDSerializer.serializer.serializedSize(response.parentSession, version);
-            size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(response.participant));
+            size += CompactEndpointSerializationHelper.instance.serializedSize(response.participant, version);
             size += TypeSizes.sizeof(response.success);
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
index 7b68daf..1f1344d 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -26,6 +25,7 @@ import java.util.Objects;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.NodePair;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.streaming.SessionSummary;
@@ -53,7 +53,7 @@ public class SyncComplete extends RepairMessage
         this.summaries = summaries;
     }
 
-    public SyncComplete(RepairJobDesc desc, InetAddress endpoint1, InetAddress endpoint2, boolean success, List<SessionSummary> summaries)
+    public SyncComplete(RepairJobDesc desc, InetAddressAndPort endpoint1, InetAddressAndPort endpoint2, boolean success, List<SessionSummary> summaries)
     {
         super(Type.SYNC_COMPLETE, desc);
         this.summaries = summaries;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index 01601e2..a0bf4e2 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -30,6 +29,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.RepairJobDesc;
@@ -45,14 +45,14 @@ public class SyncRequest extends RepairMessage
 {
     public static MessageSerializer serializer = new SyncRequestSerializer();
 
-    public final InetAddress initiator;
-    public final InetAddress src;
-    public final InetAddress dst;
+    public final InetAddressAndPort initiator;
+    public final InetAddressAndPort src;
+    public final InetAddressAndPort dst;
     public final Collection<Range<Token>> ranges;
     public final PreviewKind previewKind;
 
-    public SyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges, PreviewKind previewKind)
-    {
+   public SyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort src, InetAddressAndPort dst, Collection<Range<Token>> ranges, PreviewKind previewKind)
+   {
         super(Type.SYNC_REQUEST, desc);
         this.initiator = initiator;
         this.src = src;
@@ -87,9 +87,9 @@ public class SyncRequest extends RepairMessage
         public void serialize(SyncRequest message, DataOutputPlus out, int version) throws IOException
         {
             RepairJobDesc.serializer.serialize(message.desc, out, version);
-            CompactEndpointSerializationHelper.serialize(message.initiator, out);
-            CompactEndpointSerializationHelper.serialize(message.src, out);
-            CompactEndpointSerializationHelper.serialize(message.dst, out);
+            CompactEndpointSerializationHelper.instance.serialize(message.initiator, out, version);
+            CompactEndpointSerializationHelper.instance.serialize(message.src, out, version);
+            CompactEndpointSerializationHelper.instance.serialize(message.dst, out, version);
             out.writeInt(message.ranges.size());
             for (Range<Token> range : message.ranges)
             {
@@ -102,9 +102,9 @@ public class SyncRequest extends RepairMessage
         public SyncRequest deserialize(DataInputPlus in, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
-            InetAddress owner = CompactEndpointSerializationHelper.deserialize(in);
-            InetAddress src = CompactEndpointSerializationHelper.deserialize(in);
-            InetAddress dst = CompactEndpointSerializationHelper.deserialize(in);
+            InetAddressAndPort owner = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+            InetAddressAndPort src = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+            InetAddressAndPort dst = CompactEndpointSerializationHelper.instance.deserialize(in, version);
             int rangesCount = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>(rangesCount);
             for (int i = 0; i < rangesCount; ++i)
@@ -116,7 +116,7 @@ public class SyncRequest extends RepairMessage
         public long serializedSize(SyncRequest message, int version)
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
-            size += 3 * CompactEndpointSerializationHelper.serializedSize(message.initiator);
+            size += 3 * CompactEndpointSerializationHelper.instance.serializedSize(message.initiator, version);
             size += TypeSizes.sizeof(message.ranges.size());
             for (Range<Token> range : message.ranges)
                 size += AbstractBounds.tokenSerializer.serializedSize(range, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java
index ef19c25..c8881e5 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.schema;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
 import java.lang.management.ManagementFactory;
@@ -39,6 +38,7 @@ import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -59,10 +59,10 @@ public class MigrationManager
 
     private MigrationManager() {}
 
-    public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
+    public static void scheduleSchemaPull(InetAddressAndPort endpoint, EndpointState state)
     {
         UUID schemaVersion = state.getSchemaVersion();
-        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null)
+        if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && schemaVersion != null)
             maybeScheduleSchemaPull(schemaVersion, endpoint);
     }
 
@@ -70,7 +70,7 @@ public class MigrationManager
      * If versions differ this node sends request with local migration list to the endpoint
      * and expecting to receive a list of migrations to apply locally.
      */
-    private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
+    private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddressAndPort endpoint)
     {
         if (Schema.instance.getVersion() == null)
         {
@@ -130,7 +130,7 @@ public class MigrationManager
         }
     }
 
-    private static Future<?> submitMigrationTask(InetAddress endpoint)
+    private static Future<?> submitMigrationTask(InetAddressAndPort endpoint)
     {
         /*
          * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
@@ -139,7 +139,7 @@ public class MigrationManager
         return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
     }
 
-    static boolean shouldPullSchemaFrom(InetAddress endpoint)
+    static boolean shouldPullSchemaFrom(InetAddressAndPort endpoint)
     {
         /*
          * Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema)
@@ -427,7 +427,7 @@ public class MigrationManager
             FBUtilities.waitOnFuture(announce(mutations));
     }
 
-    private static void pushSchemaMutation(InetAddress endpoint, Collection<Mutation> schema)
+    private static void pushSchemaMutation(InetAddressAndPort endpoint, Collection<Mutation> schema)
     {
         MessageOut<Collection<Mutation>> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE,
                                                                 schema,
@@ -446,10 +446,10 @@ public class MigrationManager
             }
         });
 
-        for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
+        for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
         {
             // only push schema to nodes with known and equal versions
-            if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
+            if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) &&
                     MessagingService.instance().knowsVersion(endpoint) &&
                     MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version)
                 pushSchemaMutation(endpoint, schema);
@@ -486,11 +486,11 @@ public class MigrationManager
 
         Schema.instance.clear();
 
-        Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
-        liveEndpoints.remove(FBUtilities.getBroadcastAddress());
+        Set<InetAddressAndPort> liveEndpoints = Gossiper.instance.getLiveMembers();
+        liveEndpoints.remove(FBUtilities.getBroadcastAddressAndPort());
 
         // force migration if there are nodes around
-        for (InetAddress node : liveEndpoints)
+        for (InetAddressAndPort node : liveEndpoints)
         {
             if (shouldPullSchemaFrom(node))
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/schema/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationTask.java b/src/java/org/apache/cassandra/schema/MigrationTask.java
index 73e396d..6ff206a 100644
--- a/src/java/org/apache/cassandra/schema/MigrationTask.java
+++ b/src/java/org/apache/cassandra/schema/MigrationTask.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.schema;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Set;
@@ -32,6 +31,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -46,9 +46,9 @@ final class MigrationTask extends WrappedRunnable
 
     private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);
 
-    private final InetAddress endpoint;
+    private final InetAddressAndPort endpoint;
 
-    MigrationTask(InetAddress endpoint)
+    MigrationTask(InetAddressAndPort endpoint)
     {
         this.endpoint = endpoint;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 7ff0b9b..e06131e 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
@@ -38,6 +37,7 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -60,12 +60,12 @@ public abstract class AbstractReadExecutor
     private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
 
     protected final ReadCommand command;
-    protected final List<InetAddress> targetReplicas;
+    protected final List<InetAddressAndPort> targetReplicas;
     protected final ReadCallback handler;
     protected final TraceState traceState;
     protected final ColumnFamilyStore cfs;
 
-    AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime)
+    AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime)
     {
         this.command = command;
         this.targetReplicas = targetReplicas;
@@ -78,27 +78,27 @@ public abstract class AbstractReadExecutor
         // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once
         // we stop being compatible with pre-3.0 nodes.
         int digestVersion = MessagingService.current_version;
-        for (InetAddress replica : targetReplicas)
+        for (InetAddressAndPort replica : targetReplicas)
             digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica));
         command.setDigestVersion(digestVersion);
     }
 
-    protected void makeDataRequests(Iterable<InetAddress> endpoints)
+    protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints)
     {
         makeRequests(command, endpoints);
 
     }
 
-    protected void makeDigestRequests(Iterable<InetAddress> endpoints)
+    protected void makeDigestRequests(Iterable<InetAddressAndPort> endpoints)
     {
         makeRequests(command.copyAsDigestQuery(), endpoints);
     }
 
-    private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints)
+    private void makeRequests(ReadCommand readCommand, Iterable<InetAddressAndPort> endpoints)
     {
         boolean hasLocalEndpoint = false;
 
-        for (InetAddress endpoint : endpoints)
+        for (InetAddressAndPort endpoint : endpoints)
         {
             if (StorageProxy.canDoLocalRequest(endpoint))
             {
@@ -132,7 +132,7 @@ public abstract class AbstractReadExecutor
      *
      * @return target replicas + the extra replica, *IF* we speculated.
      */
-    public abstract Collection<InetAddress> getContactedReplicas();
+    public abstract Collection<InetAddressAndPort> getContactedReplicas();
 
     /**
      * send the initial set of requests
@@ -184,12 +184,12 @@ public abstract class AbstractReadExecutor
     public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException
     {
         Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-        List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
+        List<InetAddressAndPort> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
         // 11980: Excluding EACH_QUORUM reads from potential RR, so that we do not miscount DC responses
         ReadRepairDecision repairDecision = consistencyLevel == ConsistencyLevel.EACH_QUORUM
                                             ? ReadRepairDecision.NONE
                                             : newReadRepairDecision(command.metadata());
-        List<InetAddress> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
+        List<InetAddressAndPort> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
 
         // Throw UAE early if we don't have enough replicas.
         consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas);
@@ -223,12 +223,12 @@ public abstract class AbstractReadExecutor
         }
 
         // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
-        InetAddress extraReplica = allReplicas.get(targetReplicas.size());
+        InetAddressAndPort extraReplica = allReplicas.get(targetReplicas.size());
         // With repair decision DC_LOCAL all replicas/target replicas may be in different order, so
         // we might have to find a replacement that's not already in targetReplicas.
         if (repairDecision == ReadRepairDecision.DC_LOCAL && targetReplicas.contains(extraReplica))
         {
-            for (InetAddress address : allReplicas)
+            for (InetAddressAndPort address : allReplicas)
             {
                 if (!targetReplicas.contains(address))
                 {
@@ -269,7 +269,7 @@ public abstract class AbstractReadExecutor
          */
         private final boolean logFailedSpeculation;
 
-        public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation)
+        public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation)
         {
             super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
             this.logFailedSpeculation = logFailedSpeculation;
@@ -290,7 +290,7 @@ public abstract class AbstractReadExecutor
             }
         }
 
-        public Collection<InetAddress> getContactedReplicas()
+        public Collection<InetAddressAndPort> getContactedReplicas()
         {
             return targetReplicas;
         }
@@ -304,7 +304,7 @@ public abstract class AbstractReadExecutor
                                        ColumnFamilyStore cfs,
                                        ReadCommand command,
                                        ConsistencyLevel consistencyLevel,
-                                       List<InetAddress> targetReplicas,
+                                       List<InetAddressAndPort> targetReplicas,
                                        long queryStartNanoTime)
         {
             super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
@@ -314,7 +314,7 @@ public abstract class AbstractReadExecutor
         {
             // if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating.  So we know
             // that the last replica in our list is "extra."
-            List<InetAddress> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1);
+            List<InetAddressAndPort> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1);
 
             if (handler.blockfor < initialReplicas.size())
             {
@@ -347,7 +347,7 @@ public abstract class AbstractReadExecutor
                 if (handler.resolver.isDataPresent())
                     retryCommand = command.copyAsDigestQuery();
 
-                InetAddress extraReplica = Iterables.getLast(targetReplicas);
+                InetAddressAndPort extraReplica = Iterables.getLast(targetReplicas);
                 if (traceState != null)
                     traceState.trace("speculating read retry on {}", extraReplica);
                 logger.trace("speculating read retry on {}", extraReplica);
@@ -355,7 +355,7 @@ public abstract class AbstractReadExecutor
             }
         }
 
-        public Collection<InetAddress> getContactedReplicas()
+        public Collection<InetAddressAndPort> getContactedReplicas()
         {
             return speculated
                  ? targetReplicas
@@ -378,7 +378,7 @@ public abstract class AbstractReadExecutor
                                              ColumnFamilyStore cfs,
                                              ReadCommand command,
                                              ConsistencyLevel consistencyLevel,
-                                             List<InetAddress> targetReplicas,
+                                             List<InetAddressAndPort> targetReplicas,
                                              long queryStartNanoTime)
         {
             super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
@@ -389,7 +389,7 @@ public abstract class AbstractReadExecutor
             // no-op
         }
 
-        public Collection<InetAddress> getContactedReplicas()
+        public Collection<InetAddressAndPort> getContactedReplicas()
         {
             return targetReplicas;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index b5eaadb..9d800a0 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.net.InetAddress;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -35,6 +34,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -47,15 +47,15 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     private AtomicInteger responsesAndExpirations;
     private final SimpleCondition condition = new SimpleCondition();
     protected final Keyspace keyspace;
-    protected final Collection<InetAddress> naturalEndpoints;
+    protected final Collection<InetAddressAndPort> naturalEndpoints;
     public final ConsistencyLevel consistencyLevel;
     protected final Runnable callback;
-    protected final Collection<InetAddress> pendingEndpoints;
+    protected final Collection<InetAddressAndPort> pendingEndpoints;
     protected final WriteType writeType;
     private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
     = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
     private volatile int failures = 0;
-    private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint;
+    private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
     private final long queryStartNanoTime;
     private volatile boolean supportsBackPressure = true;
 
@@ -72,8 +72,8 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
      * @param queryStartNanoTime
      */
     protected AbstractWriteResponseHandler(Keyspace keyspace,
-                                           Collection<InetAddress> naturalEndpoints,
-                                           Collection<InetAddress> pendingEndpoints,
+                                           Collection<InetAddressAndPort> naturalEndpoints,
+                                           Collection<InetAddressAndPort> pendingEndpoints,
                                            ConsistencyLevel consistencyLevel,
                                            Runnable callback,
                                            WriteType writeType,
@@ -208,7 +208,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     /**
      * @return true if the message counts towards the totalBlockFor() threshold
      */
-    protected boolean waitingFor(InetAddress from)
+    protected boolean waitingFor(InetAddressAndPort from)
     {
         return true;
     }
@@ -236,7 +236,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     }
 
     @Override
-    public void onFailure(InetAddress from, RequestFailureReason failureReason)
+    public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
     {
         logger.trace("Got failure from {}", from);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org