You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:09 UTC
[12/19] cassandra git commit: Allow storage port to be configurable
per node
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
index ce05e93..c7d45bf 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.asymmetric;
-import java.net.InetAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -31,6 +30,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
/**
* Basic idea is that we track incoming ranges instead of blindly just exchanging the ranges that mismatch between two nodes
@@ -47,19 +47,19 @@ public class ReduceHelper
/**
* Reduces the differences provided by the merkle trees to a minimum set of differences
*/
- public static ImmutableMap<InetAddress, HostDifferences> reduce(DifferenceHolder differences, PreferedNodeFilter filter)
+ public static ImmutableMap<InetAddressAndPort, HostDifferences> reduce(DifferenceHolder differences, PreferedNodeFilter filter)
{
- Map<InetAddress, IncomingRepairStreamTracker> trackers = createIncomingRepairStreamTrackers(differences);
- Map<InetAddress, Integer> outgoingStreamCounts = new HashMap<>();
- ImmutableMap.Builder<InetAddress, HostDifferences> mapBuilder = ImmutableMap.builder();
- for (Map.Entry<InetAddress, IncomingRepairStreamTracker> trackerEntry : trackers.entrySet())
+ Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers = createIncomingRepairStreamTrackers(differences);
+ Map<InetAddressAndPort, Integer> outgoingStreamCounts = new HashMap<>();
+ ImmutableMap.Builder<InetAddressAndPort, HostDifferences> mapBuilder = ImmutableMap.builder();
+ for (Map.Entry<InetAddressAndPort, IncomingRepairStreamTracker> trackerEntry : trackers.entrySet())
{
IncomingRepairStreamTracker tracker = trackerEntry.getValue();
HostDifferences rangesToFetch = new HostDifferences();
for (Map.Entry<Range<Token>, StreamFromOptions> entry : tracker.getIncoming().entrySet())
{
Range<Token> rangeToFetch = entry.getKey();
- for (InetAddress remoteNode : pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), outgoingStreamCounts, filter))
+ for (InetAddressAndPort remoteNode : pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), outgoingStreamCounts, filter))
rangesToFetch.addSingleRange(remoteNode, rangeToFetch);
}
mapBuilder.put(trackerEntry.getKey(), rangesToFetch);
@@ -69,14 +69,14 @@ public class ReduceHelper
}
@VisibleForTesting
- static Map<InetAddress, IncomingRepairStreamTracker> createIncomingRepairStreamTrackers(DifferenceHolder differences)
+ static Map<InetAddressAndPort, IncomingRepairStreamTracker> createIncomingRepairStreamTrackers(DifferenceHolder differences)
{
- Map<InetAddress, IncomingRepairStreamTracker> trackers = new HashMap<>();
+ Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers = new HashMap<>();
- for (InetAddress hostWithDifference : differences.keyHosts())
+ for (InetAddressAndPort hostWithDifference : differences.keyHosts())
{
HostDifferences hostDifferences = differences.get(hostWithDifference);
- for (InetAddress differingHost : hostDifferences.hosts())
+ for (InetAddressAndPort differingHost : hostDifferences.hosts())
{
List<Range<Token>> differingRanges = hostDifferences.get(differingHost);
// hostWithDifference has mismatching ranges differingRanges with differingHost:
@@ -93,24 +93,24 @@ public class ReduceHelper
}
private static IncomingRepairStreamTracker getTracker(DifferenceHolder differences,
- Map<InetAddress, IncomingRepairStreamTracker> trackers,
- InetAddress host)
+ Map<InetAddressAndPort, IncomingRepairStreamTracker> trackers,
+ InetAddressAndPort host)
{
return trackers.computeIfAbsent(host, (h) -> new IncomingRepairStreamTracker(differences));
}
// greedily pick the nodes doing the least amount of streaming
- private static Collection<InetAddress> pickLeastStreaming(InetAddress streamingNode,
+ private static Collection<InetAddressAndPort> pickLeastStreaming(InetAddressAndPort streamingNode,
StreamFromOptions toStreamFrom,
- Map<InetAddress, Integer> outgoingStreamCounts,
+ Map<InetAddressAndPort, Integer> outgoingStreamCounts,
PreferedNodeFilter filter)
{
- Set<InetAddress> retSet = new HashSet<>();
- for (Set<InetAddress> toStream : toStreamFrom.allStreams())
+ Set<InetAddressAndPort> retSet = new HashSet<>();
+ for (Set<InetAddressAndPort> toStream : toStreamFrom.allStreams())
{
- InetAddress candidate = null;
- Set<InetAddress> prefered = filter.apply(streamingNode, toStream);
- for (InetAddress node : prefered)
+ InetAddressAndPort candidate = null;
+ Set<InetAddressAndPort> prefered = filter.apply(streamingNode, toStream);
+ for (InetAddressAndPort node : prefered)
{
if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0))
{
@@ -120,7 +120,7 @@ public class ReduceHelper
// ok, found no prefered hosts, try all of them
if (candidate == null)
{
- for (InetAddress node : toStream)
+ for (InetAddressAndPort node : toStream)
{
if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
index 4516f23..6070983 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.asymmetric;
-import java.net.InetAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -28,6 +27,7 @@ import com.google.common.collect.Sets;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
/**
* Keeps track of where a node needs to stream a given range from.
@@ -53,18 +53,18 @@ public class StreamFromOptions
/**
* Contains the hosts to stream from - if two nodes are in the same inner set, they are identical for the range we are handling
*/
- private final Set<Set<InetAddress>> streamOptions = new HashSet<>();
+ private final Set<Set<InetAddressAndPort>> streamOptions = new HashSet<>();
public StreamFromOptions(DifferenceHolder differences, Range<Token> range)
{
this(differences, range, Collections.emptySet());
}
- private StreamFromOptions(DifferenceHolder differences, Range<Token> range, Set<Set<InetAddress>> existing)
+ private StreamFromOptions(DifferenceHolder differences, Range<Token> range, Set<Set<InetAddressAndPort>> existing)
{
this.differences = differences;
this.range = range;
- for (Set<InetAddress> addresses : existing)
+ for (Set<InetAddressAndPort> addresses : existing)
this.streamOptions.add(Sets.newHashSet(addresses));
}
@@ -75,11 +75,11 @@ public class StreamFromOptions
* range we are tracking, then just add it to the set with the identical remote nodes. Otherwise create a new group
* of nodes containing this new node.
*/
- public void add(InetAddress streamFromNode)
+ public void add(InetAddressAndPort streamFromNode)
{
- for (Set<InetAddress> options : streamOptions)
+ for (Set<InetAddressAndPort> options : streamOptions)
{
- InetAddress first = options.iterator().next();
+ InetAddressAndPort first = options.iterator().next();
if (!differences.hasDifferenceBetween(first, streamFromNode, range))
{
options.add(streamFromNode);
@@ -94,7 +94,7 @@ public class StreamFromOptions
return new StreamFromOptions(differences, withRange, streamOptions);
}
- public Iterable<Set<InetAddress>> allStreams()
+ public Iterable<Set<InetAddressAndPort>> allStreams()
{
return streamOptions;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index c137346..78057e2 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Iterables;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizeCommit;
import org.apache.cassandra.repair.messages.FinalizePromise;
@@ -55,7 +56,7 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
* There are 4 stages to a consistent incremental repair.
*
* <h1>Repair prepare</h1>
- * First, the normal {@link ActiveRepairService#prepareForRepair(UUID, InetAddress, Set, RepairOption, List)} stuff
+ * First, the normal {@link ActiveRepairService#prepareForRepair(UUID, InetAddressAndPort, Set, RepairOption, List)} stuff
* happens, which sends out {@link PrepareMessage} and creates a {@link ActiveRepairService.ParentRepairSession}
* on the coordinator and each of the neighbors.
*
@@ -68,7 +69,7 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
* coordinator indicating success or failure. If the pending anti-compaction fails, the local session state is set
* to {@code FAILED}.
* <p/>
- * (see {@link LocalSessions#handlePrepareMessage(InetAddress, PrepareConsistentRequest)}
+ * (see {@link LocalSessions#handlePrepareMessage(InetAddressAndPort, PrepareConsistentRequest)}
* <p/>
* Once the coordinator recieves positive {@code PrepareConsistentResponse} messages from all the participants, the
* coordinator begins the normal repair process.
@@ -99,8 +100,8 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
* & {@link CoordinatorSession#finalizeCommit()}
* <p/>
*
- * On the local session side, see {@link LocalSessions#handleFinalizeProposeMessage(InetAddress, FinalizePropose)}
- * & {@link LocalSessions#handleFinalizeCommitMessage(InetAddress, FinalizeCommit)}
+ * On the local session side, see {@link LocalSessions#handleFinalizeProposeMessage(InetAddressAndPort, FinalizePropose)}
+ * & {@link LocalSessions#handleFinalizeCommitMessage(InetAddressAndPort, FinalizeCommit)}
*
* <h1>Failure</h1>
* If there are any failures or problems during the process above, the session will be failed. When a session is failed,
@@ -187,11 +188,11 @@ public abstract class ConsistentSession
private volatile State state;
public final UUID sessionID;
- public final InetAddress coordinator;
+ public final InetAddressAndPort coordinator;
public final ImmutableSet<TableId> tableIds;
public final long repairedAt;
public final ImmutableSet<Range<Token>> ranges;
- public final ImmutableSet<InetAddress> participants;
+ public final ImmutableSet<InetAddressAndPort> participants;
ConsistentSession(AbstractBuilder builder)
{
@@ -260,11 +261,11 @@ public abstract class ConsistentSession
{
private State state;
private UUID sessionID;
- private InetAddress coordinator;
+ private InetAddressAndPort coordinator;
private Set<TableId> ids;
private long repairedAt;
private Collection<Range<Token>> ranges;
- private Set<InetAddress> participants;
+ private Set<InetAddressAndPort> participants;
void withState(State state)
{
@@ -276,7 +277,7 @@ public abstract class ConsistentSession
this.sessionID = sessionID;
}
- void withCoordinator(InetAddress coordinator)
+ void withCoordinator(InetAddressAndPort coordinator)
{
this.coordinator = coordinator;
}
@@ -301,7 +302,7 @@ public abstract class ConsistentSession
this.ranges = ranges;
}
- void withParticipants(Set<InetAddress> peers)
+ void withParticipants(Set<InetAddressAndPort> peers)
{
this.participants = peers;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index ba0025f..f52a28d 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.consistent;
-import java.net.InetAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,6 +38,7 @@ import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.RepairSessionResult;
@@ -58,7 +58,7 @@ public class CoordinatorSession extends ConsistentSession
{
private static final Logger logger = LoggerFactory.getLogger(CoordinatorSession.class);
- private final Map<InetAddress, State> participantStates = new HashMap<>();
+ private final Map<InetAddressAndPort, State> participantStates = new HashMap<>();
private final SettableFuture<Boolean> prepareFuture = SettableFuture.create();
private final SettableFuture<Boolean> finalizeProposeFuture = SettableFuture.create();
@@ -69,7 +69,7 @@ public class CoordinatorSession extends ConsistentSession
public CoordinatorSession(Builder builder)
{
super(builder);
- for (InetAddress participant : participants)
+ for (InetAddressAndPort participant : participants)
{
participantStates.put(participant, State.PREPARING);
}
@@ -95,7 +95,7 @@ public class CoordinatorSession extends ConsistentSession
super.setState(state);
}
- public synchronized void setParticipantState(InetAddress participant, State state)
+ public synchronized void setParticipantState(InetAddressAndPort participant, State state)
{
logger.trace("Setting participant {} to state {} for repair {}", participant, state, sessionID);
Preconditions.checkArgument(participantStates.containsKey(participant),
@@ -115,7 +115,7 @@ public class CoordinatorSession extends ConsistentSession
synchronized void setAll(State state)
{
- for (InetAddress participant : participants)
+ for (InetAddressAndPort participant : participants)
{
setParticipantState(participant, state);
}
@@ -131,7 +131,7 @@ public class CoordinatorSession extends ConsistentSession
return getState() == State.FAILED || Iterables.any(participantStates.values(), v -> v == State.FAILED);
}
- protected void sendMessage(InetAddress destination, RepairMessage message)
+ protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
{
logger.trace("Sending {} to {}", message, destination);
MessageOut<RepairMessage> messageOut = new MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, message, RepairMessage.serializer);
@@ -144,14 +144,14 @@ public class CoordinatorSession extends ConsistentSession
logger.debug("Beginning prepare phase of incremental repair session {}", sessionID);
PrepareConsistentRequest message = new PrepareConsistentRequest(sessionID, coordinator, participants);
- for (final InetAddress participant : participants)
+ for (final InetAddressAndPort participant : participants)
{
sendMessage(participant, message);
}
return prepareFuture;
}
- public synchronized void handlePrepareResponse(InetAddress participant, boolean success)
+ public synchronized void handlePrepareResponse(InetAddressAndPort participant, boolean success)
{
if (getState() == State.FAILED)
{
@@ -185,14 +185,14 @@ public class CoordinatorSession extends ConsistentSession
Preconditions.checkArgument(allStates(State.REPAIRING));
logger.debug("Proposing finalization of repair session {}", sessionID);
FinalizePropose message = new FinalizePropose(sessionID);
- for (final InetAddress participant : participants)
+ for (final InetAddressAndPort participant : participants)
{
sendMessage(participant, message);
}
return finalizeProposeFuture;
}
- public synchronized void handleFinalizePromise(InetAddress participant, boolean success)
+ public synchronized void handleFinalizePromise(InetAddressAndPort participant, boolean success)
{
if (getState() == State.FAILED)
{
@@ -221,7 +221,7 @@ public class CoordinatorSession extends ConsistentSession
Preconditions.checkArgument(allStates(State.FINALIZE_PROMISED));
logger.debug("Committing finalization of repair session {}", sessionID);
FinalizeCommit message = new FinalizeCommit(sessionID);
- for (final InetAddress participant : participants)
+ for (final InetAddressAndPort participant : participants)
{
sendMessage(participant, message);
}
@@ -233,7 +233,7 @@ public class CoordinatorSession extends ConsistentSession
{
logger.info("Incremental repair session {} failed", sessionID);
FailSession message = new FailSession(sessionID);
- for (final InetAddress participant : participants)
+ for (final InetAddressAndPort participant : participants)
{
if (participantStates.get(participant) != State.FAILED)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
index 211e0c1..bb84d0a 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.consistent;
-import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -26,6 +25,7 @@ import java.util.UUID;
import com.google.common.base.Preconditions;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.messages.FailSession;
import org.apache.cassandra.repair.messages.FinalizePromise;
import org.apache.cassandra.repair.messages.PrepareConsistentResponse;
@@ -43,7 +43,7 @@ public class CoordinatorSessions
return new CoordinatorSession(builder);
}
- public synchronized CoordinatorSession registerSession(UUID sessionId, Set<InetAddress> participants)
+ public synchronized CoordinatorSession registerSession(UUID sessionId, Set<InetAddressAndPort> participants)
{
Preconditions.checkArgument(!sessions.containsKey(sessionId), "A coordinator already exists for session %s", sessionId);
ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionId);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
index 903aeb5..98b883a 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessionInfo.java
@@ -21,10 +21,12 @@ package org.apache.cassandra.repair.consistent;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.stream.Collectors;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
@@ -40,6 +42,7 @@ public class LocalSessionInfo
public static final String LAST_UPDATE = "LAST_UPDATE";
public static final String COORDINATOR = "COORDINATOR";
public static final String PARTICIPANTS = "PARTICIPANTS";
+ public static final String PARTICIPANTS_WP = "PARTICIPANTS_WP";
public static final String TABLES = "TABLES";
@@ -59,7 +62,8 @@ public class LocalSessionInfo
m.put(STARTED, Integer.toString(session.getStartedAt()));
m.put(LAST_UPDATE, Integer.toString(session.getLastUpdate()));
m.put(COORDINATOR, session.coordinator.toString());
- m.put(PARTICIPANTS, Joiner.on(',').join(Iterables.transform(session.participants, InetAddress::toString)));
+ m.put(PARTICIPANTS, Joiner.on(',').join(Iterables.transform(session.participants.stream().map(peer -> peer.address).collect(Collectors.toList()), InetAddress::getHostAddress)));
+ m.put(PARTICIPANTS_WP, Joiner.on(',').join(Iterables.transform(session.participants, InetAddressAndPort::toString)));
m.put(TABLES, Joiner.on(',').join(Iterables.transform(session.tableIds, LocalSessionInfo::tableString)));
return m;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 4ef2c2c..e62f6fd 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair.consistent;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Date;
@@ -33,6 +34,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
@@ -49,13 +51,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.InetAddressType;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
@@ -141,13 +148,13 @@ public class LocalSessions
}
@VisibleForTesting
- protected InetAddress getBroadcastAddress()
+ protected InetAddressAndPort getBroadcastAddressAndPort()
{
- return FBUtilities.getBroadcastAddress();
+ return FBUtilities.getBroadcastAddressAndPort();
}
@VisibleForTesting
- protected boolean isAlive(InetAddress address)
+ protected boolean isAlive(InetAddressAndPort address)
{
return FailureDetector.instance.isAlive(address);
}
@@ -177,14 +184,14 @@ public class LocalSessions
logger.info("Cancelling local repair session {}", sessionID);
LocalSession session = getSession(sessionID);
Preconditions.checkArgument(session != null, "Session {} does not exist", sessionID);
- Preconditions.checkArgument(force || session.coordinator.equals(getBroadcastAddress()),
+ Preconditions.checkArgument(force || session.coordinator.equals(getBroadcastAddressAndPort()),
"Cancel session %s from it's coordinator (%s) or use --force",
sessionID, session.coordinator);
setStateAndSave(session, FAILED);
- for (InetAddress participant : session.participants)
+ for (InetAddressAndPort participant : session.participants)
{
- if (!participant.equals(getBroadcastAddress()))
+ if (!participant.equals(getBroadcastAddressAndPort()))
sendMessage(participant, new FailSession(sessionID));
}
}
@@ -335,10 +342,12 @@ public class LocalSessions
"repaired_at, " +
"state, " +
"coordinator, " +
+ "coordinator_port, " +
"participants, " +
+ "participants_wp," +
"ranges, " +
"cfids) " +
- "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
QueryProcessor.executeInternal(String.format(query, keyspace, table),
session.sessionID,
@@ -346,8 +355,10 @@ public class LocalSessions
Date.from(Instant.ofEpochSecond(session.getLastUpdate())),
Date.from(Instant.ofEpochMilli(session.repairedAt)),
session.getState().ordinal(),
- session.coordinator,
- session.participants,
+ session.coordinator.address,
+ session.coordinator.port,
+ session.participants.stream().map(participant -> participant.address).collect(Collectors.toSet()),
+ session.participants.stream().map(participant -> participant.toString()).collect(Collectors.toSet()),
serializeRanges(session.ranges),
tableIdToUuid(session.tableIds));
}
@@ -362,12 +373,27 @@ public class LocalSessions
LocalSession.Builder builder = LocalSession.builder();
builder.withState(ConsistentSession.State.valueOf(row.getInt("state")));
builder.withSessionID(row.getUUID("parent_id"));
- builder.withCoordinator(row.getInetAddress("coordinator"));
+ InetAddressAndPort coordinator = InetAddressAndPort.getByAddressOverrideDefaults(
+ row.getInetAddress("coordinator"),
+ row.getInt("coordinator_port"));
+ builder.withCoordinator(coordinator);
builder.withTableIds(uuidToTableId(row.getSet("cfids", UUIDType.instance)));
builder.withRepairedAt(row.getTimestamp("repaired_at").getTime());
builder.withRanges(deserializeRanges(row.getSet("ranges", BytesType.instance)));
- builder.withParticipants(row.getSet("participants", InetAddressType.instance));
-
+ //There is no cross version streaming and thus no cross version repair so assume that
+ //any valid repair session has the participants_wp column and any that doesn't is malformed
+ Set<String> participants = row.getSet("participants_wp", UTF8Type.instance);
+ builder.withParticipants(participants.stream().map(participant ->
+ {
+ try
+ {
+ return InetAddressAndPort.getByName(participant);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toSet()));
builder.withStartedAt(dateToSeconds(row.getTimestamp("started_at")));
builder.withLastUpdate(dateToSeconds(row.getTimestamp("last_update")));
@@ -440,7 +466,7 @@ public class LocalSessions
}
@VisibleForTesting
- LocalSession createSessionUnsafe(UUID sessionId, ActiveRepairService.ParentRepairSession prs, Set<InetAddress> peers)
+ LocalSession createSessionUnsafe(UUID sessionId, ActiveRepairService.ParentRepairSession prs, Set<InetAddressAndPort> peers)
{
LocalSession.Builder builder = LocalSession.builder();
builder.withState(ConsistentSession.State.PREPARING);
@@ -464,7 +490,7 @@ public class LocalSessions
return ActiveRepairService.instance.getParentRepairSession(sessionID);
}
- protected void sendMessage(InetAddress destination, RepairMessage message)
+ protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
{
logger.trace("sending {} to {}", message, destination);
MessageOut<RepairMessage> messageOut = new MessageOut<RepairMessage>(MessagingService.Verb.REPAIR_MESSAGE, message, RepairMessage.serializer);
@@ -536,12 +562,12 @@ public class LocalSessions
* successfully. If the pending anti compaction fails, a failure message is sent to the coordinator,
* cancelling the session.
*/
- public void handlePrepareMessage(InetAddress from, PrepareConsistentRequest request)
+ public void handlePrepareMessage(InetAddressAndPort from, PrepareConsistentRequest request)
{
logger.trace("received {} from {}", request, from);
UUID sessionID = request.parentSession;
- InetAddress coordinator = request.coordinator;
- Set<InetAddress> peers = request.participants;
+ InetAddressAndPort coordinator = request.coordinator;
+ Set<InetAddressAndPort> peers = request.participants;
ActiveRepairService.ParentRepairSession parentSession;
try
@@ -568,7 +594,7 @@ public class LocalSessions
{
logger.debug("Prepare phase for incremental repair session {} completed", sessionID);
setStateAndSave(session, PREPARED);
- sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddress(), true));
+ sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), true));
executor.shutdown();
}
@@ -587,7 +613,7 @@ public class LocalSessions
{
logger.error("Prepare phase for incremental repair session {} failed", sessionID, t);
}
- sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddress(), false));
+ sendMessage(coordinator, new PrepareConsistentResponse(sessionID, getBroadcastAddressAndPort(), false));
failSession(sessionID, false);
executor.shutdown();
}
@@ -604,7 +630,7 @@ public class LocalSessions
}
}
- public void handleFinalizeProposeMessage(InetAddress from, FinalizePropose propose)
+ public void handleFinalizeProposeMessage(InetAddressAndPort from, FinalizePropose propose)
{
logger.trace("received {} from {}", propose, from);
UUID sessionID = propose.sessionID;
@@ -629,7 +655,7 @@ public class LocalSessions
*/
syncTable();
- sendMessage(from, new FinalizePromise(sessionID, getBroadcastAddress(), true));
+ sendMessage(from, new FinalizePromise(sessionID, getBroadcastAddressAndPort(), true));
logger.debug("Received FinalizePropose message for incremental repair session {}, responded with FinalizePromise", sessionID);
}
catch (IllegalArgumentException e)
@@ -659,7 +685,7 @@ public class LocalSessions
* as part of the compaction process, and avoids having to worry about in progress compactions interfering with the
* promotion.
*/
- public void handleFinalizeCommitMessage(InetAddress from, FinalizeCommit commit)
+ public void handleFinalizeCommitMessage(InetAddressAndPort from, FinalizeCommit commit)
{
logger.trace("received {} from {}", commit, from);
UUID sessionID = commit.sessionID;
@@ -674,7 +700,7 @@ public class LocalSessions
logger.info("Finalized local repair session {}", sessionID);
}
- public void handleFailSessionMessage(InetAddress from, FailSession msg)
+ public void handleFailSessionMessage(InetAddressAndPort from, FailSession msg)
{
logger.trace("received {} from {}", msg, from);
failSession(msg.sessionID, false);
@@ -684,16 +710,16 @@ public class LocalSessions
{
logger.debug("Attempting to learn the outcome of unfinished local incremental repair session {}", session.sessionID);
StatusRequest request = new StatusRequest(session.sessionID);
- for (InetAddress participant : session.participants)
+ for (InetAddressAndPort participant : session.participants)
{
- if (!getBroadcastAddress().equals(participant) && isAlive(participant))
+ if (!getBroadcastAddressAndPort().equals(participant) && isAlive(participant))
{
sendMessage(participant, request);
}
}
}
- public void handleStatusRequest(InetAddress from, StatusRequest request)
+ public void handleStatusRequest(InetAddressAndPort from, StatusRequest request)
{
logger.trace("received {} from {}", request, from);
UUID sessionID = request.sessionID;
@@ -710,7 +736,7 @@ public class LocalSessions
}
}
- public void handleStatusResponse(InetAddress from, StatusResponse response)
+ public void handleStatusResponse(InetAddressAndPort from, StatusResponse response)
{
logger.trace("received {} from {}", response, from);
UUID sessionID = response.sessionID;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
index 015b558..156fde7 100644
--- a/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
+++ b/src/java/org/apache/cassandra/repair/consistent/SyncStatSummary.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.consistent;
-import java.net.InetAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -27,6 +26,7 @@ import java.util.Objects;
import com.google.common.collect.Lists;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.RepairResult;
import org.apache.cassandra.repair.RepairSessionResult;
import org.apache.cassandra.repair.SyncStat;
@@ -42,14 +42,14 @@ public class SyncStatSummary
private static class Session
{
- final InetAddress src;
- final InetAddress dst;
+ final InetAddressAndPort src;
+ final InetAddressAndPort dst;
int files = 0;
long bytes = 0;
long ranges = 0;
- Session(InetAddress src, InetAddress dst)
+ Session(InetAddressAndPort src, InetAddressAndPort dst)
{
this.src = src;
this.dst = dst;
@@ -84,7 +84,7 @@ public class SyncStatSummary
int ranges = -1;
boolean totalsCalculated = false;
- final Map<Pair<InetAddress, InetAddress>, Session> sessions = new HashMap<>();
+ final Map<Pair<InetAddressAndPort, InetAddressAndPort>, Session> sessions = new HashMap<>();
Table(String keyspace, String table)
{
@@ -92,9 +92,9 @@ public class SyncStatSummary
this.table = table;
}
- Session getOrCreate(InetAddress from, InetAddress to)
+ Session getOrCreate(InetAddressAndPort from, InetAddressAndPort to)
{
- Pair<InetAddress, InetAddress> k = Pair.create(from, to);
+ Pair<InetAddressAndPort, InetAddressAndPort> k = Pair.create(from, to);
if (!sessions.containsKey(k))
{
sessions.put(k, new Session(from, to));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
index b75ad7f..6d76269 100644
--- a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.RepairJobDesc;
@@ -39,13 +40,13 @@ public class AsymmetricSyncRequest extends RepairMessage
{
public static MessageSerializer serializer = new SyncRequestSerializer();
- public final InetAddress initiator;
- public final InetAddress fetchingNode;
- public final InetAddress fetchFrom;
+ public final InetAddressAndPort initiator;
+ public final InetAddressAndPort fetchingNode;
+ public final InetAddressAndPort fetchFrom;
public final Collection<Range<Token>> ranges;
public final PreviewKind previewKind;
- public AsymmetricSyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress fetchingNode, InetAddress fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind)
+ public AsymmetricSyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind)
{
super(Type.ASYMMETRIC_SYNC_REQUEST, desc);
this.initiator = initiator;
@@ -80,9 +81,9 @@ public class AsymmetricSyncRequest extends RepairMessage
public void serialize(AsymmetricSyncRequest message, DataOutputPlus out, int version) throws IOException
{
RepairJobDesc.serializer.serialize(message.desc, out, version);
- CompactEndpointSerializationHelper.serialize(message.initiator, out);
- CompactEndpointSerializationHelper.serialize(message.fetchingNode, out);
- CompactEndpointSerializationHelper.serialize(message.fetchFrom, out);
+ CompactEndpointSerializationHelper.instance.serialize(message.initiator, out, version);
+ CompactEndpointSerializationHelper.instance.serialize(message.fetchingNode, out, version);
+ CompactEndpointSerializationHelper.instance.serialize(message.fetchFrom, out, version);
out.writeInt(message.ranges.size());
for (Range<Token> range : message.ranges)
{
@@ -95,9 +96,9 @@ public class AsymmetricSyncRequest extends RepairMessage
public AsymmetricSyncRequest deserialize(DataInputPlus in, int version) throws IOException
{
RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
- InetAddress owner = CompactEndpointSerializationHelper.deserialize(in);
- InetAddress src = CompactEndpointSerializationHelper.deserialize(in);
- InetAddress dst = CompactEndpointSerializationHelper.deserialize(in);
+ InetAddressAndPort owner = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+ InetAddressAndPort src = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+ InetAddressAndPort dst = CompactEndpointSerializationHelper.instance.deserialize(in, version);
int rangesCount = in.readInt();
List<Range<Token>> ranges = new ArrayList<>(rangesCount);
for (int i = 0; i < rangesCount; ++i)
@@ -109,7 +110,9 @@ public class AsymmetricSyncRequest extends RepairMessage
public long serializedSize(AsymmetricSyncRequest message, int version)
{
long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
- size += 3 * CompactEndpointSerializationHelper.serializedSize(message.initiator);
+ size += CompactEndpointSerializationHelper.instance.serializedSize(message.initiator, version);
+ size += CompactEndpointSerializationHelper.instance.serializedSize(message.fetchingNode, version);
+ size += CompactEndpointSerializationHelper.instance.serializedSize(message.fetchFrom, version);
size += TypeSizes.sizeof(message.ranges.size());
for (Range<Token> range : message.ranges)
size += AbstractBounds.tokenSerializer.serializedSize(range, version);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
index 6c28347..449748a 100644
--- a/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
+++ b/src/java/org/apache/cassandra/repair/messages/FinalizePromise.java
@@ -19,24 +19,22 @@
package org.apache.cassandra.repair.messages;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.UUID;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.InetAddressSerializer;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.utils.UUIDSerializer;
public class FinalizePromise extends RepairMessage
{
public final UUID sessionID;
- public final InetAddress participant;
+ public final InetAddressAndPort participant;
public final boolean promised;
- public FinalizePromise(UUID sessionID, InetAddress participant, boolean promised)
+ public FinalizePromise(UUID sessionID, InetAddressAndPort participant, boolean promised)
{
super(Type.FINALIZE_PROMISE, null);
assert sessionID != null;
@@ -68,26 +66,24 @@ public class FinalizePromise extends RepairMessage
public static MessageSerializer serializer = new MessageSerializer<FinalizePromise>()
{
- private TypeSerializer<InetAddress> inetSerializer = InetAddressSerializer.instance;
-
public void serialize(FinalizePromise msg, DataOutputPlus out, int version) throws IOException
{
UUIDSerializer.serializer.serialize(msg.sessionID, out, version);
- ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(msg.participant), out);
+ CompactEndpointSerializationHelper.instance.serialize(msg.participant, out, version);
out.writeBoolean(msg.promised);
}
public FinalizePromise deserialize(DataInputPlus in, int version) throws IOException
{
return new FinalizePromise(UUIDSerializer.serializer.deserialize(in, version),
- inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)),
+ CompactEndpointSerializationHelper.instance.deserialize(in, version),
in.readBoolean());
}
public long serializedSize(FinalizePromise msg, int version)
{
long size = UUIDSerializer.serializer.serializedSize(msg.sessionID, version);
- size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(msg.participant));
+ size += CompactEndpointSerializationHelper.instance.serializedSize(msg.participant, version);
size += TypeSizes.sizeof(msg.promised);
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
index 57056ef..9aae256 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentRequest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.repair.messages;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
@@ -29,18 +28,17 @@ import com.google.common.collect.ImmutableSet;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.InetAddressSerializer;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.utils.UUIDSerializer;
public class PrepareConsistentRequest extends RepairMessage
{
public final UUID parentSession;
- public final InetAddress coordinator;
- public final Set<InetAddress> participants;
+ public final InetAddressAndPort coordinator;
+ public final Set<InetAddressAndPort> participants;
- public PrepareConsistentRequest(UUID parentSession, InetAddress coordinator, Set<InetAddress> participants)
+ public PrepareConsistentRequest(UUID parentSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> participants)
{
super(Type.CONSISTENT_REQUEST, null);
assert parentSession != null;
@@ -82,28 +80,27 @@ public class PrepareConsistentRequest extends RepairMessage
public static MessageSerializer serializer = new MessageSerializer<PrepareConsistentRequest>()
{
- private TypeSerializer<InetAddress> inetSerializer = InetAddressSerializer.instance;
public void serialize(PrepareConsistentRequest request, DataOutputPlus out, int version) throws IOException
{
UUIDSerializer.serializer.serialize(request.parentSession, out, version);
- ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(request.coordinator), out);
+ CompactEndpointSerializationHelper.instance.serialize(request.coordinator, out, version);
out.writeInt(request.participants.size());
- for (InetAddress peer : request.participants)
+ for (InetAddressAndPort peer : request.participants)
{
- ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(peer), out);
+ CompactEndpointSerializationHelper.instance.serialize(peer, out, version);
}
}
public PrepareConsistentRequest deserialize(DataInputPlus in, int version) throws IOException
{
UUID sessionId = UUIDSerializer.serializer.deserialize(in, version);
- InetAddress coordinator = inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in));
+ InetAddressAndPort coordinator = CompactEndpointSerializationHelper.instance.deserialize(in, version);
int numPeers = in.readInt();
- Set<InetAddress> peers = new HashSet<>(numPeers);
+ Set<InetAddressAndPort> peers = new HashSet<>(numPeers);
for (int i = 0; i < numPeers; i++)
{
- InetAddress peer = inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in));
+ InetAddressAndPort peer = CompactEndpointSerializationHelper.instance.deserialize(in, version);
peers.add(peer);
}
return new PrepareConsistentRequest(sessionId, coordinator, peers);
@@ -112,11 +109,11 @@ public class PrepareConsistentRequest extends RepairMessage
public long serializedSize(PrepareConsistentRequest request, int version)
{
long size = UUIDSerializer.serializer.serializedSize(request.parentSession, version);
- size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(request.coordinator));
+ size += CompactEndpointSerializationHelper.instance.serializedSize(request.coordinator, version);
size += TypeSizes.sizeof(request.participants.size());
- for (InetAddress peer : request.participants)
+ for (InetAddressAndPort peer : request.participants)
{
- size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(peer));
+ size += CompactEndpointSerializationHelper.instance.serializedSize(peer, version);
}
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
index cf4410a..630f18e 100644
--- a/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
+++ b/src/java/org/apache/cassandra/repair/messages/PrepareConsistentResponse.java
@@ -19,24 +19,22 @@
package org.apache.cassandra.repair.messages;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.UUID;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.InetAddressSerializer;
-import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.utils.UUIDSerializer;
public class PrepareConsistentResponse extends RepairMessage
{
public final UUID parentSession;
- public final InetAddress participant;
+ public final InetAddressAndPort participant;
public final boolean success;
- public PrepareConsistentResponse(UUID parentSession, InetAddress participant, boolean success)
+ public PrepareConsistentResponse(UUID parentSession, InetAddressAndPort participant, boolean success)
{
super(Type.CONSISTENT_RESPONSE, null);
assert parentSession != null;
@@ -68,25 +66,24 @@ public class PrepareConsistentResponse extends RepairMessage
public static MessageSerializer serializer = new MessageSerializer<PrepareConsistentResponse>()
{
- private TypeSerializer<InetAddress> inetSerializer = InetAddressSerializer.instance;
public void serialize(PrepareConsistentResponse response, DataOutputPlus out, int version) throws IOException
{
UUIDSerializer.serializer.serialize(response.parentSession, out, version);
- ByteBufferUtil.writeWithShortLength(inetSerializer.serialize(response.participant), out);
+ CompactEndpointSerializationHelper.instance.serialize(response.participant, out, version);
out.writeBoolean(response.success);
}
public PrepareConsistentResponse deserialize(DataInputPlus in, int version) throws IOException
{
return new PrepareConsistentResponse(UUIDSerializer.serializer.deserialize(in, version),
- inetSerializer.deserialize(ByteBufferUtil.readWithShortLength(in)),
+ CompactEndpointSerializationHelper.instance.deserialize(in, version),
in.readBoolean());
}
public long serializedSize(PrepareConsistentResponse response, int version)
{
long size = UUIDSerializer.serializer.serializedSize(response.parentSession, version);
- size += ByteBufferUtil.serializedSizeWithShortLength(inetSerializer.serialize(response.participant));
+ size += CompactEndpointSerializationHelper.instance.serializedSize(response.participant, version);
size += TypeSizes.sizeof(response.success);
return size;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
index 7b68daf..1f1344d 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.messages;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -26,6 +25,7 @@ import java.util.Objects;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.NodePair;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.streaming.SessionSummary;
@@ -53,7 +53,7 @@ public class SyncComplete extends RepairMessage
this.summaries = summaries;
}
- public SyncComplete(RepairJobDesc desc, InetAddress endpoint1, InetAddress endpoint2, boolean success, List<SessionSummary> summaries)
+ public SyncComplete(RepairJobDesc desc, InetAddressAndPort endpoint1, InetAddressAndPort endpoint2, boolean success, List<SessionSummary> summaries)
{
super(Type.SYNC_COMPLETE, desc);
this.summaries = summaries;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index 01601e2..a0bf4e2 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.messages;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -30,6 +29,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.RepairJobDesc;
@@ -45,14 +45,14 @@ public class SyncRequest extends RepairMessage
{
public static MessageSerializer serializer = new SyncRequestSerializer();
- public final InetAddress initiator;
- public final InetAddress src;
- public final InetAddress dst;
+ public final InetAddressAndPort initiator;
+ public final InetAddressAndPort src;
+ public final InetAddressAndPort dst;
public final Collection<Range<Token>> ranges;
public final PreviewKind previewKind;
- public SyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges, PreviewKind previewKind)
- {
+ public SyncRequest(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort src, InetAddressAndPort dst, Collection<Range<Token>> ranges, PreviewKind previewKind)
+ {
super(Type.SYNC_REQUEST, desc);
this.initiator = initiator;
this.src = src;
@@ -87,9 +87,9 @@ public class SyncRequest extends RepairMessage
public void serialize(SyncRequest message, DataOutputPlus out, int version) throws IOException
{
RepairJobDesc.serializer.serialize(message.desc, out, version);
- CompactEndpointSerializationHelper.serialize(message.initiator, out);
- CompactEndpointSerializationHelper.serialize(message.src, out);
- CompactEndpointSerializationHelper.serialize(message.dst, out);
+ CompactEndpointSerializationHelper.instance.serialize(message.initiator, out, version);
+ CompactEndpointSerializationHelper.instance.serialize(message.src, out, version);
+ CompactEndpointSerializationHelper.instance.serialize(message.dst, out, version);
out.writeInt(message.ranges.size());
for (Range<Token> range : message.ranges)
{
@@ -102,9 +102,9 @@ public class SyncRequest extends RepairMessage
public SyncRequest deserialize(DataInputPlus in, int version) throws IOException
{
RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
- InetAddress owner = CompactEndpointSerializationHelper.deserialize(in);
- InetAddress src = CompactEndpointSerializationHelper.deserialize(in);
- InetAddress dst = CompactEndpointSerializationHelper.deserialize(in);
+ InetAddressAndPort owner = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+ InetAddressAndPort src = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+ InetAddressAndPort dst = CompactEndpointSerializationHelper.instance.deserialize(in, version);
int rangesCount = in.readInt();
List<Range<Token>> ranges = new ArrayList<>(rangesCount);
for (int i = 0; i < rangesCount; ++i)
@@ -116,7 +116,7 @@ public class SyncRequest extends RepairMessage
public long serializedSize(SyncRequest message, int version)
{
long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
- size += 3 * CompactEndpointSerializationHelper.serializedSize(message.initiator);
+ size += 3 * CompactEndpointSerializationHelper.instance.serializedSize(message.initiator, version);
size += TypeSizes.sizeof(message.ranges.size());
for (Range<Token> range : message.ranges)
size += AbstractBounds.tokenSerializer.serializedSize(range, version);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java
index ef19c25..c8881e5 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.schema;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.*;
import java.lang.management.ManagementFactory;
@@ -39,6 +38,7 @@ import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -59,10 +59,10 @@ public class MigrationManager
private MigrationManager() {}
- public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
+ public static void scheduleSchemaPull(InetAddressAndPort endpoint, EndpointState state)
{
UUID schemaVersion = state.getSchemaVersion();
- if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null)
+ if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && schemaVersion != null)
maybeScheduleSchemaPull(schemaVersion, endpoint);
}
@@ -70,7 +70,7 @@ public class MigrationManager
* If versions differ this node sends request with local migration list to the endpoint
* and expecting to receive a list of migrations to apply locally.
*/
- private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
+ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddressAndPort endpoint)
{
if (Schema.instance.getVersion() == null)
{
@@ -130,7 +130,7 @@ public class MigrationManager
}
}
- private static Future<?> submitMigrationTask(InetAddress endpoint)
+ private static Future<?> submitMigrationTask(InetAddressAndPort endpoint)
{
/*
* Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
@@ -139,7 +139,7 @@ public class MigrationManager
return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
}
- static boolean shouldPullSchemaFrom(InetAddress endpoint)
+ static boolean shouldPullSchemaFrom(InetAddressAndPort endpoint)
{
/*
* Don't request schema from nodes with a different or unknown major version (may have incompatible schema)
@@ -427,7 +427,7 @@ public class MigrationManager
FBUtilities.waitOnFuture(announce(mutations));
}
- private static void pushSchemaMutation(InetAddress endpoint, Collection<Mutation> schema)
+ private static void pushSchemaMutation(InetAddressAndPort endpoint, Collection<Mutation> schema)
{
MessageOut<Collection<Mutation>> msg = new MessageOut<>(MessagingService.Verb.DEFINITIONS_UPDATE,
schema,
@@ -446,10 +446,10 @@ public class MigrationManager
}
});
- for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
+ for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers())
{
// only push schema to nodes with known and equal versions
- if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
+ if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) &&
MessagingService.instance().knowsVersion(endpoint) &&
MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version)
pushSchemaMutation(endpoint, schema);
@@ -486,11 +486,11 @@ public class MigrationManager
Schema.instance.clear();
- Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
- liveEndpoints.remove(FBUtilities.getBroadcastAddress());
+ Set<InetAddressAndPort> liveEndpoints = Gossiper.instance.getLiveMembers();
+ liveEndpoints.remove(FBUtilities.getBroadcastAddressAndPort());
// force migration if there are nodes around
- for (InetAddress node : liveEndpoints)
+ for (InetAddressAndPort node : liveEndpoints)
{
if (shouldPullSchemaFrom(node))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/schema/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationTask.java b/src/java/org/apache/cassandra/schema/MigrationTask.java
index 73e396d..6ff206a 100644
--- a/src/java/org/apache/cassandra/schema/MigrationTask.java
+++ b/src/java/org/apache/cassandra/schema/MigrationTask.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.schema;
-import java.net.InetAddress;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
@@ -32,6 +31,7 @@ import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
@@ -46,9 +46,9 @@ final class MigrationTask extends WrappedRunnable
private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);
- private final InetAddress endpoint;
+ private final InetAddressAndPort endpoint;
- MigrationTask(InetAddress endpoint)
+ MigrationTask(InetAddressAndPort endpoint)
{
this.endpoint = endpoint;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 7ff0b9b..e06131e 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.service;
-import java.net.InetAddress;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@@ -38,6 +37,7 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -60,12 +60,12 @@ public abstract class AbstractReadExecutor
private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
protected final ReadCommand command;
- protected final List<InetAddress> targetReplicas;
+ protected final List<InetAddressAndPort> targetReplicas;
protected final ReadCallback handler;
protected final TraceState traceState;
protected final ColumnFamilyStore cfs;
- AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime)
+ AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime)
{
this.command = command;
this.targetReplicas = targetReplicas;
@@ -78,27 +78,27 @@ public abstract class AbstractReadExecutor
// TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once
// we stop being compatible with pre-3.0 nodes.
int digestVersion = MessagingService.current_version;
- for (InetAddress replica : targetReplicas)
+ for (InetAddressAndPort replica : targetReplicas)
digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica));
command.setDigestVersion(digestVersion);
}
- protected void makeDataRequests(Iterable<InetAddress> endpoints)
+ protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints)
{
makeRequests(command, endpoints);
}
- protected void makeDigestRequests(Iterable<InetAddress> endpoints)
+ protected void makeDigestRequests(Iterable<InetAddressAndPort> endpoints)
{
makeRequests(command.copyAsDigestQuery(), endpoints);
}
- private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints)
+ private void makeRequests(ReadCommand readCommand, Iterable<InetAddressAndPort> endpoints)
{
boolean hasLocalEndpoint = false;
- for (InetAddress endpoint : endpoints)
+ for (InetAddressAndPort endpoint : endpoints)
{
if (StorageProxy.canDoLocalRequest(endpoint))
{
@@ -132,7 +132,7 @@ public abstract class AbstractReadExecutor
*
* @return target replicas + the extra replica, *IF* we speculated.
*/
- public abstract Collection<InetAddress> getContactedReplicas();
+ public abstract Collection<InetAddressAndPort> getContactedReplicas();
/**
* send the initial set of requests
@@ -184,12 +184,12 @@ public abstract class AbstractReadExecutor
public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException
{
Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
- List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
+ List<InetAddressAndPort> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
// 11980: Excluding EACH_QUORUM reads from potential RR, so that we do not miscount DC responses
ReadRepairDecision repairDecision = consistencyLevel == ConsistencyLevel.EACH_QUORUM
? ReadRepairDecision.NONE
: newReadRepairDecision(command.metadata());
- List<InetAddress> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
+ List<InetAddressAndPort> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
// Throw UAE early if we don't have enough replicas.
consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas);
@@ -223,12 +223,12 @@ public abstract class AbstractReadExecutor
}
// RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
- InetAddress extraReplica = allReplicas.get(targetReplicas.size());
+ InetAddressAndPort extraReplica = allReplicas.get(targetReplicas.size());
// With repair decision DC_LOCAL all replicas/target replicas may be in different order, so
// we might have to find a replacement that's not already in targetReplicas.
if (repairDecision == ReadRepairDecision.DC_LOCAL && targetReplicas.contains(extraReplica))
{
- for (InetAddress address : allReplicas)
+ for (InetAddressAndPort address : allReplicas)
{
if (!targetReplicas.contains(address))
{
@@ -269,7 +269,7 @@ public abstract class AbstractReadExecutor
*/
private final boolean logFailedSpeculation;
- public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation)
+ public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation)
{
super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
this.logFailedSpeculation = logFailedSpeculation;
@@ -290,7 +290,7 @@ public abstract class AbstractReadExecutor
}
}
- public Collection<InetAddress> getContactedReplicas()
+ public Collection<InetAddressAndPort> getContactedReplicas()
{
return targetReplicas;
}
@@ -304,7 +304,7 @@ public abstract class AbstractReadExecutor
ColumnFamilyStore cfs,
ReadCommand command,
ConsistencyLevel consistencyLevel,
- List<InetAddress> targetReplicas,
+ List<InetAddressAndPort> targetReplicas,
long queryStartNanoTime)
{
super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
@@ -314,7 +314,7 @@ public abstract class AbstractReadExecutor
{
// if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating. So we know
// that the last replica in our list is "extra."
- List<InetAddress> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1);
+ List<InetAddressAndPort> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1);
if (handler.blockfor < initialReplicas.size())
{
@@ -347,7 +347,7 @@ public abstract class AbstractReadExecutor
if (handler.resolver.isDataPresent())
retryCommand = command.copyAsDigestQuery();
- InetAddress extraReplica = Iterables.getLast(targetReplicas);
+ InetAddressAndPort extraReplica = Iterables.getLast(targetReplicas);
if (traceState != null)
traceState.trace("speculating read retry on {}", extraReplica);
logger.trace("speculating read retry on {}", extraReplica);
@@ -355,7 +355,7 @@ public abstract class AbstractReadExecutor
}
}
- public Collection<InetAddress> getContactedReplicas()
+ public Collection<InetAddressAndPort> getContactedReplicas()
{
return speculated
? targetReplicas
@@ -378,7 +378,7 @@ public abstract class AbstractReadExecutor
ColumnFamilyStore cfs,
ReadCommand command,
ConsistencyLevel consistencyLevel,
- List<InetAddress> targetReplicas,
+ List<InetAddressAndPort> targetReplicas,
long queryStartNanoTime)
{
super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
@@ -389,7 +389,7 @@ public abstract class AbstractReadExecutor
// no-op
}
- public Collection<InetAddress> getContactedReplicas()
+ public Collection<InetAddressAndPort> getContactedReplicas()
{
return targetReplicas;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index b5eaadb..9d800a0 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.service;
-import java.net.InetAddress;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,6 +34,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -47,15 +47,15 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
private AtomicInteger responsesAndExpirations;
private final SimpleCondition condition = new SimpleCondition();
protected final Keyspace keyspace;
- protected final Collection<InetAddress> naturalEndpoints;
+ protected final Collection<InetAddressAndPort> naturalEndpoints;
public final ConsistencyLevel consistencyLevel;
protected final Runnable callback;
- protected final Collection<InetAddress> pendingEndpoints;
+ protected final Collection<InetAddressAndPort> pendingEndpoints;
protected final WriteType writeType;
private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
= AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
private volatile int failures = 0;
- private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint;
+ private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
private final long queryStartNanoTime;
private volatile boolean supportsBackPressure = true;
@@ -72,8 +72,8 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
* @param queryStartNanoTime
*/
protected AbstractWriteResponseHandler(Keyspace keyspace,
- Collection<InetAddress> naturalEndpoints,
- Collection<InetAddress> pendingEndpoints,
+ Collection<InetAddressAndPort> naturalEndpoints,
+ Collection<InetAddressAndPort> pendingEndpoints,
ConsistencyLevel consistencyLevel,
Runnable callback,
WriteType writeType,
@@ -208,7 +208,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
/**
* @return true if the message counts towards the totalBlockFor() threshold
*/
- protected boolean waitingFor(InetAddress from)
+ protected boolean waitingFor(InetAddressAndPort from)
{
return true;
}
@@ -236,7 +236,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
}
@Override
- public void onFailure(InetAddress from, RequestFailureReason failureReason)
+ public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
{
logger.trace("Got failure from {}", from);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org