You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:00 UTC
[03/19] cassandra git commit: Allow storage port to be configurable per node
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java b/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java
index 4d9829f..93a1c20 100644
--- a/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java
+++ b/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
+import com.google.common.net.InetAddresses;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -39,6 +40,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.compression.Lz4FrameDecoder;
import io.netty.handler.codec.compression.Lz4FrameEncoder;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage;
@@ -49,7 +51,7 @@ import static org.apache.cassandra.net.async.NettyFactory.Mode.MESSAGING;
public class InboundHandshakeHandlerTest
{
- private static final InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0);
+ private static final InetAddressAndPort addr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0);
private static final int MESSAGING_VERSION = MessagingService.current_version;
private static final int VERSION_30 = MessagingService.VERSION_30;
@@ -84,7 +86,7 @@ public class InboundHandshakeHandlerTest
{
handler = new InboundHandshakeHandler(new TestAuthenticator(true));
channel = new EmbeddedChannel(handler);
- boolean result = handler.handleAuthenticate(addr, channel.pipeline().firstContext());
+ boolean result = handler.handleAuthenticate(new InetSocketAddress(addr.address, addr.port), channel.pipeline().firstContext());
Assert.assertTrue(result);
Assert.assertTrue(channel.isOpen());
}
@@ -92,7 +94,7 @@ public class InboundHandshakeHandlerTest
@Test
public void handleAuthenticate_Bad()
{
- boolean result = handler.handleAuthenticate(addr, channel.pipeline().firstContext());
+ boolean result = handler.handleAuthenticate(new InetSocketAddress(addr.address, addr.port), channel.pipeline().firstContext());
Assert.assertFalse(result);
Assert.assertFalse(channel.isOpen());
Assert.assertFalse(channel.isActive());
@@ -178,7 +180,7 @@ public class InboundHandshakeHandlerTest
if (buf.refCnt() > 0)
buf.release();
- buf = new ThirdHandshakeMessage(MESSAGING_VERSION, addr.getAddress()).encode(PooledByteBufAllocator.DEFAULT);
+ buf = new ThirdHandshakeMessage(MESSAGING_VERSION, addr).encode(PooledByteBufAllocator.DEFAULT);
state = handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf);
Assert.assertEquals(State.HANDSHAKE_COMPLETE, state);
@@ -203,7 +205,7 @@ public class InboundHandshakeHandlerTest
{
buf = Unpooled.buffer(32, 32);
buf.writeInt(MESSAGING_VERSION + 1);
- CompactEndpointSerializationHelper.serialize(addr.getAddress(), new ByteBufOutputStream(buf));
+ CompactEndpointSerializationHelper.instance.serialize(addr, new ByteBufDataOutputPlus(buf), MESSAGING_VERSION + 1);
State state = handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf);
Assert.assertEquals(State.HANDSHAKE_FAIL, state);
Assert.assertFalse(channel.isOpen());
@@ -215,7 +217,7 @@ public class InboundHandshakeHandlerTest
{
buf = Unpooled.buffer(32, 32);
buf.writeInt(MESSAGING_VERSION);
- CompactEndpointSerializationHelper.serialize(addr.getAddress(), new ByteBufOutputStream(buf));
+ CompactEndpointSerializationHelper.instance.serialize(addr, new ByteBufDataOutputPlus(buf), MESSAGING_VERSION);
State state = handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf);
Assert.assertEquals(State.HANDSHAKE_COMPLETE, state);
Assert.assertTrue(channel.isOpen());
@@ -228,7 +230,7 @@ public class InboundHandshakeHandlerTest
ChannelPipeline pipeline = channel.pipeline();
Assert.assertNotNull(pipeline.get(InboundHandshakeHandler.class));
- handler.setupMessagingPipeline(pipeline, addr.getAddress(), false, MESSAGING_VERSION);
+ handler.setupMessagingPipeline(pipeline, addr, false, MESSAGING_VERSION);
Assert.assertNotNull(pipeline.get(MessageInHandler.class));
Assert.assertNull(pipeline.get(Lz4FrameDecoder.class));
Assert.assertNull(pipeline.get(Lz4FrameEncoder.class));
@@ -241,7 +243,7 @@ public class InboundHandshakeHandlerTest
ChannelPipeline pipeline = channel.pipeline();
Assert.assertNotNull(pipeline.get(InboundHandshakeHandler.class));
- handler.setupMessagingPipeline(pipeline, addr.getAddress(), true, MESSAGING_VERSION);
+ handler.setupMessagingPipeline(pipeline, addr, true, MESSAGING_VERSION);
Assert.assertNotNull(pipeline.get(MessageInHandler.class));
Assert.assertNotNull(pipeline.get(Lz4FrameDecoder.class));
Assert.assertNull(pipeline.get(Lz4FrameEncoder.class));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java b/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java
index bb82d2c..43cb964 100644
--- a/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java
+++ b/test/unit/org/apache/cassandra/net/async/MessageInHandlerTest.java
@@ -20,40 +20,36 @@ package org.apache.cassandra.net.async;
import java.io.EOFException;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.function.BiConsumer;
-import com.google.common.base.Charsets;
+import com.google.common.net.InetAddresses;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.concurrent.Future;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.net.async.MessageInHandler.MessageHeader;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
+import org.apache.cassandra.utils.UUIDGen;
public class MessageInHandlerTest
{
- private static final InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0);
+ private static final InetAddressAndPort addr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0);
private static final int MSG_VERSION = MessagingService.current_version;
private static final int MSG_ID = 42;
@@ -81,7 +77,7 @@ public class MessageInHandlerTest
buf.writeInt(-1);
buf.writerIndex(len);
- MessageInHandler handler = new MessageInHandler(addr.getAddress(), MSG_VERSION, null);
+ MessageInHandler handler = new MessageInHandler(addr, MSG_VERSION, null);
EmbeddedChannel channel = new EmbeddedChannel(handler);
Assert.assertTrue(channel.isOpen());
channel.writeInbound(buf);
@@ -98,22 +94,25 @@ public class MessageInHandlerTest
@Test
public void decode_HappyPath_WithParameters() throws Exception
{
- Map<String, byte[]> parameters = new HashMap<>();
- parameters.put("p1", "val1".getBytes(Charsets.UTF_8));
- parameters.put("p2", "val2".getBytes(Charsets.UTF_8));
+ UUID uuid = UUIDGen.getTimeUUID();
+ Map<ParameterType, Object> parameters = new HashMap<>();
+ parameters.put(ParameterType.FAILURE_REASON, (short)42);
+ parameters.put(ParameterType.TRACE_SESSION, uuid);
MessageInWrapper result = decode_HappyPath(parameters);
Assert.assertEquals(2, result.messageIn.parameters.size());
+ Assert.assertEquals((short)42, result.messageIn.parameters.get(ParameterType.FAILURE_REASON));
+ Assert.assertEquals(uuid, result.messageIn.parameters.get(ParameterType.TRACE_SESSION));
}
- private MessageInWrapper decode_HappyPath(Map<String, byte[]> parameters) throws Exception
+ private MessageInWrapper decode_HappyPath(Map<ParameterType, Object> parameters) throws Exception
{
MessageOut msgOut = new MessageOut(MessagingService.Verb.ECHO);
- for (Map.Entry<String, byte[]> param : parameters.entrySet())
+ for (Map.Entry<ParameterType, Object> param : parameters.entrySet())
msgOut = msgOut.withParameter(param.getKey(), param.getValue());
serialize(msgOut);
MessageInWrapper wrapper = new MessageInWrapper();
- MessageInHandler handler = new MessageInHandler(addr.getAddress(), MSG_VERSION, wrapper.messageConsumer);
+ MessageInHandler handler = new MessageInHandler(addr, MSG_VERSION, wrapper.messageConsumer);
List<Object> out = new ArrayList<>();
handler.decode(null, buf, out);
@@ -140,7 +139,7 @@ public class MessageInHandlerTest
public void decode_WithHalfReceivedParameters() throws Exception
{
MessageOut msgOut = new MessageOut(MessagingService.Verb.ECHO);
- msgOut = msgOut.withParameter("p3", "val1".getBytes(Charsets.UTF_8));
+ msgOut = msgOut.withParameter(ParameterType.FAILURE_REASON, (short)42);
serialize(msgOut);
@@ -150,7 +149,7 @@ public class MessageInHandlerTest
buf.writerIndex(originalWriterIndex - 6);
MessageInWrapper wrapper = new MessageInWrapper();
- MessageInHandler handler = new MessageInHandler(addr.getAddress(), MSG_VERSION, wrapper.messageConsumer);
+ MessageInHandler handler = new MessageInHandler(addr, MSG_VERSION, wrapper.messageConsumer);
List<Object> out = new ArrayList<>();
handler.decode(null, buf, out);
@@ -221,7 +220,7 @@ public class MessageInHandlerTest
@Test
public void exceptionHandled()
{
- MessageInHandler handler = new MessageInHandler(addr.getAddress(), MSG_VERSION, null);
+ MessageInHandler handler = new MessageInHandler(addr, MSG_VERSION, null);
EmbeddedChannel channel = new EmbeddedChannel(handler);
Assert.assertTrue(channel.isOpen());
handler.exceptionCaught(channel.pipeline().firstContext(), new EOFException());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java b/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java
index 566dfdb..86112ae 100644
--- a/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java
+++ b/test/unit/org/apache/cassandra/net/async/MessageOutHandlerTest.java
@@ -44,8 +44,10 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.ParameterType;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.UUIDGen;
@@ -65,15 +67,15 @@ public class MessageOutHandlerTest
}
@Before
- public void setup()
+ public void setup() throws Exception
{
setup(MessageOutHandler.AUTO_FLUSH_THRESHOLD);
}
- private void setup(int flushThreshold)
+ private void setup(int flushThreshold) throws Exception
{
- OutboundConnectionIdentifier connectionId = OutboundConnectionIdentifier.small(new InetSocketAddress("127.0.0.1", 0),
- new InetSocketAddress("127.0.0.2", 0));
+ OutboundConnectionIdentifier connectionId = OutboundConnectionIdentifier.small(InetAddressAndPort.getByNameOverrideDefaults("127.0.0.1", 0),
+ InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", 0));
OutboundMessagingConnection omc = new NonSendingOutboundMessagingConnection(connectionId, null, Optional.empty());
channel = new EmbeddedChannel();
channelWriter = ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty());
@@ -91,7 +93,7 @@ public class MessageOutHandlerTest
}
@Test
- public void write_WithFlush() throws ExecutionException, InterruptedException, TimeoutException
+ public void write_WithFlush() throws Exception
{
setup(1);
MessageOut message = new MessageOut(MessagingService.Verb.ECHO);
@@ -217,7 +219,7 @@ public class MessageOutHandlerTest
public void captureTracingInfo_ForceException()
{
MessageOut message = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
- .withParameter(Tracing.TRACE_HEADER, new byte[9]);
+ .withParameter(ParameterType.TRACE_SESSION, new byte[9]);
handler.captureTracingInfo(new QueuedMessage(message, 42));
}
@@ -226,7 +228,7 @@ public class MessageOutHandlerTest
{
UUID uuid = UUID.randomUUID();
MessageOut message = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
- .withParameter(Tracing.TRACE_HEADER, UUIDGen.decompose(uuid));
+ .withParameter(ParameterType.TRACE_SESSION, uuid);
handler.captureTracingInfo(new QueuedMessage(message, 42));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
index 0550490..4607d5c 100644
--- a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
+++ b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.net.async;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Optional;
@@ -47,6 +46,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.async.NettyFactory.InboundInitializer;
import org.apache.cassandra.net.async.NettyFactory.OutboundInitializer;
@@ -56,8 +56,8 @@ import org.apache.cassandra.utils.NativeLibrary;
public class NettyFactoryTest
{
- private static final InetSocketAddress LOCAL_ADDR = new InetSocketAddress("127.0.0.1", 9876);
- private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 9876);
+ private static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9876);
+ private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 9876);
private static final int receiveBufferSize = 1 << 16;
private static final IInternodeAuthenticator AUTHENTICATOR = new AllowAllInternodeAuthenticator();
private static final boolean EPOLL_AVAILABLE = NativeTransportService.useEpoll();
@@ -129,7 +129,6 @@ public class NettyFactoryTest
Channel inboundChannel = null;
try
{
- InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 9876);
InboundInitializer inboundInitializer = new InboundInitializer(AUTHENTICATOR, null, channelGroup);
inboundChannel = NettyFactory.instance.createInboundChannel(LOCAL_ADDR, inboundInitializer, receiveBufferSize);
NettyFactory.instance.createInboundChannel(LOCAL_ADDR, inboundInitializer, receiveBufferSize);
@@ -144,7 +143,7 @@ public class NettyFactoryTest
@Test(expected = ConfigurationException.class)
public void createServerChannel_UnbindableAddress()
{
- InetSocketAddress addr = new InetSocketAddress("1.1.1.1", 9876);
+ InetAddressAndPort addr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("1.1.1.1"), 9876);
InboundInitializer inboundInitializer = new InboundInitializer(AUTHENTICATOR, null, channelGroup);
NettyFactory.instance.createInboundChannel(addr, inboundInitializer, receiveBufferSize);
}
@@ -162,10 +161,10 @@ public class NettyFactoryTest
Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
serverEncryptionOptions.enable_legacy_ssl_storage_port = false;
- InetAddress originalBroadcastAddr = FBUtilities.getBroadcastAddress();
+ InetAddressAndPort originalBroadcastAddr = FBUtilities.getBroadcastAddressAndPort();
try
{
- FBUtilities.setBroadcastInetAddress(InetAddresses.increment(FBUtilities.getLocalAddress()));
+ FBUtilities.setBroadcastInetAddress(InetAddresses.increment(FBUtilities.getLocalAddressAndPort().address));
DatabaseDescriptor.setListenOnBroadcastAddress(true);
serverEncryptionOptions.enabled = false;
@@ -178,7 +177,7 @@ public class NettyFactoryTest
}
finally
{
- FBUtilities.setBroadcastInetAddress(originalBroadcastAddr);
+ FBUtilities.setBroadcastInetAddress(originalBroadcastAddr.address);
DatabaseDescriptor.setListenOnBroadcastAddress(false);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java b/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java
index f8bfab1..be71fd4 100644
--- a/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java
+++ b/test/unit/org/apache/cassandra/net/async/OutboundHandshakeHandlerTest.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
+import com.google.common.net.InetAddresses;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -38,6 +39,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.compression.Lz4FrameDecoder;
import io.netty.handler.codec.compression.Lz4FrameEncoder;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.async.HandshakeProtocol.SecondHandshakeMessage;
import org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult;
@@ -47,8 +49,8 @@ import static org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeR
public class OutboundHandshakeHandlerTest
{
private static final int MESSAGING_VERSION = MessagingService.current_version;
- private static final InetSocketAddress localAddr = new InetSocketAddress("127.0.0.1", 0);
- private static final InetSocketAddress remoteAddr = new InetSocketAddress("127.0.0.2", 0);
+ private static final InetAddressAndPort localAddr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0);
+ private static final InetAddressAndPort remoteAddr = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0);
private static final String HANDLER_NAME = "clientHandshakeHandler";
private EmbeddedChannel channel;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
index 641c28c..bf6e066 100644
--- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
+++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLHandshakeException;
+import com.google.common.net.InetAddresses;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -45,6 +46,7 @@ import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.AbstractEndpointSnitch;
import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessagingServiceTest;
@@ -59,9 +61,9 @@ import static org.apache.cassandra.net.async.OutboundMessagingConnection.State.R
public class OutboundMessagingConnectionTest
{
- private static final InetSocketAddress LOCAL_ADDR = new InetSocketAddress("127.0.0.1", 9998);
- private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 9999);
- private static final InetSocketAddress RECONNECT_ADDR = new InetSocketAddress("127.0.0.3", 9999);
+ private static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9998);
+ private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 9999);
+ private static final InetAddressAndPort RECONNECT_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.3"), 9999);
private static final int MESSAGING_VERSION = MessagingService.current_version;
private OutboundConnectionIdentifier connectionId;
@@ -131,47 +133,47 @@ public class OutboundMessagingConnectionTest
public void shouldCompressConnection_None()
{
DatabaseDescriptor.setInternodeCompression(Config.InternodeCompression.none);
- Assert.assertFalse(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR.getAddress(), REMOTE_ADDR.getAddress()));
+ Assert.assertFalse(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR, REMOTE_ADDR));
}
@Test
public void shouldCompressConnection_All()
{
DatabaseDescriptor.setInternodeCompression(Config.InternodeCompression.all);
- Assert.assertTrue(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR.getAddress(), REMOTE_ADDR.getAddress()));
+ Assert.assertTrue(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR, REMOTE_ADDR));
}
@Test
public void shouldCompressConnection_SameDc()
{
TestSnitch snitch = new TestSnitch();
- snitch.add(LOCAL_ADDR.getAddress(), "dc1");
- snitch.add(REMOTE_ADDR.getAddress(), "dc1");
+ snitch.add(LOCAL_ADDR, "dc1");
+ snitch.add(REMOTE_ADDR, "dc1");
DatabaseDescriptor.setEndpointSnitch(snitch);
DatabaseDescriptor.setInternodeCompression(Config.InternodeCompression.dc);
- Assert.assertFalse(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR.getAddress(), REMOTE_ADDR.getAddress()));
+ Assert.assertFalse(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR, REMOTE_ADDR));
}
private static class TestSnitch extends AbstractEndpointSnitch
{
- private Map<InetAddress, String> nodeToDc = new HashMap<>();
+ private Map<InetAddressAndPort, String> nodeToDc = new HashMap<>();
- void add(InetAddress node, String dc)
+ void add(InetAddressAndPort node, String dc)
{
nodeToDc.put(node, dc);
}
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
return null;
}
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
return nodeToDc.get(endpoint);
}
- public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
{
return 0;
}
@@ -181,11 +183,11 @@ public class OutboundMessagingConnectionTest
public void shouldCompressConnection_DifferentDc()
{
TestSnitch snitch = new TestSnitch();
- snitch.add(LOCAL_ADDR.getAddress(), "dc1");
- snitch.add(REMOTE_ADDR.getAddress(), "dc2");
+ snitch.add(LOCAL_ADDR, "dc1");
+ snitch.add(REMOTE_ADDR, "dc2");
DatabaseDescriptor.setEndpointSnitch(snitch);
DatabaseDescriptor.setInternodeCompression(Config.InternodeCompression.dc);
- Assert.assertTrue(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR.getAddress(), REMOTE_ADDR.getAddress()));
+ Assert.assertTrue(OutboundMessagingConnection.shouldCompressConnection(LOCAL_ADDR, REMOTE_ADDR));
}
@Test
@@ -247,7 +249,7 @@ public class OutboundMessagingConnectionTest
MessageOut messageOut = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK);
OutboundMessagingPool pool = new OutboundMessagingPool(REMOTE_ADDR, LOCAL_ADDR, null,
- new MessagingServiceTest.MockBackPressureStrategy(null).newState(REMOTE_ADDR.getAddress()), auth);
+ new MessagingServiceTest.MockBackPressureStrategy(null).newState(REMOTE_ADDR), auth);
omc = pool.getConnection(messageOut);
Assert.assertSame(State.NOT_READY, omc.getState());
Assert.assertFalse(omc.connect());
@@ -371,7 +373,7 @@ public class OutboundMessagingConnectionTest
Assert.assertFalse(channelWriter.isClosed());
Assert.assertEquals(channelWriter, omc.getChannelWriter());
Assert.assertEquals(READY, omc.getState());
- Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR.getAddress()));
+ Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR));
Assert.assertNull(omc.getConnectionTimeoutFuture());
Assert.assertTrue(connectionTimeoutFuture.isCancelled());
}
@@ -391,7 +393,7 @@ public class OutboundMessagingConnectionTest
Assert.assertTrue(channelWriter.isClosed());
Assert.assertNull(omc.getChannelWriter());
Assert.assertEquals(CLOSED, omc.getState());
- Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR.getAddress()));
+ Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR));
Assert.assertNull(omc.getConnectionTimeoutFuture());
Assert.assertTrue(connectionTimeoutFuture.isCancelled());
}
@@ -408,7 +410,7 @@ public class OutboundMessagingConnectionTest
omc.finishHandshake(result);
Assert.assertNotNull(omc.getChannelWriter());
Assert.assertEquals(CREATING_CHANNEL, omc.getState());
- Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR.getAddress()));
+ Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR));
Assert.assertEquals(count, omc.backlogSize());
}
@@ -423,7 +425,7 @@ public class OutboundMessagingConnectionTest
HandshakeResult result = HandshakeResult.failed();
omc.finishHandshake(result);
Assert.assertEquals(NOT_READY, omc.getState());
- Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR.getAddress()));
+ Assert.assertEquals(MESSAGING_VERSION, MessagingService.instance().getVersion(REMOTE_ADDR));
Assert.assertEquals(0, omc.backlogSize());
}
@@ -512,8 +514,8 @@ public class OutboundMessagingConnectionTest
OutboundConnectionIdentifier connectionId = omc.getConnectionId();
omc.maybeUpdateConnectionId();
Assert.assertNotEquals(connectionId, omc.getConnectionId());
- Assert.assertEquals(new InetSocketAddress(REMOTE_ADDR.getAddress(), DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().remoteAddress());
- Assert.assertEquals(new InetSocketAddress(REMOTE_ADDR.getAddress(), DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().connectionAddress());
+ Assert.assertEquals(InetAddressAndPort.getByAddressOverrideDefaults(REMOTE_ADDR.address, DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().remote());
+ Assert.assertEquals(InetAddressAndPort.getByAddressOverrideDefaults(REMOTE_ADDR.address, DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().connectionAddress());
Assert.assertEquals(peerVersion, omc.getTargetVersion());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java
index 655cd15..ecd8697 100644
--- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java
+++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingPoolTest.java
@@ -18,10 +18,10 @@
package org.apache.cassandra.net.async;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import com.google.common.net.InetAddresses;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -35,6 +35,7 @@ import org.apache.cassandra.gms.GossipDigestSyn;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.BackPressureState;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -42,9 +43,9 @@ import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionTyp
public class OutboundMessagingPoolTest
{
- private static final InetSocketAddress LOCAL_ADDR = new InetSocketAddress("127.0.0.1", 9476);
- private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 9476);
- private static final InetSocketAddress RECONNECT_ADDR = new InetSocketAddress("127.0.0.3", 9476);
+ private static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9476);
+ private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 9476);
+ private static final InetAddressAndPort RECONNECT_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.3"), 9476);
private static final List<ConnectionType> INTERNODE_MESSAGING_CONN_TYPES = new ArrayList<ConnectionType>()
{{ add(ConnectionType.GOSSIP); add(ConnectionType.LARGE_MESSAGE); add(ConnectionType.SMALL_MESSAGE); }};
@@ -59,7 +60,7 @@ public class OutboundMessagingPoolTest
@Before
public void setup()
{
- BackPressureState backPressureState = DatabaseDescriptor.getBackPressureStrategy().newState(REMOTE_ADDR.getAddress());
+ BackPressureState backPressureState = DatabaseDescriptor.getBackPressureStrategy().newState(REMOTE_ADDR);
pool = new OutboundMessagingPool(REMOTE_ADDR, LOCAL_ADDR, null, backPressureState, new AllowAllInternodeAuthenticator());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
index 21c51c6..970e648 100644
--- a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
+++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Set;
import java.util.UUID;
@@ -33,6 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -41,19 +41,19 @@ import org.apache.cassandra.utils.UUIDGen;
@Ignore
public abstract class AbstractRepairTest
{
- protected static final InetAddress COORDINATOR;
- protected static final InetAddress PARTICIPANT1;
- protected static final InetAddress PARTICIPANT2;
- protected static final InetAddress PARTICIPANT3;
+ protected static final InetAddressAndPort COORDINATOR;
+ protected static final InetAddressAndPort PARTICIPANT1;
+ protected static final InetAddressAndPort PARTICIPANT2;
+ protected static final InetAddressAndPort PARTICIPANT3;
static
{
try
{
- COORDINATOR = InetAddress.getByName("10.0.0.1");
- PARTICIPANT1 = InetAddress.getByName("10.0.0.1");
- PARTICIPANT2 = InetAddress.getByName("10.0.0.2");
- PARTICIPANT3 = InetAddress.getByName("10.0.0.3");
+ COORDINATOR = InetAddressAndPort.getByName("10.0.0.1");
+ PARTICIPANT1 = InetAddressAndPort.getByName("10.0.0.1");
+ PARTICIPANT2 = InetAddressAndPort.getByName("10.0.0.2");
+ PARTICIPANT3 = InetAddressAndPort.getByName("10.0.0.3");
}
catch (UnknownHostException e)
{
@@ -64,7 +64,7 @@ public abstract class AbstractRepairTest
DatabaseDescriptor.daemonInitialization();
}
- protected static final Set<InetAddress> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3);
+ protected static final Set<InetAddressAndPort> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3);
protected static Token t(int v)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 7f3dbff..95046bd 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -36,6 +35,7 @@ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
@@ -78,8 +78,8 @@ public class LocalSyncTaskTest extends AbstractRepairTest
@Test
public void testNoDifference() throws Throwable
{
- final InetAddress ep1 = InetAddress.getByName("127.0.0.1");
- final InetAddress ep2 = InetAddress.getByName("127.0.0.1");
+ final InetAddressAndPort ep1 = InetAddressAndPort.getByName("127.0.0.1");
+ final InetAddressAndPort ep2 = InetAddressAndPort.getByName("127.0.0.1");
Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
@@ -106,7 +106,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
- ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(),
+ ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddressAndPort(),
Arrays.asList(cfs), Arrays.asList(range), false,
ActiveRepairService.UNREPAIRED_SSTABLE, false,
PreviewKind.NONE);
@@ -128,8 +128,8 @@ public class LocalSyncTaskTest extends AbstractRepairTest
// difference the trees
// note: we reuse the same endpoint which is bogus in theory but fine here
- TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1);
- TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2);
+ TreeResponse r1 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.1"), tree1);
+ TreeResponse r2 = new TreeResponse(InetAddressAndPort.getByName("127.0.0.2"), tree2);
LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
task.run();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
index db76f73..2044106 100644
--- a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.RepairRunnable.CommonRange;
import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges;
@@ -53,7 +54,7 @@ public class RepairRunnableTest extends AbstractRepairTest
{
CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2));
CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3));
- Set<InetAddress> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded
+ Set<InetAddressAndPort> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded
List<CommonRange> initial = Lists.newArrayList(cr1, cr2);
List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 984218d..54f0511 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.repair;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.Arrays;
import java.util.Set;
import java.util.UUID;
@@ -35,7 +34,7 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.UUIDGen;
@@ -54,7 +53,7 @@ public class RepairSessionTest
@Test
public void testConviction() throws Exception
{
- InetAddress remote = InetAddress.getByName("127.0.0.2");
+ InetAddressAndPort remote = InetAddressAndPort.getByName("127.0.0.2");
Gossiper.instance.initializeNodeUnsafe(remote, UUID.randomUUID(), 1);
// Set up RepairSession
@@ -62,7 +61,7 @@ public class RepairSessionTest
UUID sessionId = UUID.randomUUID();
IPartitioner p = Murmur3Partitioner.instance;
Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100)));
- Set<InetAddress> endpoints = Sets.newHashSet(remote);
+ Set<InetAddressAndPort> endpoints = Sets.newHashSet(remote);
RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange),
"Keyspace1", RepairParallelism.SEQUENTIAL,
endpoints, false, false, false,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index b45edc1..322772a 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.repair;
-import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -34,6 +33,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.BufferDecoratedKey;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -95,7 +95,7 @@ public class ValidatorTest
final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
- InetAddress remote = InetAddress.getByName("127.0.0.2");
+ InetAddressAndPort remote = InetAddressAndPort.getByName("127.0.0.2");
ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
@@ -134,7 +134,7 @@ public class ValidatorTest
final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
- InetAddress remote = InetAddress.getByName("127.0.0.2");
+ InetAddressAndPort remote = InetAddressAndPort.getByName("127.0.0.2");
Validator validator = new Validator(desc, remote, 0, PreviewKind.NONE);
validator.fail();
@@ -189,12 +189,12 @@ public class ValidatorTest
cfs.getTableName(), Collections.singletonList(new Range<>(sstable.first.getToken(),
sstable.last.getToken())));
- ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
+ ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddressAndPort(),
Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
false, PreviewKind.NONE);
final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
- Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true, false, PreviewKind.NONE);
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), 0, true, false, PreviewKind.NONE);
CompactionManager.instance.submitValidation(cfs, validator);
MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
@@ -218,7 +218,7 @@ public class ValidatorTest
final CompletableFuture<MessageOut> future = new CompletableFuture<>();
MessagingService.instance().addMessageSink(new IMessageSink()
{
- public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+ public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
{
future.complete(message);
return false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
index 52a43e6..9693010 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/DifferenceHolderTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.asymmetric;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
@@ -30,6 +29,7 @@ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.TreeResponse;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.MerkleTrees;
@@ -43,8 +43,8 @@ public class DifferenceHolderTest
@Test
public void testFromEmptyMerkleTrees() throws UnknownHostException
{
- InetAddress a1 = InetAddress.getByName("127.0.0.1");
- InetAddress a2 = InetAddress.getByName("127.0.0.2");
+ InetAddressAndPort a1 = InetAddressAndPort.getByName("127.0.0.1");
+ InetAddressAndPort a2 = InetAddressAndPort.getByName("127.0.0.2");
MerkleTrees mt1 = new MerkleTrees(Murmur3Partitioner.instance);
MerkleTrees mt2 = new MerkleTrees(Murmur3Partitioner.instance);
@@ -64,8 +64,8 @@ public class DifferenceHolderTest
IPartitioner partitioner = Murmur3Partitioner.instance;
Range<Token> fullRange = new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken());
int maxsize = 16;
- InetAddress a1 = InetAddress.getByName("127.0.0.1");
- InetAddress a2 = InetAddress.getByName("127.0.0.2");
+ InetAddressAndPort a1 = InetAddressAndPort.getByName("127.0.0.1");
+ InetAddressAndPort a2 = InetAddressAndPort.getByName("127.0.0.2");
// merkle tree building stolen from MerkleTreesTest:
MerkleTrees mt1 = new MerkleTrees(partitioner);
MerkleTrees mt2 = new MerkleTrees(partitioner);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
index 19c42fb..6c64b1a 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/ReduceHelperTest.java
@@ -19,11 +19,9 @@
package org.apache.cassandra.repair.asymmetric;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -33,13 +31,13 @@ import java.util.Map;
import java.util.Set;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.junit.Test;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertEquals;
@@ -47,24 +45,24 @@ import static org.junit.Assert.assertTrue;
public class ReduceHelperTest
{
- private static final InetAddress[] addresses;
- private static final InetAddress A;
- private static final InetAddress B;
- private static final InetAddress C;
- private static final InetAddress D;
- private static final InetAddress E;
+ private static final InetAddressAndPort[] addresses;
+ private static final InetAddressAndPort A;
+ private static final InetAddressAndPort B;
+ private static final InetAddressAndPort C;
+ private static final InetAddressAndPort D;
+ private static final InetAddressAndPort E;
static
{
try
{
- A = InetAddress.getByName("127.0.0.0");
- B = InetAddress.getByName("127.0.0.1");
- C = InetAddress.getByName("127.0.0.2");
- D = InetAddress.getByName("127.0.0.3");
- E = InetAddress.getByName("127.0.0.4");
+ A = InetAddressAndPort.getByName("127.0.0.0");
+ B = InetAddressAndPort.getByName("127.0.0.1");
+ C = InetAddressAndPort.getByName("127.0.0.2");
+ D = InetAddressAndPort.getByName("127.0.0.3");
+ E = InetAddressAndPort.getByName("127.0.0.4");
// for diff creation in loops:
- addresses = new InetAddress[]{ A, B, C, D, E };
+ addresses = new InetAddressAndPort[]{ A, B, C, D, E };
}
catch (UnknownHostException e)
{
@@ -89,7 +87,7 @@ public class ReduceHelperTest
C x x
D =
*/
- Map<InetAddress, HostDifferences> differences = new HashMap<>();
+ Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>();
for (int i = 0; i < 4; i++)
{
HostDifferences hostDiffs = new HostDifferences();
@@ -105,7 +103,7 @@ public class ReduceHelperTest
}
DifferenceHolder differenceHolder = new DifferenceHolder(differences);
- Map<InetAddress, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
+ Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
assertEquals(set(set(C), set(E,D)), streams(tracker.get(A)));
assertEquals(set(set(C), set(E,D)), streams(tracker.get(B)));
@@ -113,7 +111,7 @@ public class ReduceHelperTest
assertEquals(set(set(A,B), set(C)), streams(tracker.get(D)));
assertEquals(set(set(A,B), set(C)), streams(tracker.get(E)));
- ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x,y) -> y);
+ ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
HostDifferences n0 = reduced.get(A);
assertEquals(0, n0.get(A).size());
@@ -163,7 +161,7 @@ public class ReduceHelperTest
C x x
D =
*/
- Map<InetAddress, HostDifferences> differences = new HashMap<>();
+ Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>();
for (int i = 0; i < 4; i++)
{
HostDifferences hostDifferences = new HostDifferences();
@@ -179,7 +177,7 @@ public class ReduceHelperTest
}
DifferenceHolder differenceHolder = new DifferenceHolder(differences);
- Map<InetAddress, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
+ Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
assertEquals(set(set(C), set(E, D)), streams(tracker.get(A)));
assertEquals(set(set(C), set(E, D)), streams(tracker.get(B)));
assertEquals(set(set(A, B), set(E, D)), streams(tracker.get(C)));
@@ -187,7 +185,7 @@ public class ReduceHelperTest
assertEquals(set(set(A, B), set(C)), streams(tracker.get(E)));
// if there is an option, never stream from node 1:
- ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x,y) -> Sets.difference(y, set(B)));
+ ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x,y) -> Sets.difference(y, set(B)));
HostDifferences n0 = reduced.get(A);
assertEquals(0, n0.get(A).size());
@@ -223,7 +221,7 @@ public class ReduceHelperTest
assertEquals(0, n4.get(E).size());
}
- private Iterable<Set<InetAddress>> streams(IncomingRepairStreamTracker incomingRepairStreamTracker)
+ private Iterable<Set<InetAddressAndPort>> streams(IncomingRepairStreamTracker incomingRepairStreamTracker)
{
return incomingRepairStreamTracker.getIncoming().values().iterator().next().allStreams();
}
@@ -248,12 +246,12 @@ public class ReduceHelperTest
B streams (0, 50] from {C}, (50, 100] from {A, C}
C streams (0, 50] from {A, B}, (50, 100] from B
*/
- Map<InetAddress, HostDifferences> differences = new HashMap<>();
+ Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>();
addDifference(A, differences, B, list(range(50, 100)));
addDifference(A, differences, C, list(range(0, 50)));
addDifference(B, differences, C, list(range(0, 100)));
DifferenceHolder differenceHolder = new DifferenceHolder(differences);
- Map<InetAddress, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
+ Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
assertEquals(set(set(C)), tracker.get(A).getIncoming().get(range(0, 50)).allStreams());
assertEquals(set(set(B)), tracker.get(A).getIncoming().get(range(50, 100)).allStreams());
assertEquals(set(set(C)), tracker.get(B).getIncoming().get(range(0, 50)).allStreams());
@@ -261,7 +259,7 @@ public class ReduceHelperTest
assertEquals(set(set(A,B)), tracker.get(C).getIncoming().get(range(0, 50)).allStreams());
assertEquals(set(set(B)), tracker.get(C).getIncoming().get(range(50, 100)).allStreams());
- ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
+ ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
HostDifferences n0 = reduced.get(A);
@@ -270,7 +268,7 @@ public class ReduceHelperTest
HostDifferences n1 = reduced.get(B);
assertEquals(0, n1.get(B).size());
- if (n1.get(A) != null)
+ if (!n1.get(A).isEmpty())
{
assertTrue(n1.get(C).equals(list(range(0, 50))));
assertTrue(n1.get(A).equals(list(range(50, 100))));
@@ -281,7 +279,7 @@ public class ReduceHelperTest
}
HostDifferences n2 = reduced.get(C);
assertEquals(0, n2.get(C).size());
- if (n2.get(A) != null)
+ if (!n2.get(A).isEmpty())
{
assertTrue(n2.get(A).equals(list(range(0,50))));
assertTrue(n2.get(B).equals(list(range(50, 100))));
@@ -312,13 +310,13 @@ public class ReduceHelperTest
B == C on (5, 10], (40, 45]
*/
- Map<InetAddress, HostDifferences> differences = new HashMap<>();
+ Map<InetAddressAndPort, HostDifferences> differences = new HashMap<>();
addDifference(A, differences, B, list(range(5, 45)));
addDifference(A, differences, C, list(range(0, 10), range(40,50)));
addDifference(B, differences, C, list(range(0, 5), range(10,40), range(45,50)));
DifferenceHolder differenceHolder = new DifferenceHolder(differences);
- Map<InetAddress, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
+ Map<InetAddressAndPort, IncomingRepairStreamTracker> tracker = ReduceHelper.createIncomingRepairStreamTrackers(differenceHolder);
Map<Range<Token>, StreamFromOptions> ranges = tracker.get(A).getIncoming();
assertEquals(5, ranges.size());
@@ -344,21 +342,21 @@ public class ReduceHelperTest
assertEquals(set(set(B)), ranges.get(range(10, 40)).allStreams());
assertEquals(set(set(A)), ranges.get(range(40, 45)).allStreams());
assertEquals(set(set(A,B)), ranges.get(range(45, 50)).allStreams());
- ImmutableMap<InetAddress, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
+ ImmutableMap<InetAddressAndPort, HostDifferences> reduced = ReduceHelper.reduce(differenceHolder, (x, y) -> y);
assertNoOverlap(A, reduced.get(A), list(range(0, 50)));
assertNoOverlap(B, reduced.get(B), list(range(0, 50)));
assertNoOverlap(C, reduced.get(C), list(range(0, 50)));
}
- private void assertNoOverlap(InetAddress incomingNode, HostDifferences node, List<Range<Token>> expectedAfterNormalize)
+ private void assertNoOverlap(InetAddressAndPort incomingNode, HostDifferences node, List<Range<Token>> expectedAfterNormalize)
{
Set<Range<Token>> allRanges = new HashSet<>();
- Set<InetAddress> remoteNodes = Sets.newHashSet(A,B,C);
+ Set<InetAddressAndPort> remoteNodes = Sets.newHashSet(A,B,C);
remoteNodes.remove(incomingNode);
- Iterator<InetAddress> iter = remoteNodes.iterator();
+ Iterator<InetAddressAndPort> iter = remoteNodes.iterator();
allRanges.addAll(node.get(iter.next()));
- InetAddress i = iter.next();
+ InetAddressAndPort i = iter.next();
for (Range<Token> r : node.get(i))
{
for (Range<Token> existing : allRanges)
@@ -379,14 +377,14 @@ public class ReduceHelperTest
return ranges;
}
- private static Set<InetAddress> set(InetAddress ... elem)
+ private static Set<InetAddressAndPort> set(InetAddressAndPort ... elem)
{
return Sets.newHashSet(elem);
}
@SafeVarargs
- private static Set<Set<InetAddress>> set(Set<InetAddress> ... elem)
+ private static Set<Set<InetAddressAndPort>> set(Set<InetAddressAndPort> ... elem)
{
- Set<Set<InetAddress>> ret = Sets.newHashSet();
+ Set<Set<InetAddressAndPort>> ret = Sets.newHashSet();
ret.addAll(Arrays.asList(elem));
return ret;
}
@@ -418,7 +416,7 @@ public class ReduceHelperTest
assertTrue(r1.size() > 0 ^ r2.size() > 0);
}
- private void addDifference(InetAddress host1, Map<InetAddress, HostDifferences> differences, InetAddress host2, List<Range<Token>> ranges)
+ private void addDifference(InetAddressAndPort host1, Map<InetAddressAndPort, HostDifferences> differences, InetAddressAndPort host2, List<Range<Token>> ranges)
{
differences.computeIfAbsent(host1, (x) -> new HostDifferences()).add(host2, ranges);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java b/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java
index 3ba3cfe..e2a7700 100644
--- a/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java
+++ b/test/unit/org/apache/cassandra/repair/asymmetric/StreamFromOptionsTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.asymmetric;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet;
@@ -30,6 +29,7 @@ import org.junit.Test;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -41,16 +41,16 @@ public class StreamFromOptionsTest
public void addAllDiffingTest() throws UnknownHostException
{
StreamFromOptions sfo = new StreamFromOptions(new MockDiffs(true), range(0, 10));
- Set<InetAddress> toAdd = new HashSet<>();
- toAdd.add(InetAddress.getByName("127.0.0.1"));
- toAdd.add(InetAddress.getByName("127.0.0.2"));
- toAdd.add(InetAddress.getByName("127.0.0.3"));
+ Set<InetAddressAndPort> toAdd = new HashSet<>();
+ toAdd.add(InetAddressAndPort.getByName("127.0.0.1"));
+ toAdd.add(InetAddressAndPort.getByName("127.0.0.2"));
+ toAdd.add(InetAddressAndPort.getByName("127.0.0.3"));
toAdd.forEach(sfo::add);
// if all added have differences, each set will contain a single host
assertEquals(3, Iterables.size(sfo.allStreams()));
- Set<InetAddress> allStreams = new HashSet<>();
- for (Set<InetAddress> streams : sfo.allStreams())
+ Set<InetAddressAndPort> allStreams = new HashSet<>();
+ for (Set<InetAddressAndPort> streams : sfo.allStreams())
{
assertEquals(1, streams.size());
allStreams.addAll(streams);
@@ -62,10 +62,10 @@ public class StreamFromOptionsTest
public void addAllMatchingTest() throws UnknownHostException
{
StreamFromOptions sfo = new StreamFromOptions(new MockDiffs(false), range(0, 10));
- Set<InetAddress> toAdd = new HashSet<>();
- toAdd.add(InetAddress.getByName("127.0.0.1"));
- toAdd.add(InetAddress.getByName("127.0.0.2"));
- toAdd.add(InetAddress.getByName("127.0.0.3"));
+ Set<InetAddressAndPort> toAdd = new HashSet<>();
+ toAdd.add(InetAddressAndPort.getByName("127.0.0.1"));
+ toAdd.add(InetAddressAndPort.getByName("127.0.0.2"));
+ toAdd.add(InetAddressAndPort.getByName("127.0.0.3"));
toAdd.forEach(sfo::add);
// if all added match, the set will contain all hosts
@@ -83,10 +83,10 @@ public class StreamFromOptionsTest
private void splitTestHelper(boolean diffing) throws UnknownHostException
{
StreamFromOptions sfo = new StreamFromOptions(new MockDiffs(diffing), range(0, 10));
- Set<InetAddress> toAdd = new HashSet<>();
- toAdd.add(InetAddress.getByName("127.0.0.1"));
- toAdd.add(InetAddress.getByName("127.0.0.2"));
- toAdd.add(InetAddress.getByName("127.0.0.3"));
+ Set<InetAddressAndPort> toAdd = new HashSet<>();
+ toAdd.add(InetAddressAndPort.getByName("127.0.0.1"));
+ toAdd.add(InetAddressAndPort.getByName("127.0.0.2"));
+ toAdd.add(InetAddressAndPort.getByName("127.0.0.3"));
toAdd.forEach(sfo::add);
StreamFromOptions sfo1 = sfo.copy(range(0, 5));
StreamFromOptions sfo2 = sfo.copy(range(5, 10));
@@ -95,8 +95,8 @@ public class StreamFromOptionsTest
assertEquals(range(5, 10), sfo2.range);
assertTrue(Iterables.elementsEqual(sfo1.allStreams(), sfo2.allStreams()));
// verify the backing set is not shared between the copies:
- sfo1.add(InetAddress.getByName("127.0.0.4"));
- sfo2.add(InetAddress.getByName("127.0.0.5"));
+ sfo1.add(InetAddressAndPort.getByName("127.0.0.4"));
+ sfo2.add(InetAddressAndPort.getByName("127.0.0.5"));
assertFalse(Iterables.elementsEqual(sfo1.allStreams(), sfo2.allStreams()));
}
@@ -116,7 +116,7 @@ public class StreamFromOptionsTest
}
@Override
- public boolean hasDifferenceBetween(InetAddress node1, InetAddress node2, Range<Token> range)
+ public boolean hasDifferenceBetween(InetAddressAndPort node1, InetAddressAndPort node2, Range<Token> range)
{
return hasDifference;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
index 367fea9..4570328 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.consistent;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Set;
import java.util.UUID;
@@ -33,6 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -41,19 +41,19 @@ import org.apache.cassandra.utils.UUIDGen;
@Ignore
public abstract class AbstractConsistentSessionTest
{
- protected static final InetAddress COORDINATOR;
- protected static final InetAddress PARTICIPANT1;
- protected static final InetAddress PARTICIPANT2;
- protected static final InetAddress PARTICIPANT3;
+ protected static final InetAddressAndPort COORDINATOR;
+ protected static final InetAddressAndPort PARTICIPANT1;
+ protected static final InetAddressAndPort PARTICIPANT2;
+ protected static final InetAddressAndPort PARTICIPANT3;
static
{
try
{
- COORDINATOR = InetAddress.getByName("10.0.0.1");
- PARTICIPANT1 = InetAddress.getByName("10.0.0.1");
- PARTICIPANT2 = InetAddress.getByName("10.0.0.2");
- PARTICIPANT3 = InetAddress.getByName("10.0.0.3");
+ COORDINATOR = InetAddressAndPort.getByName("10.0.0.1");
+ PARTICIPANT1 = InetAddressAndPort.getByName("10.0.0.1");
+ PARTICIPANT2 = InetAddressAndPort.getByName("10.0.0.2");
+ PARTICIPANT3 = InetAddressAndPort.getByName("10.0.0.3");
}
catch (UnknownHostException e)
{
@@ -64,7 +64,7 @@ public abstract class AbstractConsistentSessionTest
DatabaseDescriptor.daemonInitialization();
}
- protected static final Set<InetAddress> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3);
+ protected static final Set<InetAddressAndPort> PARTICIPANTS = ImmutableSet.of(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3);
protected static Token t(int v)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index fb312c3..5d054d3 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.consistent;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -35,6 +34,7 @@ import com.google.common.util.concurrent.SettableFuture;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.AbstractRepairTest;
import org.apache.cassandra.repair.RepairSessionResult;
import org.apache.cassandra.repair.messages.FailSession;
@@ -77,7 +77,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
return new RepairSessionResult(coordinator.sessionID, "ks", coordinator.ranges, null, false);
}
- private static void assertMessageSent(InstrumentedCoordinatorSession coordinator, InetAddress participant, RepairMessage expected)
+ private static void assertMessageSent(InstrumentedCoordinatorSession coordinator, InetAddressAndPort participant, RepairMessage expected)
{
Assert.assertTrue(coordinator.sentMessages.containsKey(participant));
Assert.assertEquals(1, coordinator.sentMessages.get(participant).size());
@@ -91,9 +91,9 @@ public class CoordinatorSessionTest extends AbstractRepairTest
super(builder);
}
- Map<InetAddress, List<RepairMessage>> sentMessages = new HashMap<>();
+ Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new HashMap<>();
- protected void sendMessage(InetAddress destination, RepairMessage message)
+ protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
{
if (!sentMessages.containsKey(destination))
{
@@ -189,7 +189,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
coordinator.fail();
Assert.assertEquals(FAILED, coordinator.getState());
- for (InetAddress participant : PARTICIPANTS)
+ for (InetAddressAndPort participant : PARTICIPANTS)
{
assertMessageSent(coordinator, participant, new FailSession(coordinator.sessionID));
}
@@ -221,7 +221,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
Assert.assertTrue(coordinator.sentMessages.isEmpty());
ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures);
- for (InetAddress participant : PARTICIPANTS)
+ for (InetAddressAndPort participant : PARTICIPANTS)
{
RepairMessage expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
@@ -254,7 +254,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
repairFuture.set(results);
// propose messages should have been sent once all repair sessions completed successfully
- for (InetAddress participant : PARTICIPANTS)
+ for (InetAddressAndPort participant : PARTICIPANTS)
{
RepairMessage expected = new FinalizePropose(coordinator.sessionID);
assertMessageSent(coordinator, participant, expected);
@@ -277,7 +277,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
Assert.assertTrue(coordinator.finalizeCommitCalled);
Assert.assertEquals(ConsistentSession.State.FINALIZED, coordinator.getState());
- for (InetAddress participant : PARTICIPANTS)
+ for (InetAddressAndPort participant : PARTICIPANTS)
{
RepairMessage expected = new FinalizeCommit(coordinator.sessionID);
assertMessageSent(coordinator, participant, expected);
@@ -304,7 +304,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
Assert.assertFalse(repairSubmitted.get());
Assert.assertTrue(coordinator.sentMessages.isEmpty());
ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures);
- for (InetAddress participant : PARTICIPANTS)
+ for (InetAddressAndPort participant : PARTICIPANTS)
{
PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
assertMessageSent(coordinator, participant, expected);
@@ -339,7 +339,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
Assert.assertTrue(coordinator.failCalled);
// all participants should have been notified of session failure
- for (InetAddress participant : PARTICIPANTS)
+ for (InetAddressAndPort participant : PARTICIPANTS)
{
RepairMessage expected = new FailSession(coordinator.sessionID);
assertMessageSent(coordinator, participant, expected);
@@ -366,7 +366,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
Assert.assertFalse(repairSubmitted.get());
Assert.assertTrue(coordinator.sentMessages.isEmpty());
ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures);
- for (InetAddress participant : PARTICIPANTS)
+ for (InetAddressAndPort participant : PARTICIPANTS)
{
PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
assertMessageSent(coordinator, participant, expected);
@@ -394,7 +394,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
Assert.assertFalse(repairSubmitted.get());
// all participants should have been notified of session failure
- for (InetAddress participant : PARTICIPANTS)
+ for (InetAddressAndPort participant : PARTICIPANTS)
{
RepairMessage expected = new FailSession(coordinator.sessionID);
assertMessageSent(coordinator, participant, expected);
@@ -422,7 +422,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
Assert.assertTrue(coordinator.sentMessages.isEmpty());
ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures);
- for (InetAddress participant : PARTICIPANTS)
+ for (InetAddressAndPort participant : PARTICIPANTS)
{
RepairMessage expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
@@ -455,7 +455,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
repairFuture.set(results);
// propose messages should have been sent once all repair sessions completed successfully
- for (InetAddress participant : PARTICIPANTS)
+ for (InetAddressAndPort participant : PARTICIPANTS)
{
RepairMessage expected = new FinalizePropose(coordinator.sessionID);
assertMessageSent(coordinator, participant, expected);
@@ -481,7 +481,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
Assert.assertEquals(ConsistentSession.State.FAILED, coordinator.getState());
// failure messages should have been sent to all participants
- for (InetAddress participant : PARTICIPANTS)
+ for (InetAddressAndPort participant : PARTICIPANTS)
{
RepairMessage expected = new FailSession(coordinator.sessionID);
assertMessageSent(coordinator, participant, expected);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
index b40e185..9bf4270 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.consistent;
-import java.net.InetAddress;
import java.util.Set;
import java.util.UUID;
@@ -29,6 +28,7 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.AbstractRepairTest;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.Schema;
@@ -55,9 +55,9 @@ public class CoordinatorSessionsTest extends AbstractRepairTest
}
int prepareResponseCalls = 0;
- InetAddress preparePeer = null;
+ InetAddressAndPort preparePeer = null;
boolean prepareSuccess = false;
- public synchronized void handlePrepareResponse(InetAddress participant, boolean success)
+ public synchronized void handlePrepareResponse(InetAddressAndPort participant, boolean success)
{
prepareResponseCalls++;
preparePeer = participant;
@@ -65,9 +65,9 @@ public class CoordinatorSessionsTest extends AbstractRepairTest
}
int finalizePromiseCalls = 0;
- InetAddress promisePeer = null;
+ InetAddressAndPort promisePeer = null;
boolean promiseSuccess = false;
- public synchronized void handleFinalizePromise(InetAddress participant, boolean success)
+ public synchronized void handleFinalizePromise(InetAddressAndPort participant, boolean success)
{
finalizePromiseCalls++;
promisePeer = participant;
@@ -93,7 +93,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest
return (InstrumentedCoordinatorSession) super.getSession(sessionId);
}
- public InstrumentedCoordinatorSession registerSession(UUID sessionId, Set<InetAddress> peers)
+ public InstrumentedCoordinatorSession registerSession(UUID sessionId, Set<InetAddressAndPort> peers)
{
return (InstrumentedCoordinatorSession) super.registerSession(sessionId, peers);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
index 6808efe..3ea888d 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
@@ -18,10 +18,10 @@
package org.apache.cassandra.repair.consistent;
-import java.net.InetAddress;
import java.util.Set;
import java.util.UUID;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.service.ActiveRepairService;
/**
@@ -36,7 +36,7 @@ public class LocalSessionAccessor
ARS.consistent.local.start();
}
- public static void prepareUnsafe(UUID sessionID, InetAddress coordinator, Set<InetAddress> peers)
+ public static void prepareUnsafe(UUID sessionID, InetAddressAndPort coordinator, Set<InetAddressAndPort> peers)
{
ActiveRepairService.ParentRepairSession prs = ARS.getParentRepairSession(sessionID);
assert prs != null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index 6e6d222..5fa43a9 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.consistent;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -42,6 +41,7 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.cql3.statements.CreateTableStatement;
import org.apache.cassandra.repair.AbstractRepairTest;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
@@ -106,20 +106,20 @@ public class LocalSessionTest extends AbstractRepairTest
}
}
- private static void assertNoMessagesSent(InstrumentedLocalSessions sessions, InetAddress to)
+ private static void assertNoMessagesSent(InstrumentedLocalSessions sessions, InetAddressAndPort to)
{
Assert.assertNull(sessions.sentMessages.get(to));
}
- private static void assertMessagesSent(InstrumentedLocalSessions sessions, InetAddress to, RepairMessage... expected)
+ private static void assertMessagesSent(InstrumentedLocalSessions sessions, InetAddressAndPort to, RepairMessage... expected)
{
Assert.assertEquals(Lists.newArrayList(expected), sessions.sentMessages.get(to));
}
static class InstrumentedLocalSessions extends LocalSessions
{
- Map<InetAddress, List<RepairMessage>> sentMessages = new HashMap<>();
- protected void sendMessage(InetAddress destination, RepairMessage message)
+ Map<InetAddressAndPort, List<RepairMessage>> sentMessages = new HashMap<>();
+ protected void sendMessage(InetAddressAndPort destination, RepairMessage message)
{
if (!sentMessages.containsKey(destination))
{
@@ -159,12 +159,13 @@ public class LocalSessionTest extends AbstractRepairTest
return getSession(sessionID);
}
- protected InetAddress getBroadcastAddress()
+ @Override
+ protected InetAddressAndPort getBroadcastAddressAndPort()
{
return PARTICIPANT1;
}
- protected boolean isAlive(InetAddress address)
+ protected boolean isAlive(InetAddressAndPort address)
{
return true;
}
@@ -811,7 +812,7 @@ public class LocalSessionTest extends AbstractRepairTest
sessions.start();
Assert.assertNotNull(sessions.getSession(session.sessionID));
- QueryProcessor.instance.executeInternal("DELETE participants FROM system.repairs WHERE parent_id=?", session.sessionID);
+ QueryProcessor.instance.executeInternal("DELETE participants, participants_wp FROM system.repairs WHERE parent_id=?", session.sessionID);
sessions = new LocalSessions();
sessions.start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
index 5aeab3e..213cdd3 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.repair.consistent;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -43,6 +42,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
@@ -148,7 +148,7 @@ public class PendingAntiCompactionTest
// create a session so the anti compaction can find it
UUID sessionID = UUIDGen.getTimeUUID();
- ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddress.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true, PreviewKind.NONE);
+ ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddressAndPort.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true, PreviewKind.NONE);
PendingAntiCompaction pac;
ExecutorService executor = Executors.newSingleThreadExecutor();
@@ -352,7 +352,7 @@ public class PendingAntiCompactionTest
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
UUID sessionID = UUIDGen.getTimeUUID();
ActiveRepairService.instance.registerParentRepairSession(sessionID,
- InetAddress.getByName("127.0.0.1"),
+ InetAddressAndPort.getByName("127.0.0.1"),
Lists.newArrayList(cfs),
FULL_RANGE,
true,0,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org