You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:10 UTC
[13/19] cassandra git commit: Allow storage port to be configurable per node
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java
index 7844c9b..d891043 100644
--- a/src/java/org/apache/cassandra/net/async/NettyFactory.java
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.auth.IInternodeAuthenticator;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.NativeTransportService;
@@ -184,9 +185,9 @@ public final class NettyFactory
* Create a {@link Channel} that listens on the {@code localAddr}. This method will block while trying to bind to the address,
* but it does not make a remote call.
*/
- public Channel createInboundChannel(InetSocketAddress localAddr, InboundInitializer initializer, int receiveBufferSize) throws ConfigurationException
+ public Channel createInboundChannel(InetAddressAndPort localAddr, InboundInitializer initializer, int receiveBufferSize) throws ConfigurationException
{
- String nic = FBUtilities.getNetworkInterface(localAddr.getAddress());
+ String nic = FBUtilities.getNetworkInterface(localAddr.address);
logger.info("Starting Messaging Service on {} {}, encryption: {}",
localAddr, nic == null ? "" : String.format(" (%s)", nic), encryptionLogStatement(initializer.encryptionOptions));
Class<? extends ServerChannel> transport = useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
@@ -202,7 +203,7 @@ public final class NettyFactory
if (receiveBufferSize > 0)
bootstrap.childOption(ChannelOption.SO_RCVBUF, receiveBufferSize);
- ChannelFuture channelFuture = bootstrap.bind(localAddr);
+ ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(localAddr.address, localAddr.port));
if (!channelFuture.awaitUninterruptibly().isSuccess())
{
@@ -333,8 +334,9 @@ public final class NettyFactory
.option(ChannelOption.TCP_NODELAY, params.tcpNoDelay)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, params.waterMark)
.handler(new OutboundInitializer(params));
- bootstrap.localAddress(params.connectionId.local(), 0);
- bootstrap.remoteAddress(params.connectionId.connectionAddress());
+ bootstrap.localAddress(params.connectionId.local().address, 0);
+ InetAddressAndPort remoteAddress = params.connectionId.connectionAddress();
+ bootstrap.remoteAddress(new InetSocketAddress(remoteAddress.address, remoteAddress.port));
return bootstrap;
}
@@ -362,7 +364,8 @@ public final class NettyFactory
{
SslContext sslContext = SSLFactory.getSslContext(params.encryptionOptions, true, false);
// for some reason channel.remoteAddress() will return null
- InetSocketAddress peer = params.encryptionOptions.require_endpoint_verification ? params.connectionId.remoteAddress() : null;
+ InetAddressAndPort address = params.connectionId.remote();
+ InetSocketAddress peer = params.encryptionOptions.require_endpoint_verification ? new InetSocketAddress(address.address, address.port) : null;
SslHandler sslHandler = newSslHandler(channel, sslContext, peer);
logger.trace("creating outbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
index 6b2ff0d..f3cb554 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.net.async;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
/**
* Identifies an outbound messaging connection.
*
@@ -38,24 +40,24 @@ public class OutboundConnectionIdentifier
/**
* Memoization of the local node's broadcast address.
*/
- private final InetSocketAddress localAddr;
+ private final InetAddressAndPort localAddr;
/**
* The address by which the remote is identified. This may be different from {@link #remoteConnectionAddr} for
* something like EC2 public IP address which need to be used for communication between EC2 regions.
*/
- private final InetSocketAddress remoteAddr;
+ private final InetAddressAndPort remoteAddr;
/**
* The address to which we're connecting to the node (often the same as {@link #remoteAddr} but not always).
*/
- private final InetSocketAddress remoteConnectionAddr;
+ private final InetAddressAndPort remoteConnectionAddr;
private final ConnectionType connectionType;
- private OutboundConnectionIdentifier(InetSocketAddress localAddr,
- InetSocketAddress remoteAddr,
- InetSocketAddress remoteConnectionAddr,
+ private OutboundConnectionIdentifier(InetAddressAndPort localAddr,
+ InetAddressAndPort remoteAddr,
+ InetAddressAndPort remoteConnectionAddr,
ConnectionType connectionType)
{
this.localAddr = localAddr;
@@ -64,8 +66,8 @@ public class OutboundConnectionIdentifier
this.connectionType = connectionType;
}
- private OutboundConnectionIdentifier(InetSocketAddress localAddr,
- InetSocketAddress remoteAddr,
+ private OutboundConnectionIdentifier(InetAddressAndPort localAddr,
+ InetAddressAndPort remoteAddr,
ConnectionType connectionType)
{
this(localAddr, remoteAddr, remoteAddr, connectionType);
@@ -75,7 +77,7 @@ public class OutboundConnectionIdentifier
* Creates an identifier for a small message connection and using the remote "identifying" address as its connection
* address.
*/
- public static OutboundConnectionIdentifier small(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+ public static OutboundConnectionIdentifier small(InetAddressAndPort localAddr, InetAddressAndPort remoteAddr)
{
return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.SMALL_MESSAGE);
}
@@ -84,7 +86,7 @@ public class OutboundConnectionIdentifier
* Creates an identifier for a large message connection and using the remote "identifying" address as its connection
* address.
*/
- public static OutboundConnectionIdentifier large(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+ public static OutboundConnectionIdentifier large(InetAddressAndPort localAddr, InetAddressAndPort remoteAddr)
{
return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.LARGE_MESSAGE);
}
@@ -93,7 +95,7 @@ public class OutboundConnectionIdentifier
* Creates an identifier for a gossip connection and using the remote "identifying" address as its connection
* address.
*/
- public static OutboundConnectionIdentifier gossip(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+ public static OutboundConnectionIdentifier gossip(InetAddressAndPort localAddr, InetAddressAndPort remoteAddr)
{
return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.GOSSIP);
}
@@ -102,7 +104,7 @@ public class OutboundConnectionIdentifier
* Creates an identifier for a stream connection and using the remote "identifying" address as its connection
* address.
*/
- public static OutboundConnectionIdentifier stream(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+ public static OutboundConnectionIdentifier stream(InetAddressAndPort localAddr, InetAddressAndPort remoteAddr)
{
return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.STREAM);
}
@@ -115,45 +117,37 @@ public class OutboundConnectionIdentifier
* @return a newly created connection identifier that differs from this one only by using {@code remoteConnectionAddr}
* as connection address to the remote.
*/
- public OutboundConnectionIdentifier withNewConnectionAddress(InetSocketAddress remoteConnectionAddr)
+ public OutboundConnectionIdentifier withNewConnectionAddress(InetAddressAndPort remoteConnectionAddr)
{
return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteConnectionAddr, connectionType);
}
public OutboundConnectionIdentifier withNewConnectionPort(int port)
{
- return new OutboundConnectionIdentifier(localAddr, new InetSocketAddress(remoteAddr.getAddress(), port),
- new InetSocketAddress(remoteConnectionAddr.getAddress(), port), connectionType);
+ return new OutboundConnectionIdentifier(localAddr, InetAddressAndPort.getByAddressOverrideDefaults(remoteAddr.address, port),
+ InetAddressAndPort.getByAddressOverrideDefaults(remoteConnectionAddr.address, port), connectionType);
}
/**
* The local node address.
*/
- public InetAddress local()
+ public InetAddressAndPort local()
{
- return localAddr.getAddress();
+ return localAddr;
}
/**
* The remote node identifying address (the one to use for anything else than connecting to the node).
*/
- public InetSocketAddress remoteAddress()
+ public InetAddressAndPort remote()
{
return remoteAddr;
}
/**
- * The remote node identifying address (the one to use for anything else than connecting to the node).
- */
- public InetAddress remote()
- {
- return remoteAddr.getAddress();
- }
-
- /**
* The remote node connection address (the one to use to actually connect to the remote, and only that).
*/
- public InetSocketAddress connectionAddress()
+ public InetAddressAndPort connectionAddress()
{
return remoteConnectionAddr;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
index 4522ba4..28775ef 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.async.NettyFactory.Mode;
@@ -258,12 +259,12 @@ public class OutboundMessagingConnection
logger.debug("connection attempt {} to {}", connectAttemptCount, connectionId);
- InetSocketAddress remote = connectionId.remoteAddress();
- if (!authenticator.authenticate(remote.getAddress(), remote.getPort()))
+ InetAddressAndPort remote = connectionId.remote();
+ if (!authenticator.authenticate(remote.address, remote.port))
{
logger.warn("Internode auth failed connecting to {}", connectionId);
//Remove the connection pool and other thread so messages aren't queued
- MessagingService.instance().destroyConnectionPool(remote.getAddress());
+ MessagingService.instance().destroyConnectionPool(remote);
// don't update the state field as destroyConnectionPool() *should* call OMC.close()
// on all the connections in the OMP for the remoteAddress
@@ -284,7 +285,7 @@ public class OutboundMessagingConnection
}
@VisibleForTesting
- static boolean shouldCompressConnection(InetAddress localHost, InetAddress remoteHost)
+ static boolean shouldCompressConnection(InetAddressAndPort localHost, InetAddressAndPort remoteHost)
{
return (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.all)
|| ((DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc) && !isLocalDC(localHost, remoteHost));
@@ -355,7 +356,7 @@ public class OutboundMessagingConnection
return null;
}
- static boolean isLocalDC(InetAddress localHost, InetAddress remoteHost)
+ static boolean isLocalDC(InetAddressAndPort localHost, InetAddressAndPort remoteHost)
{
String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(remoteHost);
String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(localHost);
@@ -585,7 +586,7 @@ public class OutboundMessagingConnection
* Any outstanding messages in the existing channel will still be sent to the previous address (we won't/can't move them from
* one channel to another).
*/
- void reconnectWithNewIp(InetSocketAddress newAddr)
+ void reconnectWithNewIp(InetAddressAndPort newAddr)
{
State currentState = state.get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
index 0086da8..c701229 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.ConnectionMetrics;
import org.apache.cassandra.net.BackPressureState;
import org.apache.cassandra.net.MessageOut;
@@ -56,14 +57,14 @@ public class OutboundMessagingPool
* An override address on which to communicate with the peer. Typically used for something like EC2 public IP addresses
* which need to be used for communication between EC2 regions.
*/
- private InetSocketAddress preferredRemoteAddr;
+ private InetAddressAndPort preferredRemoteAddr;
- public OutboundMessagingPool(InetSocketAddress remoteAddr, InetSocketAddress localAddr, ServerEncryptionOptions encryptionOptions,
+ public OutboundMessagingPool(InetAddressAndPort remoteAddr, InetAddressAndPort localAddr, ServerEncryptionOptions encryptionOptions,
BackPressureState backPressureState, IInternodeAuthenticator authenticator)
{
preferredRemoteAddr = remoteAddr;
this.backPressureState = backPressureState;
- metrics = new ConnectionMetrics(localAddr.getAddress(), this);
+ metrics = new ConnectionMetrics(localAddr, this);
smallMessageChannel = new OutboundMessagingConnection(OutboundConnectionIdentifier.small(localAddr, preferredRemoteAddr),
@@ -76,10 +77,10 @@ public class OutboundMessagingPool
encryptionOptions, Optional.empty(), authenticator);
}
- private static Optional<CoalescingStrategy> coalescingStrategy(InetSocketAddress remoteAddr)
+ private static Optional<CoalescingStrategy> coalescingStrategy(InetAddressAndPort remoteAddr)
{
String strategyName = DatabaseDescriptor.getOtcCoalescingStrategy();
- String displayName = remoteAddr.getAddress().getHostAddress();
+ String displayName = remoteAddr.toString();
return CoalescingStrategies.newCoalescingStrategy(strategyName,
DatabaseDescriptor.getOtcCoalescingWindow(),
OutboundMessagingConnection.logger,
@@ -117,7 +118,7 @@ public class OutboundMessagingPool
*
* @param addr IP Address to use (and prefer) going forward for connecting to the peer
*/
- public void reconnectWithNewIp(InetSocketAddress addr)
+ public void reconnectWithNewIp(InetAddressAndPort addr)
{
preferredRemoteAddr = addr;
gossipChannel.reconnectWithNewIp(addr);
@@ -166,7 +167,7 @@ public class OutboundMessagingPool
return metrics.timeouts.getCount();
}
- public InetSocketAddress getPreferredRemoteAddr()
+ public InetAddressAndPort getPreferredRemoteAddr()
{
return preferredRemoteAddr;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
index 5464520..2c4fae4 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java
@@ -18,13 +18,13 @@
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.util.List;
import java.util.UUID;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamEvent;
@@ -41,15 +41,15 @@ public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements Strea
private final UUID pendingRepair;
private final TraceState state = Tracing.instance.get();
- public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, UUID pendingRepair, PreviewKind previewKind)
+ public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, UUID pendingRepair, PreviewKind previewKind)
{
- super(desc, FBUtilities.getBroadcastAddress(), fetchFrom, rangesToFetch, previewKind);
+ super(desc, FBUtilities.getBroadcastAddressAndPort(), fetchFrom, rangesToFetch, previewKind);
this.pendingRepair = pendingRepair;
}
public void startSync(List<Range<Token>> rangesToFetch)
{
- InetAddress preferred = SystemKeyspace.getPreferredIP(fetchFrom);
+ InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(fetchFrom);
StreamPlan plan = new StreamPlan(StreamOperation.REPAIR,
1, false,
false,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
index d70975d..e24d854 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
@@ -18,12 +18,12 @@
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.util.List;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
import org.apache.cassandra.streaming.PreviewKind;
@@ -33,14 +33,14 @@ import org.apache.cassandra.utils.FBUtilities;
public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements CompletableRemoteSyncTask
{
- public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddress fetchNode, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
+ public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort fetchNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
{
super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind);
}
public void startSync(List<Range<Token>> rangesToFetch)
{
- InetAddress local = FBUtilities.getBroadcastAddress();
+ InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, fetchingNode, fetchFrom, rangesToFetch, previewKind);
String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom);
Tracing.traceRepair(message);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
index fe00058..4d38e8a 100644
--- a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -30,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.Tracing;
@@ -37,15 +37,15 @@ public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implem
{
private static Logger logger = LoggerFactory.getLogger(AsymmetricSyncTask.class);
protected final RepairJobDesc desc;
- protected final InetAddress fetchFrom;
+ protected final InetAddressAndPort fetchFrom;
protected final List<Range<Token>> rangesToFetch;
- protected final InetAddress fetchingNode;
+ protected final InetAddressAndPort fetchingNode;
protected final PreviewKind previewKind;
private long startTime = Long.MIN_VALUE;
protected volatile SyncStat stat;
- public AsymmetricSyncTask(RepairJobDesc desc, InetAddress fetchingNode, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
+ public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind)
{
this.desc = desc;
this.fetchFrom = fetchFrom;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 8545b22..60d571b 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.util.List;
import java.util.UUID;
@@ -28,7 +27,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamEvent;
@@ -39,8 +38,6 @@ import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
-import org.apache.cassandra.utils.MerkleTrees;
/**
* LocalSyncTask performs streaming between local(coordinator) node and remote replica.
@@ -62,7 +59,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
}
@VisibleForTesting
- StreamPlan createStreamPlan(InetAddress dst, InetAddress preferred, List<Range<Token>> differences)
+ StreamPlan createStreamPlan(InetAddressAndPort dst, InetAddressAndPort preferred, List<Range<Token>> differences)
{
StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind)
.listeners(this)
@@ -84,10 +81,10 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
@Override
protected void startSync(List<Range<Token>> differences)
{
- InetAddress local = FBUtilities.getBroadcastAddress();
+ InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
// We can take either of the nodes as source or destination; however, if one is localhost, we put it at source to avoid forwarding
- InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
- InetAddress preferred = SystemKeyspace.getPreferredIP(dst);
+ InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
+ InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(dst);
String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst);
logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/NodePair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/NodePair.java b/src/java/org/apache/cassandra/repair/NodePair.java
index a73c61a..bfb237e 100644
--- a/src/java/org/apache/cassandra/repair/NodePair.java
+++ b/src/java/org/apache/cassandra/repair/NodePair.java
@@ -18,13 +18,13 @@
package org.apache.cassandra.repair;
import java.io.IOException;
-import java.net.InetAddress;
import com.google.common.base.Objects;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
/**
@@ -36,10 +36,10 @@ public class NodePair
{
public static IVersionedSerializer<NodePair> serializer = new NodePairSerializer();
- public final InetAddress endpoint1;
- public final InetAddress endpoint2;
+ public final InetAddressAndPort endpoint1;
+ public final InetAddressAndPort endpoint2;
- public NodePair(InetAddress endpoint1, InetAddress endpoint2)
+ public NodePair(InetAddressAndPort endpoint1, InetAddressAndPort endpoint2)
{
this.endpoint1 = endpoint1;
this.endpoint2 = endpoint2;
@@ -56,6 +56,12 @@ public class NodePair
}
@Override
+ public String toString()
+ {
+ return endpoint1.toString() + " - " + endpoint2.toString();
+ }
+
+ @Override
public int hashCode()
{
return Objects.hashCode(endpoint1, endpoint2);
@@ -65,20 +71,21 @@ public class NodePair
{
public void serialize(NodePair nodePair, DataOutputPlus out, int version) throws IOException
{
- CompactEndpointSerializationHelper.serialize(nodePair.endpoint1, out);
- CompactEndpointSerializationHelper.serialize(nodePair.endpoint2, out);
+ CompactEndpointSerializationHelper.instance.serialize(nodePair.endpoint1, out, version);
+ CompactEndpointSerializationHelper.instance.serialize(nodePair.endpoint2, out, version);
}
public NodePair deserialize(DataInputPlus in, int version) throws IOException
{
- InetAddress ep1 = CompactEndpointSerializationHelper.deserialize(in);
- InetAddress ep2 = CompactEndpointSerializationHelper.deserialize(in);
+ InetAddressAndPort ep1 = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+ InetAddressAndPort ep2 = CompactEndpointSerializationHelper.instance.deserialize(in, version);
return new NodePair(ep1, ep2);
}
public long serializedSize(NodePair nodePair, int version)
{
- return 2 * CompactEndpointSerializationHelper.serializedSize(nodePair.endpoint1);
+ return CompactEndpointSerializationHelper.instance.serializedSize(nodePair.endpoint1, version)
+ + CompactEndpointSerializationHelper.instance.serializedSize(nodePair.endpoint2, version);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
index 93feb72..0a47f73 100644
--- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.util.List;
import org.slf4j.Logger;
@@ -26,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.streaming.PreviewKind;
@@ -51,7 +51,7 @@ public class RemoteSyncTask extends SyncTask implements CompletableRemoteSyncTas
@Override
protected void startSync(List<Range<Token>> differences)
{
- InetAddress local = FBUtilities.getBroadcastAddress();
+ InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind);
String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst);
logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 7b8eb92..48973d2 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.util.*;
import java.util.stream.Collectors;
@@ -33,6 +32,7 @@ import org.apache.cassandra.repair.asymmetric.DifferenceHolder;
import org.apache.cassandra.repair.asymmetric.HostDifferences;
import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter;
import org.apache.cassandra.repair.asymmetric.ReduceHelper;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -83,14 +83,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
Keyspace ks = Keyspace.open(desc.keyspace);
ColumnFamilyStore cfs = ks.getColumnFamilyStore(desc.columnFamily);
cfs.metric.repairsStarted.inc();
- List<InetAddress> allEndpoints = new ArrayList<>(session.endpoints);
- allEndpoints.add(FBUtilities.getBroadcastAddress());
+ List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.endpoints);
+ allEndpoints.add(FBUtilities.getBroadcastAddressAndPort());
ListenableFuture<List<TreeResponse>> validations;
// Create a snapshot at all nodes unless we're using pure parallel repairs
if (parallelismDegree != RepairParallelism.PARALLEL)
{
- ListenableFuture<List<InetAddress>> allSnapshotTasks;
+ ListenableFuture<List<InetAddressAndPort>> allSnapshotTasks;
if (isIncremental)
{
// consistent repair does its own "snapshotting"
@@ -99,8 +99,8 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
else
{
// Request snapshot to all replica
- List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size());
- for (InetAddress endpoint : allEndpoints)
+ List<ListenableFuture<InetAddressAndPort>> snapshotTasks = new ArrayList<>(allEndpoints.size());
+ for (InetAddressAndPort endpoint : allEndpoints)
{
SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint);
snapshotTasks.add(snapshotTask);
@@ -110,9 +110,9 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
}
// When all snapshot complete, send validation requests
- validations = Futures.transformAsync(allSnapshotTasks, new AsyncFunction<List<InetAddress>, List<TreeResponse>>()
+ validations = Futures.transformAsync(allSnapshotTasks, new AsyncFunction<List<InetAddressAndPort>, List<TreeResponse>>()
{
- public ListenableFuture<List<TreeResponse>> apply(List<InetAddress> endpoints)
+ public ListenableFuture<List<TreeResponse>> apply(List<InetAddressAndPort> endpoints)
{
if (parallelismDegree == RepairParallelism.SEQUENTIAL)
return sendSequentialValidationRequest(endpoints);
@@ -164,7 +164,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
{
return trees ->
{
- InetAddress local = FBUtilities.getLocalAddress();
+ InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
List<SyncTask> syncTasks = new ArrayList<>();
// We need to difference all trees one against another
@@ -198,7 +198,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
{
return trees ->
{
- InetAddress local = FBUtilities.getLocalAddress();
+ InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
List<AsymmetricSyncTask> syncTasks = new ArrayList<>();
// We need to difference all trees one against another
@@ -210,16 +210,16 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
.filter(node -> getDC(streaming)
.equals(getDC(node)))
.collect(Collectors.toSet());
- ImmutableMap<InetAddress, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter);
+ ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter);
for (int i = 0; i < trees.size(); i++)
{
- InetAddress address = trees.get(i).endpoint;
+ InetAddressAndPort address = trees.get(i).endpoint;
HostDifferences streamsFor = reducedDifferences.get(address);
if (streamsFor != null)
{
assert streamsFor.get(address).isEmpty() : "We should not fetch ranges from ourselves";
- for (InetAddress fetchFrom : streamsFor.hosts())
+ for (InetAddressAndPort fetchFrom : streamsFor.hosts())
{
List<Range<Token>> toFetch = streamsFor.get(fetchFrom);
logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom);
@@ -246,7 +246,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
};
}
- private String getDC(InetAddress address)
+ private String getDC(InetAddressAndPort address)
{
return DatabaseDescriptor.getEndpointSnitch().getDatacenter(address);
}
@@ -257,14 +257,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
* @param endpoints Endpoint addresses to send validation request
* @return Future that can get all {@link TreeResponse} from replica, if all validation succeed.
*/
- private ListenableFuture<List<TreeResponse>> sendValidationRequest(Collection<InetAddress> endpoints)
+ private ListenableFuture<List<TreeResponse>> sendValidationRequest(Collection<InetAddressAndPort> endpoints)
{
String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints);
logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
Tracing.traceRepair(message);
int nowInSec = FBUtilities.nowInSeconds();
List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
- for (InetAddress endpoint : endpoints)
+ for (InetAddressAndPort endpoint : endpoints)
{
ValidationTask task = new ValidationTask(desc, endpoint, nowInSec, previewKind);
tasks.add(task);
@@ -277,7 +277,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
/**
* Creates {@link ValidationTask} and submit them to task executor so that tasks run sequentially.
*/
- private ListenableFuture<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddress> endpoints)
+ private ListenableFuture<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddressAndPort> endpoints)
{
String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints);
logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
@@ -285,8 +285,8 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
int nowInSec = FBUtilities.nowInSeconds();
List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
- Queue<InetAddress> requests = new LinkedList<>(endpoints);
- InetAddress address = requests.poll();
+ Queue<InetAddressAndPort> requests = new LinkedList<>(endpoints);
+ InetAddressAndPort address = requests.poll();
ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, previewKind);
logger.info("Validating {}", address);
session.waitForValidation(Pair.create(desc, address), firstTask);
@@ -294,7 +294,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
ValidationTask currentTask = firstTask;
while (requests.size() > 0)
{
- final InetAddress nextAddress = requests.poll();
+ final InetAddressAndPort nextAddress = requests.poll();
final ValidationTask nextTask = new ValidationTask(desc, nextAddress, nowInSec, previewKind);
tasks.add(nextTask);
Futures.addCallback(currentTask, new FutureCallback<TreeResponse>()
@@ -319,7 +319,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
/**
* Creates {@link ValidationTask} and submit them to task executor so that tasks run sequentially within each dc.
*/
- private ListenableFuture<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddress> endpoints)
+ private ListenableFuture<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddressAndPort> endpoints)
{
String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints);
logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
@@ -327,11 +327,11 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
int nowInSec = FBUtilities.nowInSeconds();
List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
- Map<String, Queue<InetAddress>> requestsByDatacenter = new HashMap<>();
- for (InetAddress endpoint : endpoints)
+ Map<String, Queue<InetAddressAndPort>> requestsByDatacenter = new HashMap<>();
+ for (InetAddressAndPort endpoint : endpoints)
{
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint);
- Queue<InetAddress> queue = requestsByDatacenter.get(dc);
+ Queue<InetAddressAndPort> queue = requestsByDatacenter.get(dc);
if (queue == null)
{
queue = new LinkedList<>();
@@ -340,10 +340,10 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
queue.add(endpoint);
}
- for (Map.Entry<String, Queue<InetAddress>> entry : requestsByDatacenter.entrySet())
+ for (Map.Entry<String, Queue<InetAddressAndPort>> entry : requestsByDatacenter.entrySet())
{
- Queue<InetAddress> requests = entry.getValue();
- InetAddress address = requests.poll();
+ Queue<InetAddressAndPort> requests = entry.getValue();
+ InetAddressAndPort address = requests.poll();
ValidationTask firstTask = new ValidationTask(desc, address, nowInSec, previewKind);
logger.info("Validating {}", address);
session.waitForValidation(Pair.create(desc, address), firstTask);
@@ -351,7 +351,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
ValidationTask currentTask = firstTask;
while (requests.size() > 0)
{
- final InetAddress nextAddress = requests.poll();
+ final InetAddressAndPort nextAddress = requests.poll();
final ValidationTask nextTask = new ValidationTask(desc, nextAddress, nowInSec, previewKind);
tasks.add(nextTask);
Futures.addCallback(currentTask, new FutureCallback<TreeResponse>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index c26d4d1..4c0a564 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.util.*;
import com.google.common.base.Predicate;
@@ -28,10 +27,12 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.repair.messages.*;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
@@ -225,11 +226,11 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
}
}
- private void logErrorAndSendFailureResponse(String errorMessage, InetAddress to, int id)
+ private void logErrorAndSendFailureResponse(String errorMessage, InetAddressAndPort to, int id)
{
logger.error(errorMessage);
MessageOut reply = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
- .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
+ .withParameter(ParameterType.FAILURE_RESPONSE, MessagingService.ONE_BYTE);
MessagingService.instance().sendReply(reply, id, to);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 1c9778b..5121874 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -60,6 +60,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.repair.consistent.CoordinatorSession;
import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.QueryState;
@@ -141,10 +142,10 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
@VisibleForTesting
static class CommonRange
{
- public final Set<InetAddress> endpoints;
+ public final Set<InetAddressAndPort> endpoints;
public final Collection<Range<Token>> ranges;
- public CommonRange(Set<InetAddress> endpoints, Collection<Range<Token>> ranges)
+ public CommonRange(Set<InetAddressAndPort> endpoints, Collection<Range<Token>> ranges)
{
Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty());
Preconditions.checkArgument(ranges != null && !ranges.isEmpty());
@@ -232,7 +233,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
traceState = null;
}
- final Set<InetAddress> allNeighbors = new HashSet<>();
+ final Set<InetAddressAndPort> allNeighbors = new HashSet<>();
List<CommonRange> commonRanges = new ArrayList<>();
//pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent
@@ -243,9 +244,9 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
{
for (Range<Token> range : options.getRanges())
{
- Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
- options.getDataCenters(),
- options.getHosts());
+ Set<InetAddressAndPort> neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range,
+ options.getDataCenters(),
+ options.getHosts());
addRangeToNeighbors(commonRanges, range, neighbors);
allNeighbors.addAll(neighbors);
@@ -286,7 +287,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
try (Timer.Context ctx = Keyspace.open(keyspace).metric.repairPrepareTime.time())
{
- ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores);
+ ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, columnFamilyStores);
progress.incrementAndGet();
}
catch (Throwable t)
@@ -362,7 +363,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
* removes dead nodes from common ranges, and exludes ranges left without any participants
*/
@VisibleForTesting
- static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddress> liveEndpoints, boolean force)
+ static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddressAndPort> liveEndpoints, boolean force)
{
if (!force)
{
@@ -374,7 +375,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
for (CommonRange commonRange: commonRanges)
{
- Set<InetAddress> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
+ Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
// this node is implicitly a participant in this repair, so a single endpoint is ok here
if (!endpoints.isEmpty())
@@ -391,15 +392,15 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
long startTime,
boolean forceRepair,
TraceState traceState,
- Set<InetAddress> allNeighbors,
+ Set<InetAddressAndPort> allNeighbors,
List<CommonRange> commonRanges,
String... cfnames)
{
// the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
- Predicate<InetAddress> isAlive = FailureDetector.instance::isAlive;
- Set<InetAddress> allParticipants = ImmutableSet.<InetAddress>builder()
+ Predicate<InetAddressAndPort> isAlive = FailureDetector.instance::isAlive;
+ Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
.addAll(forceRepair ? Iterables.filter(allNeighbors, isAlive) : allNeighbors)
- .add(FBUtilities.getBroadcastAddress())
+ .add(FBUtilities.getBroadcastAddressAndPort())
.build();
List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair);
@@ -673,7 +674,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
ImmutableList.of(failureMessage, completionMessage));
}
- private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors)
+ private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, Set<InetAddressAndPort> neighbors)
{
for (int i = 0; i < neighborRangeList.size(); i++)
{
@@ -708,7 +709,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
- InetAddress source = FBUtilities.getBroadcastAddress();
+ InetAddressAndPort source = FBUtilities.getBroadcastAddressAndPort();
HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() };
int si = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 609ec56..91d767d 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -34,6 +33,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.tracing.Tracing;
@@ -95,14 +95,14 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
/** Range to repair */
public final Collection<Range<Token>> ranges;
- public final Set<InetAddress> endpoints;
+ public final Set<InetAddressAndPort> endpoints;
public final boolean isIncremental;
public final PreviewKind previewKind;
private final AtomicBoolean isFailed = new AtomicBoolean(false);
// Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
- private final ConcurrentMap<Pair<RepairJobDesc, InetAddress>, ValidationTask> validating = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Pair<RepairJobDesc, InetAddressAndPort>, ValidationTask> validating = new ConcurrentHashMap<>();
// Remote syncing jobs wait response in syncingTasks map
private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
@@ -130,7 +130,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
Collection<Range<Token>> ranges,
String keyspace,
RepairParallelism parallelismDegree,
- Set<InetAddress> endpoints,
+ Set<InetAddressAndPort> endpoints,
boolean isIncremental,
boolean pullRepair,
boolean force,
@@ -152,8 +152,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
if (force)
{
logger.debug("force flag set, removing dead endpoints");
- final Set<InetAddress> removeCandidates = new HashSet<>();
- for (final InetAddress endpoint : endpoints)
+ final Set<InetAddressAndPort> removeCandidates = new HashSet<>();
+ for (final InetAddressAndPort endpoint : endpoints)
{
if (!FailureDetector.instance.isAlive(endpoint))
{
@@ -189,7 +189,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
return ranges;
}
- public void waitForValidation(Pair<RepairJobDesc, InetAddress> key, ValidationTask task)
+ public void waitForValidation(Pair<RepairJobDesc, InetAddressAndPort> key, ValidationTask task)
{
validating.put(key, task);
}
@@ -207,7 +207,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
* @param endpoint endpoint that sent merkle tree
* @param trees calculated merkle trees, or null if validation failed
*/
- public void validationComplete(RepairJobDesc desc, InetAddress endpoint, MerkleTrees trees)
+ public void validationComplete(RepairJobDesc desc, InetAddressAndPort endpoint, MerkleTrees trees)
{
ValidationTask task = validating.remove(Pair.create(desc, endpoint));
if (task == null)
@@ -245,8 +245,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
private String repairedNodes()
{
StringBuilder sb = new StringBuilder();
- sb.append(FBUtilities.getBroadcastAddress());
- for (InetAddress ep : endpoints)
+ sb.append(FBUtilities.getBroadcastAddressAndPort());
+ for (InetAddressAndPort ep : endpoints)
sb.append(", ").append(ep);
return sb.toString();
}
@@ -285,7 +285,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
}
// Checking all nodes are live
- for (InetAddress endpoint : endpoints)
+ for (InetAddressAndPort endpoint : endpoints)
{
if (!FailureDetector.instance.isAlive(endpoint) && !skippedReplicas)
{
@@ -353,23 +353,23 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
terminate();
}
- public void onJoin(InetAddress endpoint, EndpointState epState) {}
- public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
- public void onAlive(InetAddress endpoint, EndpointState state) {}
- public void onDead(InetAddress endpoint, EndpointState state) {}
+ public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
+ public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+ public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
+ public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
- public void onRemove(InetAddress endpoint)
+ public void onRemove(InetAddressAndPort endpoint)
{
convict(endpoint, Double.MAX_VALUE);
}
- public void onRestart(InetAddress endpoint, EndpointState epState)
+ public void onRestart(InetAddressAndPort endpoint, EndpointState epState)
{
convict(endpoint, Double.MAX_VALUE);
}
- public void convict(InetAddress endpoint, double phi)
+ public void convict(InetAddressAndPort endpoint, double phi)
{
if (!endpoints.contains(endpoint))
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index 2b267a7..acc5186 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -17,13 +17,13 @@
*/
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.AbstractFuture;
import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@ -32,12 +32,12 @@ import org.apache.cassandra.repair.messages.SnapshotMessage;
/**
* SnapshotTask is a task that sends snapshot request.
*/
-public class SnapshotTask extends AbstractFuture<InetAddress> implements RunnableFuture<InetAddress>
+public class SnapshotTask extends AbstractFuture<InetAddressAndPort> implements RunnableFuture<InetAddressAndPort>
{
private final RepairJobDesc desc;
- private final InetAddress endpoint;
+ private final InetAddressAndPort endpoint;
- public SnapshotTask(RepairJobDesc desc, InetAddress endpoint)
+ public SnapshotTask(RepairJobDesc desc, InetAddressAndPort endpoint)
{
this.desc = desc;
this.endpoint = endpoint;
@@ -74,7 +74,7 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
public boolean isLatencyForSnitch() { return false; }
- public void onFailure(InetAddress from, RequestFailureReason failureReason)
+ public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
{
//listener.failedSnapshot();
task.setException(new RuntimeException("Could not create snapshot at " + from));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index a1b7459..59fee0b 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.util.UUID;
import java.util.Collections;
import java.util.Collection;
@@ -29,6 +28,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.SyncComplete;
import org.apache.cassandra.streaming.PreviewKind;
@@ -48,14 +49,14 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
private final RepairJobDesc desc;
private final boolean asymmetric;
- private final InetAddress initiator;
- private final InetAddress src;
- private final InetAddress dst;
+ private final InetAddressAndPort initiator;
+ private final InetAddressAndPort src;
+ private final InetAddressAndPort dst;
private final Collection<Range<Token>> ranges;
private final UUID pendingRepair;
private final PreviewKind previewKind;
- public StreamingRepairTask(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind, boolean asymmetric)
+ public StreamingRepairTask(RepairJobDesc desc, InetAddressAndPort initiator, InetAddressAndPort src, InetAddressAndPort dst, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind, boolean asymmetric)
{
this.desc = desc;
this.initiator = initiator;
@@ -69,14 +70,14 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
public void run()
{
- InetAddress dest = dst;
- InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
+ InetAddressAndPort dest = dst;
+ InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(dest);
logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, ranges.size(), dst);
createStreamPlan(dest, preferred).execute();
}
@VisibleForTesting
- StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred)
+ StreamPlan createStreamPlan(InetAddressAndPort dest, InetAddressAndPort preferred)
{
StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind)
.listeners(this)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index 3770621..b46ae5e 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.repair;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -45,6 +44,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -74,48 +74,50 @@ public final class SystemDistributedKeyspace
private static final TableMetadata RepairHistory =
parse(REPAIR_HISTORY,
- "Repair history",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "columnfamily_name text,"
- + "id timeuuid,"
- + "parent_id timeuuid,"
- + "range_begin text,"
- + "range_end text,"
- + "coordinator inet,"
- + "participants set<inet>,"
- + "exception_message text,"
- + "exception_stacktrace text,"
- + "status text,"
- + "started_at timestamp,"
- + "finished_at timestamp,"
- + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))");
+ "Repair history",
+ "CREATE TABLE %s ("
+ + "keyspace_name text,"
+ + "columnfamily_name text,"
+ + "id timeuuid,"
+ + "parent_id timeuuid,"
+ + "range_begin text,"
+ + "range_end text,"
+ + "coordinator inet,"
+ + "coordinator_port int,"
+ + "participants set<inet>,"
+ + "participants_v2 set<text>,"
+ + "exception_message text,"
+ + "exception_stacktrace text,"
+ + "status text,"
+ + "started_at timestamp,"
+ + "finished_at timestamp,"
+ + "PRIMARY KEY ((keyspace_name, columnfamily_name), id))");
private static final TableMetadata ParentRepairHistory =
parse(PARENT_REPAIR_HISTORY,
- "Repair history",
- "CREATE TABLE %s ("
- + "parent_id timeuuid,"
- + "keyspace_name text,"
- + "columnfamily_names set<text>,"
- + "started_at timestamp,"
- + "finished_at timestamp,"
- + "exception_message text,"
- + "exception_stacktrace text,"
- + "requested_ranges set<text>,"
- + "successful_ranges set<text>,"
- + "options map<text, text>,"
- + "PRIMARY KEY (parent_id))");
+ "Repair history",
+ "CREATE TABLE %s ("
+ + "parent_id timeuuid,"
+ + "keyspace_name text,"
+ + "columnfamily_names set<text>,"
+ + "started_at timestamp,"
+ + "finished_at timestamp,"
+ + "exception_message text,"
+ + "exception_stacktrace text,"
+ + "requested_ranges set<text>,"
+ + "successful_ranges set<text>,"
+ + "options map<text, text>,"
+ + "PRIMARY KEY (parent_id))");
private static final TableMetadata ViewBuildStatus =
parse(VIEW_BUILD_STATUS,
- "Materialized View build status",
- "CREATE TABLE %s ("
- + "keyspace_name text,"
- + "view_name text,"
- + "host_id uuid,"
- + "status text,"
- + "PRIMARY KEY ((keyspace_name, view_name), host_id))");
+ "Materialized View build status",
+ "CREATE TABLE %s ("
+ + "keyspace_name text,"
+ + "view_name text,"
+ + "host_id uuid,"
+ + "status text,"
+ + "PRIMARY KEY ((keyspace_name, view_name), host_id))");
private static TableMetadata parse(String table, String description, String cql)
{
@@ -184,17 +186,21 @@ public final class SystemDistributedKeyspace
processSilent(fmtQuery);
}
- public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddress> endpoints)
+ public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddressAndPort> endpoints)
{
- String coordinator = FBUtilities.getBroadcastAddress().getHostAddress();
- Set<String> participants = Sets.newHashSet(coordinator);
+ InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort();
+ Set<String> participants = Sets.newHashSet();
+ Set<String> participants_v2 = Sets.newHashSet();
- for (InetAddress endpoint : endpoints)
- participants.add(endpoint.getHostAddress());
+ for (InetAddressAndPort endpoint : endpoints)
+ {
+ participants.add(endpoint.getHostAddress(false));
+ participants_v2.add(endpoint.toString());
+ }
String query =
- "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, participants, status, started_at) " +
- "VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', { '%s' }, '%s', toTimestamp(now()))";
+ "INSERT INTO %s.%s (keyspace_name, columnfamily_name, id, parent_id, range_begin, range_end, coordinator, coordinator_port, participants, participants_v2, status, started_at) " +
+ "VALUES ( '%s', '%s', %s, %s, '%s', '%s', '%s', %d, { '%s' }, { '%s' }, '%s', toTimestamp(now()))";
for (String cfname : cfnames)
{
@@ -207,8 +213,10 @@ public final class SystemDistributedKeyspace
parent_id.toString(),
range.left.toString(),
range.right.toString(),
- coordinator,
+ coordinator.getHostAddress(false),
+ coordinator.port,
Joiner.on("', '").join(participants),
+ Joiner.on("', '").join(participants_v2),
RepairState.STARTED.toString());
processSilent(fmtQry);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/TreeResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/TreeResponse.java b/src/java/org/apache/cassandra/repair/TreeResponse.java
index c898b36..8571caa 100644
--- a/src/java/org/apache/cassandra/repair/TreeResponse.java
+++ b/src/java/org/apache/cassandra/repair/TreeResponse.java
@@ -17,8 +17,7 @@
*/
package org.apache.cassandra.repair;
-import java.net.InetAddress;
-
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.MerkleTrees;
/**
@@ -26,10 +25,10 @@ import org.apache.cassandra.utils.MerkleTrees;
*/
public class TreeResponse
{
- public final InetAddress endpoint;
+ public final InetAddressAndPort endpoint;
public final MerkleTrees trees;
- public TreeResponse(InetAddress endpoint, MerkleTrees trees)
+ public TreeResponse(InetAddressAndPort endpoint, MerkleTrees trees)
{
this.endpoint = endpoint;
this.trees = trees;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/ValidationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java
index 175709f..fc500cf 100644
--- a/src/java/org/apache/cassandra/repair/ValidationTask.java
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@ -17,11 +17,10 @@
*/
package org.apache.cassandra.repair;
-import java.net.InetAddress;
-
import com.google.common.util.concurrent.AbstractFuture;
import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.ValidationRequest;
import org.apache.cassandra.streaming.PreviewKind;
@@ -34,11 +33,11 @@ import org.apache.cassandra.utils.MerkleTrees;
public class ValidationTask extends AbstractFuture<TreeResponse> implements Runnable
{
private final RepairJobDesc desc;
- private final InetAddress endpoint;
+ private final InetAddressAndPort endpoint;
private final int nowInSec;
private final PreviewKind previewKind;
- public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int nowInSec, PreviewKind previewKind)
+ public ValidationTask(RepairJobDesc desc, InetAddressAndPort endpoint, int nowInSec, PreviewKind previewKind)
{
this.desc = desc;
this.endpoint = endpoint;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 9803638..4c2856d 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -41,6 +40,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.streaming.PreviewKind;
@@ -64,7 +64,7 @@ public class Validator implements Runnable
private static final Logger logger = LoggerFactory.getLogger(Validator.class);
public final RepairJobDesc desc;
- public final InetAddress initiator;
+ public final InetAddressAndPort initiator;
public final int nowInSec;
private final boolean evenTreeDistribution;
public final boolean isIncremental;
@@ -81,17 +81,17 @@ public class Validator implements Runnable
private final PreviewKind previewKind;
- public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, PreviewKind previewKind)
+ public Validator(RepairJobDesc desc, InetAddressAndPort initiator, int nowInSec, PreviewKind previewKind)
{
this(desc, initiator, nowInSec, false, false, previewKind);
}
- public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean isIncremental, PreviewKind previewKind)
+ public Validator(RepairJobDesc desc, InetAddressAndPort initiator, int nowInSec, boolean isIncremental, PreviewKind previewKind)
{
this(desc, initiator, nowInSec, false, isIncremental, previewKind);
}
- public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind)
+ public Validator(RepairJobDesc desc, InetAddressAndPort initiator, int nowInSec, boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind)
{
this.desc = desc;
this.initiator = initiator;
@@ -352,7 +352,7 @@ public class Validator implements Runnable
public void run()
{
// respond to the request that triggered this validation
- if (!initiator.equals(FBUtilities.getBroadcastAddress()))
+ if (!initiator.equals(FBUtilities.getBroadcastAddressAndPort()))
{
logger.info("{} Sending completed merkle tree to {} for {}.{}", previewKind.logPrefix(desc.sessionId), initiator, desc.keyspace, desc.columnFamily);
Tracing.traceRepair("Sending completed merkle tree to {} for {}.{}", initiator, desc.keyspace, desc.columnFamily);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
index eb99977..c9b7ed7 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.asymmetric;
-import java.net.InetAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -28,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.TreeResponse;
import org.apache.cassandra.utils.MerkleTrees;
@@ -36,11 +36,11 @@ import org.apache.cassandra.utils.MerkleTrees;
*/
public class DifferenceHolder
{
- private final ImmutableMap<InetAddress, HostDifferences> differences;
+ private final ImmutableMap<InetAddressAndPort, HostDifferences> differences;
public DifferenceHolder(List<TreeResponse> trees)
{
- ImmutableMap.Builder<InetAddress, HostDifferences> diffBuilder = ImmutableMap.builder();
+ ImmutableMap.Builder<InetAddressAndPort, HostDifferences> diffBuilder = ImmutableMap.builder();
for (int i = 0; i < trees.size() - 1; ++i)
{
TreeResponse r1 = trees.get(i);
@@ -58,9 +58,9 @@ public class DifferenceHolder
}
@VisibleForTesting
- DifferenceHolder(Map<InetAddress, HostDifferences> differences)
+ DifferenceHolder(Map<InetAddressAndPort, HostDifferences> differences)
{
- ImmutableMap.Builder<InetAddress, HostDifferences> diffBuilder = ImmutableMap.builder();
+ ImmutableMap.Builder<InetAddressAndPort, HostDifferences> diffBuilder = ImmutableMap.builder();
diffBuilder.putAll(differences);
this.differences = diffBuilder.build();
}
@@ -68,12 +68,12 @@ public class DifferenceHolder
/**
* differences only holds one 'side' of the difference - if A and B mismatch, only A will be a key in the map
*/
- public Set<InetAddress> keyHosts()
+ public Set<InetAddressAndPort> keyHosts()
{
return differences.keySet();
}
- public HostDifferences get(InetAddress hostWithDifference)
+ public HostDifferences get(InetAddressAndPort hostWithDifference)
{
return differences.get(hostWithDifference);
}
@@ -85,7 +85,7 @@ public class DifferenceHolder
'}';
}
- public boolean hasDifferenceBetween(InetAddress node1, InetAddress node2, Range<Token> range)
+ public boolean hasDifferenceBetween(InetAddressAndPort node1, InetAddressAndPort node2, Range<Token> range)
{
HostDifferences diffsNode1 = differences.get(node1);
if (diffsNode1 != null && diffsNode1.hasDifferencesFor(node2, range))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
index 6cbe987..ab294b9 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.asymmetric;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -28,23 +27,24 @@ import java.util.Set;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
/**
* Tracks the differences for a single host
*/
public class HostDifferences
{
- private final Map<InetAddress, List<Range<Token>>> perHostDifferences = new HashMap<>();
+ private final Map<InetAddressAndPort, List<Range<Token>>> perHostDifferences = new HashMap<>();
/**
* Adds a set of differences between the node this instance is tracking and endpoint
*/
- public void add(InetAddress endpoint, List<Range<Token>> difference)
+ public void add(InetAddressAndPort endpoint, List<Range<Token>> difference)
{
perHostDifferences.put(endpoint, difference);
}
- public void addSingleRange(InetAddress remoteNode, Range<Token> rangeToFetch)
+ public void addSingleRange(InetAddressAndPort remoteNode, Range<Token> rangeToFetch)
{
perHostDifferences.computeIfAbsent(remoteNode, (x) -> new ArrayList<>()).add(rangeToFetch);
}
@@ -52,7 +52,7 @@ public class HostDifferences
/**
* Does this instance have differences for range with node2?
*/
- public boolean hasDifferencesFor(InetAddress node2, Range<Token> range)
+ public boolean hasDifferencesFor(InetAddressAndPort node2, Range<Token> range)
{
List<Range<Token>> differences = get(node2);
for (Range<Token> diff : differences)
@@ -64,12 +64,12 @@ public class HostDifferences
return false;
}
- public Set<InetAddress> hosts()
+ public Set<InetAddressAndPort> hosts()
{
return perHostDifferences.keySet();
}
- public List<Range<Token>> get(InetAddress differingHost)
+ public List<Range<Token>> get(InetAddressAndPort differingHost)
{
return perHostDifferences.getOrDefault(differingHost, Collections.emptyList());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
index b41ddd8..450336f 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.asymmetric;
-import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -29,6 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
/**
* Tracks incoming streams for a single host
@@ -60,7 +60,7 @@ public class IncomingRepairStreamTracker
* @param range the range we need to stream from streamFromNode
* @param streamFromNode the node we should stream from
*/
- public void addIncomingRangeFrom(Range<Token> range, InetAddress streamFromNode)
+ public void addIncomingRangeFrom(Range<Token> range, InetAddressAndPort streamFromNode)
{
logger.trace("adding incoming range {} from {}", range, streamFromNode);
Set<Range<Token>> newInput = RangeDenormalizer.denormalize(range, incoming);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java b/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
index 90788dc..e8ca85d 100644
--- a/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
+++ b/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java
@@ -18,10 +18,11 @@
package org.apache.cassandra.repair.asymmetric;
-import java.net.InetAddress;
import java.util.Set;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
public interface PreferedNodeFilter
{
- public Set<InetAddress> apply(InetAddress streamingNode, Set<InetAddress> toStream);
+ public Set<InetAddressAndPort> apply(InetAddressAndPort streamingNode, Set<InetAddressAndPort> toStream);
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org