You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:03 UTC
[06/19] cassandra git commit: Allow storage port to be configurable
per node
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index 84a4c32..52fc6ac 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -19,10 +19,14 @@ package org.apache.cassandra.utils;
import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.*;
+import java.util.stream.Collectors;
import com.datastax.driver.core.*;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.ColumnMetadata.ClusteringOrder;
@@ -34,33 +38,46 @@ import org.apache.cassandra.dht.Token.TokenFactory;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.CQLTypeParser;
+import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.Types;
+
public class NativeSSTableLoaderClient extends SSTableLoader.Client
{
protected final Map<String, TableMetadataRef> tables;
- private final Collection<InetAddress> hosts;
+ private final Collection<InetSocketAddress> hosts;
private final int port;
+ private final int storagePort;
private final AuthProvider authProvider;
private final SSLOptions sslOptions;
+ private final boolean allowServerPortDiscovery;
- public NativeSSTableLoaderClient(Collection<InetAddress> hosts, int port, String username, String password, SSLOptions sslOptions)
+ public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int nativePort, int storagePort, String username, String password, SSLOptions sslOptions, boolean allowServerPortDiscovery)
{
- this(hosts, port, new PlainTextAuthProvider(username, password), sslOptions);
+ this(hosts, nativePort, storagePort, new PlainTextAuthProvider(username, password), sslOptions, allowServerPortDiscovery);
}
- public NativeSSTableLoaderClient(Collection<InetAddress> hosts, int port, AuthProvider authProvider, SSLOptions sslOptions)
+ public NativeSSTableLoaderClient(Collection<InetSocketAddress> hosts, int nativePort, int storagePort, AuthProvider authProvider, SSLOptions sslOptions, boolean allowServerPortDiscovery)
{
super();
this.tables = new HashMap<>();
this.hosts = hosts;
- this.port = port;
+ this.port = nativePort;
this.authProvider = authProvider;
this.sslOptions = sslOptions;
+ this.allowServerPortDiscovery = allowServerPortDiscovery;
+ this.storagePort = storagePort;
}
public void init(String keyspace)
{
- Cluster.Builder builder = Cluster.builder().addContactPoints(hosts).withPort(port);
+ Set<InetAddress> hostAddresses = hosts.stream().map(host -> host.getAddress()).collect(Collectors.toSet());
+ Cluster.Builder builder = Cluster.builder().addContactPoints(hostAddresses).withPort(port).allowBetaProtocolVersion();
+
+ if (allowServerPortDiscovery)
+ builder = builder.allowServerPortDiscovery();
+
if (sslOptions != null)
builder.withSSL(sslOptions);
if (authProvider != null)
@@ -82,7 +99,18 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
Range<Token> range = new Range<>(tokenFactory.fromString(tokenRange.getStart().getValue().toString()),
tokenFactory.fromString(tokenRange.getEnd().getValue().toString()));
for (Host endpoint : endpoints)
- addRangeForEndpoint(range, endpoint.getAddress());
+ {
+ int portToUse;
+ if (allowServerPortDiscovery)
+ {
+ portToUse = endpoint.getBroadcastAddressOptPort().portOrElse(storagePort);
+ }
+ else
+ {
+ portToUse = storagePort;
+ }
+ addRangeForEndpoint(range, InetAddressAndPort.getByNameOverrideDefaults(endpoint.getAddress().getHostAddress(), portToUse));
+ }
}
Types types = fetchTypes(keyspace, session);
@@ -91,6 +119,10 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
// We only need the TableMetadata for the views, so we only load that.
tables.putAll(fetchViews(keyspace, session, partitioner, types));
}
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
}
public TableMetadataRef getTableMetadata(String tableName)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java
index 94b5832..19a0f83 100644
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@ -356,7 +356,7 @@ public class UUIDGen
* The spec says that one option is to take as many sources that identify
* this node as possible and hash them together. That's what we do here by
* gathering all the IPs of this host.
- * Note that FBUtilities.getBroadcastAddress() should be enough to uniquely
+ * Note that FBUtilities.getJustBroadcastAddress() should be enough to uniquely
* identify the node *in the cluster* but it triggers DatabaseDescriptor
* instantiation and the UUID generator is used in Stress for instance,
* where we don't want to require the yaml.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index ead2a88..640f9b3 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -17,6 +17,7 @@ hints_directory: build/test/cassandra/hints
partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
listen_address: 127.0.0.1
storage_port: 7010
+ssl_storage_port: 7011
start_native_transport: true
native_transport_port: 9042
column_index_size_in_kb: 4
@@ -27,7 +28,7 @@ disk_access_mode: mmap
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
- - seeds: "127.0.0.1"
+ - seeds: "127.0.0.1:7010"
endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
dynamic_snitch: true
server_encryption_options:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/gms.EndpointState.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/gms.EndpointState.bin b/test/data/serialization/4.0/gms.EndpointState.bin
index fb7d168..17fc088 100644
Binary files a/test/data/serialization/4.0/gms.EndpointState.bin and b/test/data/serialization/4.0/gms.EndpointState.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/gms.Gossip.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/gms.Gossip.bin b/test/data/serialization/4.0/gms.Gossip.bin
index af5ac57..2fbd5d4 100644
Binary files a/test/data/serialization/4.0/gms.Gossip.bin and b/test/data/serialization/4.0/gms.Gossip.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/service.SyncComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.SyncComplete.bin b/test/data/serialization/4.0/service.SyncComplete.bin
index ba84349..15cccb8 100644
Binary files a/test/data/serialization/4.0/service.SyncComplete.bin and b/test/data/serialization/4.0/service.SyncComplete.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/service.SyncRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.SyncRequest.bin b/test/data/serialization/4.0/service.SyncRequest.bin
index 6d688a4..f4eb532 100644
Binary files a/test/data/serialization/4.0/service.SyncRequest.bin and b/test/data/serialization/4.0/service.SyncRequest.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/service.ValidationComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.ValidationComplete.bin b/test/data/serialization/4.0/service.ValidationComplete.bin
index 7433d64..edc90b3 100644
Binary files a/test/data/serialization/4.0/service.ValidationComplete.bin and b/test/data/serialization/4.0/service.ValidationComplete.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/data/serialization/4.0/service.ValidationRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.ValidationRequest.bin b/test/data/serialization/4.0/service.ValidationRequest.bin
index a00763b..e45eb70 100644
Binary files a/test/data/serialization/4.0/service.ValidationRequest.bin and b/test/data/serialization/4.0/service.ValidationRequest.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java b/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
index 35bf5b4..a5025a3 100644
--- a/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
+++ b/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
@@ -20,7 +20,6 @@
package org.apache.cassandra.locator;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.*;
import org.junit.Test;
@@ -53,19 +52,19 @@ public class DynamicEndpointSnitchLongTest
StorageService.instance.unsafeInitialize();
SimpleSnitch ss = new SimpleSnitch();
DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode()));
- InetAddress self = FBUtilities.getBroadcastAddress();
+ InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
- List<InetAddress> hosts = new ArrayList<>();
+ List<InetAddressAndPort> hosts = new ArrayList<>();
// We want a big list of hosts so sorting takes time, making it much more likely to reproduce the
// problem we're looking for.
for (int i = 0; i < 100; i++)
for (int j = 0; j < 256; j++)
- hosts.add(InetAddress.getByAddress(new byte[]{127, 0, (byte)i, (byte)j}));
+ hosts.add(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, (byte)i, (byte)j}));
ScoreUpdater updater = new ScoreUpdater(dsnitch, hosts);
updater.start();
- List<InetAddress> result = null;
+ List<InetAddressAndPort> result = null;
for (int i = 0; i < ITERATIONS; i++)
result = dsnitch.getSortedListByProximity(self, hosts);
@@ -85,10 +84,10 @@ public class DynamicEndpointSnitchLongTest
public volatile boolean stopped;
private final DynamicEndpointSnitch dsnitch;
- private final List<InetAddress> hosts;
+ private final List<InetAddressAndPort> hosts;
private final Random random = new Random();
- public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddress> hosts)
+ public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddressAndPort> hosts)
{
this.dsnitch = dsnitch;
this.hosts = hosts;
@@ -98,7 +97,7 @@ public class DynamicEndpointSnitchLongTest
{
while (!stopped)
{
- InetAddress host = hosts.get(random.nextInt(hosts.size()));
+ InetAddressAndPort host = hosts.get(random.nextInt(hosts.size()));
int score = random.nextInt(SCORE_RANGE);
dsnitch.receiveTiming(host, score);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
index 799ac77..bd7ef20 100644
--- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -122,7 +122,7 @@ public class LongStreamingTest
public void init(String keyspace)
{
for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
- addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+ addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort());
this.ks = keyspace;
}
@@ -149,7 +149,7 @@ public class LongStreamingTest
public void init(String keyspace)
{
for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
- addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+ addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort());
this.ks = keyspace;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
index 9ec1aa6..68cfd7e 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
@@ -25,11 +25,11 @@ import com.google.common.collect.Multimap;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.PendingRangeMaps;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashSet;
@@ -50,7 +50,7 @@ public class PendingRangesBench
PendingRangeMaps pendingRangeMaps;
int maxToken = 256 * 100;
- Multimap<Range<Token>, InetAddress> oldPendingRanges;
+ Multimap<Range<Token>, InetAddressAndPort> oldPendingRanges;
private Range<Token> genRange(String left, String right)
{
@@ -63,7 +63,7 @@ public class PendingRangesBench
pendingRangeMaps = new PendingRangeMaps();
oldPendingRanges = HashMultimap.create();
- InetAddress[] addresses = {InetAddress.getByName("127.0.0.1"), InetAddress.getByName("127.0.0.2")};
+ InetAddressAndPort[] addresses = { InetAddressAndPort.getByName("127.0.0.1"), InetAddressAndPort.getByName("127.0.0.2")};
for (int i = 0; i < maxToken; i++)
{
@@ -97,8 +97,8 @@ public class PendingRangesBench
{
int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5);
Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken));
- Set<InetAddress> endpoints = new HashSet<>();
- for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : oldPendingRanges.asMap().entrySet())
+ Set<InetAddressAndPort> endpoints = new HashSet<>();
+ for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : oldPendingRanges.asMap().entrySet())
{
if (entry.getKey().contains(searchToken))
endpoints.addAll(entry.getValue());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
index 0e4a3cf..d06caba 100644
--- a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
+++ b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
@@ -20,8 +20,15 @@ package org.apache.cassandra;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.YamlConfigurationLoader;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.InetAddressAndPort;
import java.io.File;
+import java.net.Inet6Address;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Joiner;
public class OffsetAwareConfigurationLoader extends YamlConfigurationLoader
@@ -52,6 +59,31 @@ public class OffsetAwareConfigurationLoader extends YamlConfigurationLoader
config.native_transport_port += offset;
config.storage_port += offset;
+ //Rewrite the seed ports string
+ String[] hosts = config.seed_provider.parameters.get("seeds").split(",", -1);
+ String rewrittenSeeds = Joiner.on(", ").join(Arrays.stream(hosts).map(host -> {
+ StringBuilder sb = new StringBuilder();
+ try
+ {
+ InetAddressAndPort address = InetAddressAndPort.getByName(host.trim());
+ if (address.address instanceof Inet6Address)
+ {
+ sb.append('[').append(address.address.getHostAddress()).append(']');
+ }
+ else
+ {
+ sb.append(address.address.getHostAddress());
+ }
+ sb.append(':').append(address.port + offset);
+ return sb.toString();
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Error in OffsetAwareConfigurationLoader reworking seed list", e);
+ }
+ }).collect(Collectors.toList()));
+ config.seed_provider.parameters.put("seeds", rewrittenSeeds);
+
config.commitlog_directory += sep + offset;
config.saved_caches_directory += sep + offset;
config.hints_directory += sep + offset;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 7e62c41..1201efa 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -22,7 +22,6 @@ package org.apache.cassandra;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOError;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
@@ -39,6 +38,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
@@ -192,7 +192,7 @@ public class Util
* Creates initial set of nodes and tokens. Nodes are added to StorageService as 'normal'
*/
public static void createInitialRing(StorageService ss, IPartitioner partitioner, List<Token> endpointTokens,
- List<Token> keyTokens, List<InetAddress> hosts, List<UUID> hostIds, int howMany)
+ List<Token> keyTokens, List<InetAddressAndPort> hosts, List<UUID> hostIds, int howMany)
throws UnknownHostException
{
// Expand pool of host IDs as necessary
@@ -210,10 +210,13 @@ public class Util
for (int i=0; i<endpointTokens.size(); i++)
{
- InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
+ InetAddressAndPort ep = InetAddressAndPort.getByName("127.0.0." + String.valueOf(i + 1));
Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), 1);
Gossiper.instance.injectApplicationState(ep, ApplicationState.TOKENS, new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(endpointTokens.get(i))));
ss.onChange(ep,
+ ApplicationState.STATUS_WITH_PORT,
+ new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i))));
+ ss.onChange(ep,
ApplicationState.STATUS,
new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i))));
hosts.add(ep);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
index 7db1cfa..41564d9 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
@@ -17,11 +17,9 @@
*/
package org.apache.cassandra.batchlog;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import com.google.common.collect.ImmutableMultimap;
@@ -29,6 +27,8 @@ import com.google.common.collect.Multimap;
import org.junit.Test;
import org.junit.matchers.JUnitMatchers;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -39,89 +39,89 @@ public class BatchlogEndpointFilterTest
@Test
public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
{
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("0"))
- .put(LOCAL, InetAddress.getByName("00"))
- .put("1", InetAddress.getByName("1"))
- .put("1", InetAddress.getByName("11"))
- .put("2", InetAddress.getByName("2"))
- .put("2", InetAddress.getByName("22"))
+ Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+ .put(LOCAL, InetAddressAndPort.getByName("0"))
+ .put(LOCAL, InetAddressAndPort.getByName("00"))
+ .put("1", InetAddressAndPort.getByName("1"))
+ .put("1", InetAddressAndPort.getByName("11"))
+ .put("2", InetAddressAndPort.getByName("2"))
+ .put("2", InetAddressAndPort.getByName("22"))
.build();
- Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("22")));
}
@Test
public void shouldSelectHostFromLocal() throws UnknownHostException
{
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("0"))
- .put(LOCAL, InetAddress.getByName("00"))
- .put("1", InetAddress.getByName("1"))
+ Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+ .put(LOCAL, InetAddressAndPort.getByName("0"))
+ .put(LOCAL, InetAddressAndPort.getByName("00"))
+ .put("1", InetAddressAndPort.getByName("1"))
.build();
- Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0")));
}
@Test
public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
{
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("0"))
+ Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+ .put(LOCAL, InetAddressAndPort.getByName("0"))
.build();
- Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
assertThat(result.size(), is(1));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("0")));
}
@Test
public void shouldSelectTwoRandomHostsFromSingleOtherRack() throws UnknownHostException
{
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("0"))
- .put(LOCAL, InetAddress.getByName("00"))
- .put("1", InetAddress.getByName("1"))
- .put("1", InetAddress.getByName("11"))
- .put("1", InetAddress.getByName("111"))
+ Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+ .put(LOCAL, InetAddressAndPort.getByName("0"))
+ .put(LOCAL, InetAddressAndPort.getByName("00"))
+ .put("1", InetAddressAndPort.getByName("1"))
+ .put("1", InetAddressAndPort.getByName("11"))
+ .put("1", InetAddressAndPort.getByName("111"))
.build();
- Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
// result should be the last two non-local replicas
// (Collections.shuffle has been replaced with Collections.reverse for testing)
assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("111")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("11")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111")));
}
@Test
public void shouldSelectTwoRandomHostsFromSingleRack() throws UnknownHostException
{
- Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
- .put(LOCAL, InetAddress.getByName("1"))
- .put(LOCAL, InetAddress.getByName("11"))
- .put(LOCAL, InetAddress.getByName("111"))
- .put(LOCAL, InetAddress.getByName("1111"))
+ Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+ .put(LOCAL, InetAddressAndPort.getByName("1"))
+ .put(LOCAL, InetAddressAndPort.getByName("11"))
+ .put(LOCAL, InetAddressAndPort.getByName("111"))
+ .put(LOCAL, InetAddressAndPort.getByName("1111"))
.build();
- Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+ Collection<InetAddressAndPort> result = new TestEndpointFilter(LOCAL, endpoints).filter();
// result should be the last two non-local replicas
// (Collections.shuffle has been replaced with Collections.reverse for testing)
assertThat(result.size(), is(2));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("111")));
- assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1111")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("111")));
+ assertThat(result, JUnitMatchers.hasItem(InetAddressAndPort.getByName("1111")));
}
private static class TestEndpointFilter extends BatchlogManager.EndpointFilter
{
- TestEndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+ TestEndpointFilter(String localRack, Multimap<String, InetAddressAndPort> endpoints)
{
super(localRack, endpoints);
}
@Override
- protected boolean isValid(InetAddress input)
+ protected boolean isValid(InetAddressAndPort input)
{
// We will use always alive non-localhost endpoints
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
index 34902fe..33fb209 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.batchlog;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ExecutionException;
@@ -29,6 +28,7 @@ import org.junit.*;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.Util.PartitionerSwitcher;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.Schema;
@@ -94,7 +94,7 @@ public class BatchlogManagerTest
public void setUp() throws Exception
{
TokenMetadata metadata = StorageService.instance.getTokenMetadata();
- InetAddress localhost = InetAddress.getByName("127.0.0.1");
+ InetAddressAndPort localhost = InetAddressAndPort.getByName("127.0.0.1");
metadata.updateNormalToken(Util.token("A"), localhost);
metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).truncateBlocking();
@@ -344,7 +344,7 @@ public class BatchlogManagerTest
@Test
public void testReplayWithNoPeers() throws Exception
{
- StorageService.instance.getTokenMetadata().removeEndpoint(InetAddress.getByName("127.0.0.1"));
+ StorageService.instance.getTokenMetadata().removeEndpoint(InetAddressAndPort.getByName("127.0.0.1"));
long initialAllBatches = BatchlogManager.instance.countAllBatches();
long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index b50a050..589afd5 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -77,6 +77,7 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.config.EncryptionOptions$ClientEncryptionOptions",
"org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptions",
"org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptions$InternodeEncryption",
+ "org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptions$OutgoingEncryptedPortSource",
"org.apache.cassandra.config.YamlConfigurationLoader",
"org.apache.cassandra.config.YamlConfigurationLoader$PropertiesChecker",
"org.apache.cassandra.config.YamlConfigurationLoader$PropertiesChecker$1",
@@ -126,6 +127,7 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptionsCustomizer",
"org.apache.cassandra.ConsoleAppenderBeanInfo",
"org.apache.cassandra.ConsoleAppenderCustomizer",
+ "org.apache.cassandra.locator.InetAddressAndPort"
};
static final Set<String> checkedClasses = new HashSet<>(Arrays.asList(validClasses));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 6993a65..69d2fb5 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.functions.FunctionName;
@@ -144,9 +145,9 @@ public abstract class CQLTester
// Register an EndpointSnitch which returns fixed values for test.
DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
{
- @Override public String getRack(InetAddress endpoint) { return RACK1; }
- @Override public String getDatacenter(InetAddress endpoint) { return DATA_CENTER; }
- @Override public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) { return 0; }
+ @Override public String getRack(InetAddressAndPort endpoint) { return RACK1; }
+ @Override public String getDatacenter(InetAddressAndPort endpoint) { return DATA_CENTER; }
+ @Override public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; }
});
try
@@ -829,6 +830,11 @@ public abstract class CQLTester
return sessionNet(protocolVersion).execute(formatQuery(query), values);
}
+ protected com.datastax.driver.core.ResultSet executeNetWithPaging(ProtocolVersion version, String query, int pageSize) throws Throwable
+ {
+ return sessionNet(version).execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
+ }
+
protected com.datastax.driver.core.ResultSet executeNetWithPaging(String query, int pageSize) throws Throwable
{
return sessionNet().execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index 0a314da..ce5de62 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -223,7 +223,6 @@ public class PreparedStatementsTest extends CQLTester
.withClusterName("Test Cluster")
.withPort(nativePort)
.withoutJMXReporting()
- .allowBetaProtocolVersion()
.build())
{
try (Session newSession = newCluster.connect())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java b/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
index 26b9d65..665bc44 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewComplexTest.java
@@ -851,10 +851,10 @@ public class ViewComplexTest extends CQLTester
for (String view : Arrays.asList("mv1", "mv2"))
{
// paging
- assertEquals(1, executeNetWithPaging(String.format("SELECT k,a,b FROM %s limit 1", view), 1).all().size());
- assertEquals(2, executeNetWithPaging(String.format("SELECT k,a,b FROM %s limit 2", view), 1).all().size());
- assertEquals(2, executeNetWithPaging(String.format("SELECT k,a,b FROM %s", view), 1).all().size());
- assertRowsNet(executeNetWithPaging(String.format("SELECT k,a,b FROM %s ", view), 1),
+ assertEquals(1, executeNetWithPaging(protocolVersion, String.format("SELECT k,a,b FROM %s limit 1", view), 1).all().size());
+ assertEquals(2, executeNetWithPaging(protocolVersion, String.format("SELECT k,a,b FROM %s limit 2", view), 1).all().size());
+ assertEquals(2, executeNetWithPaging(protocolVersion, String.format("SELECT k,a,b FROM %s", view), 1).all().size());
+ assertRowsNet(protocolVersion, executeNetWithPaging(protocolVersion, String.format("SELECT k,a,b FROM %s ", view), 1),
row(50, 50, 50),
row(100, 100, 100));
// limit
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index ac261ca..dc90b4e 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.cql3.validation.operations;
-import java.net.InetAddress;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
@@ -26,6 +25,7 @@ import java.util.UUID;
import org.junit.Test;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.Schema;
@@ -514,13 +514,13 @@ public class CreateTest extends CQLTester
DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
{
@Override
- public String getRack(InetAddress endpoint) { return RACK1; }
+ public String getRack(InetAddressAndPort endpoint) { return RACK1; }
@Override
- public String getDatacenter(InetAddress endpoint) { return "us-east-1"; }
+ public String getDatacenter(InetAddressAndPort endpoint) { return "us-east-1"; }
@Override
- public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) { return 0; }
+ public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; }
});
execute("CREATE KEYSPACE Foo WITH replication = { 'class' : 'NetworkTopologyStrategy', 'us-east-1' : 1 };");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 044a49e..a096c78 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
@@ -37,6 +36,7 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -89,13 +89,13 @@ public class CleanupTest
DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch()
{
@Override
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
return "RC1";
}
@Override
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
return "DC1";
}
@@ -166,8 +166,8 @@ public class CleanupTest
byte[] tk1 = new byte[1], tk2 = new byte[1];
tk1[0] = 2;
tk2[0] = 1;
- tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
- tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
+ tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1"));
+ tmd.updateNormalToken(new BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2"));
CompactionManager.instance.performCleanup(cfs, 2);
@@ -198,8 +198,8 @@ public class CleanupTest
byte[] tk1 = new byte[1], tk2 = new byte[1];
tk1[0] = 2;
tk2[0] = 1;
- tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
- tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
+ tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1"));
+ tmd.updateNormalToken(new BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2"));
CompactionManager.instance.performCleanup(cfs, 2);
assertEquals(0, Util.getAll(Util.cmd(cfs).build()).size());
@@ -222,9 +222,9 @@ public class CleanupTest
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
tmd.clearUnsafe();
- tmd.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.0.0.1"));
+ tmd.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.0.0.1"));
byte[] tk1 = {2};
- tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
+ tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1"));
Keyspace keyspace = Keyspace.open(KEYSPACE2);
@@ -270,8 +270,8 @@ public class CleanupTest
byte[] tk1 = new byte[1], tk2 = new byte[1];
tk1[0] = 2;
tk2[0] = 1;
- tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
- tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
+ tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1"));
+ tmd.updateNormalToken(new BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2"));
for(SSTableReader r: cfs.getLiveSSTables())
CompactionManager.instance.forceUserDefinedCleanup(r.getFilename());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
index c56368f..6e2a714 100644
--- a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.db;
import java.io.File;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
@@ -30,6 +29,7 @@ import org.junit.Test;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -51,7 +51,7 @@ public class DiskBoundaryManagerTest extends CQLTester
{
BlacklistedDirectories.clearUnwritableUnsafe();
TokenMetadata metadata = StorageService.instance.getTokenMetadata();
- metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddress());
+ metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddressAndPort());
createTable("create table %s (id int primary key, x text)");
dirs = new Directories(getCurrentColumnFamilyStore().metadata(), Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
new Directories.DataDirectory(new File("/tmp/2")),
@@ -86,7 +86,7 @@ public class DiskBoundaryManagerTest extends CQLTester
public void updateTokensTest() throws UnknownHostException
{
DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
- StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddress.getByName("127.0.0.10"));
+ StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddressAndPort.getByName("127.0.0.10"));
DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
assertFalse(dbv1.equals(dbv2));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 64ac627..6bb0a1a 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -50,9 +50,11 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -312,7 +314,8 @@ public class ReadCommandTest
}
}
- public void serializerTest() throws IOException
+ @Test
+ public void testSerializer() throws IOException
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
@@ -324,10 +327,11 @@ public class ReadCommandTest
ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("dd").build();
int messagingVersion = MessagingService.current_version;
- long size = ReadCommand.serializer.serializedSize(readCommand, messagingVersion);
-
FakeOutputStream out = new FakeOutputStream();
- ReadCommand.serializer.serialize(readCommand, new WrappedDataOutputStreamPlus(out), messagingVersion);
+ Tracing.instance.newSession(Tracing.TraceType.QUERY);
+ MessageOut<ReadCommand> messageOut = new MessageOut(MessagingService.Verb.READ, readCommand, ReadCommand.serializer);
+ long size = messageOut.serializedSize(messagingVersion);
+ messageOut.serialize(new WrappedDataOutputStreamPlus(out), messagingVersion);
Assert.assertEquals(size, out.count);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index fee3f2c..5ca1eef 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -32,6 +31,7 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.rows.*;
@@ -300,8 +300,8 @@ public class RowCacheTest
byte[] tk1, tk2;
tk1 = "key1000".getBytes();
tk2 = "key1050".getBytes();
- tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
- tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
+ tmd.updateNormalToken(new BytesToken(tk1), InetAddressAndPort.getByName("127.0.0.1"));
+ tmd.updateNormalToken(new BytesToken(tk2), InetAddressAndPort.getByName("127.0.0.2"));
store.cleanupCache();
assertEquals(50, CacheService.instance.rowCache.size());
CacheService.instance.setRowCacheCapacityInMB(0);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
new file mode 100644
index 0000000..1c051f5
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceMigrator40Test.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.junit.Assert.assertEquals;
+
+public class SystemKeyspaceMigrator40Test extends CQLTester // exercises SystemKeyspaceMigrator40.migrate() once per legacy table
+{
+ @Test
+ public void testMigratePeers() throws Throwable
+ {
+ String insert = String.format("INSERT INTO %s ("
+ + "peer, "
+ + "data_center, "
+ + "host_id, "
+ + "preferred_ip, "
+ + "rack, "
+ + "release_version, "
+ + "rpc_address, "
+ + "schema_version, "
+ + "tokens) "
+ + " values ( ?, ?, ? , ? , ?, ?, ?, ?, ?)",
+ SystemKeyspaceMigrator40.legacyPeersName);
+ UUID hostId = UUIDGen.getTimeUUID();
+ UUID schemaVersion = UUIDGen.getTimeUUID();
+ execute(insert,
+ InetAddress.getByName("127.0.0.1"),
+ "dcFoo",
+ hostId,
+ InetAddress.getByName("127.0.0.2"),
+ "rackFoo", "4.0",
+ InetAddress.getByName("127.0.0.3"),
+ schemaVersion,
+ ImmutableSet.of("foobar"));
+ SystemKeyspaceMigrator40.migrate();
+ // Expect one migrated row: legacy values carried over, rpc_address read back as native_address, port columns equal to DatabaseDescriptor's storage / native transport ports
+ int rowCount = 0;
+ for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.peersName)))
+ {
+ rowCount++;
+ assertEquals(InetAddress.getByName("127.0.0.1"), row.getInetAddress("peer"));
+ assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("peer_port"));
+ assertEquals("dcFoo", row.getString("data_center"));
+ assertEquals(hostId, row.getUUID("host_id"));
+ assertEquals(InetAddress.getByName("127.0.0.2"), row.getInetAddress("preferred_ip"));
+ assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("preferred_port"));
+ assertEquals("rackFoo", row.getString("rack"));
+ assertEquals("4.0", row.getString("release_version"));
+ assertEquals(InetAddress.getByName("127.0.0.3"), row.getInetAddress("native_address"));
+ assertEquals(DatabaseDescriptor.getNativeTransportPort(), row.getInt("native_port"));
+ assertEquals(schemaVersion, row.getUUID("schema_version"));
+ assertEquals(ImmutableSet.of("foobar"), row.getSet("tokens", UTF8Type.instance));
+ }
+ assertEquals(1, rowCount);
+
+ // Start from empty tables, then verify that a legacy row with only the peer column set (everything else null/missing) is still migrated
+ execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.legacyPeersName));
+ execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.peersName));
+
+ execute(String.format("INSERT INTO %s (peer) VALUES (?)", SystemKeyspaceMigrator40.legacyPeersName),
+ InetAddress.getByName("127.0.0.1"));
+ SystemKeyspaceMigrator40.migrate();
+
+ rowCount = 0;
+ for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.peersName)))
+ {
+ rowCount++;
+ }
+ assertEquals(1, rowCount);
+ }
+
+ @Test
+ public void testMigratePeerEvents() throws Throwable
+ {
+ String insert = String.format("INSERT INTO %s ("
+ + "peer, "
+ + "hints_dropped) "
+ + " values ( ?, ? )",
+ SystemKeyspaceMigrator40.legacyPeerEventsName);
+ UUID uuid = UUIDGen.getTimeUUID();
+ execute(insert,
+ InetAddress.getByName("127.0.0.1"),
+ ImmutableMap.of(uuid, 42));
+ SystemKeyspaceMigrator40.migrate();
+ // Expect one migrated row: peer and hints_dropped carried over, peer_port equal to DatabaseDescriptor.getStoragePort()
+ int rowCount = 0;
+ for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.peerEventsName)))
+ {
+ rowCount++;
+ assertEquals(InetAddress.getByName("127.0.0.1"), row.getInetAddress("peer"));
+ assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("peer_port"));
+ assertEquals(ImmutableMap.of(uuid, 42), row.getMap("hints_dropped", UUIDType.instance, Int32Type.instance));
+ }
+ assertEquals(1, rowCount);
+
+ // Start from empty tables, then verify that a legacy row with only the peer column set (no hints_dropped) is still migrated
+ execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.legacyPeerEventsName));
+ execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.peerEventsName));
+
+ execute(String.format("INSERT INTO %s (peer) VALUES (?)", SystemKeyspaceMigrator40.legacyPeerEventsName),
+ InetAddress.getByName("127.0.0.1"));
+ SystemKeyspaceMigrator40.migrate();
+
+ rowCount = 0;
+ for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.peerEventsName)))
+ {
+ rowCount++;
+ }
+ assertEquals(1, rowCount);
+ }
+
+ @Test
+ public void testMigrateTransferredRanges() throws Throwable
+ {
+ String insert = String.format("INSERT INTO %s ("
+ + "operation, "
+ + "peer, "
+ + "keyspace_name, "
+ + "ranges) "
+ + " values ( ?, ?, ?, ? )",
+ SystemKeyspaceMigrator40.legacyTransferredRangesName);
+ execute(insert,
+ "foo",
+ InetAddress.getByName("127.0.0.1"),
+ "bar",
+ ImmutableSet.of(ByteBuffer.wrap(new byte[] { 42 })));
+ SystemKeyspaceMigrator40.migrate();
+ // Expect one migrated row: operation, peer, keyspace_name and ranges carried over, peer_port equal to DatabaseDescriptor.getStoragePort()
+ int rowCount = 0;
+ for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.transferredRangesName)))
+ {
+ rowCount++;
+ assertEquals("foo", row.getString("operation"));
+ assertEquals(InetAddress.getByName("127.0.0.1"), row.getInetAddress("peer"));
+ assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("peer_port"));
+ assertEquals("bar", row.getString("keyspace_name"));
+ assertEquals(ImmutableSet.of(ByteBuffer.wrap(new byte[] { 42 })), row.getSet("ranges", BytesType.instance));
+ }
+ assertEquals(1, rowCount);
+
+ // Start from empty tables, then verify that a legacy row with only operation, peer and keyspace_name set (no ranges) is still migrated
+ execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.legacyTransferredRangesName));
+ execute(String.format("TRUNCATE %s", SystemKeyspaceMigrator40.transferredRangesName));
+
+ execute(String.format("INSERT INTO %s (operation, peer, keyspace_name) VALUES (?, ?, ?)", SystemKeyspaceMigrator40.legacyTransferredRangesName),
+ "foo",
+ InetAddress.getByName("127.0.0.1"),
+ "bar");
+ SystemKeyspaceMigrator40.migrate();
+
+ rowCount = 0;
+ for (UntypedResultSet.Row row : execute(String.format("SELECT * FROM %s", SystemKeyspaceMigrator40.transferredRangesName)))
+ {
+ rowCount++;
+ }
+ assertEquals(1, rowCount);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
index d1b8ff5..3bc04c1 100644
--- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
@@ -26,6 +25,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
@@ -53,7 +53,7 @@ public class SystemKeyspaceTest
public void testLocalTokens()
{
// Remove all existing tokens
- Collection<Token> current = SystemKeyspace.loadTokens().asMap().get(FBUtilities.getLocalAddress());
+ Collection<Token> current = SystemKeyspace.loadTokens().asMap().get(FBUtilities.getLocalAddressAndPort());
if (current != null && !current.isEmpty())
SystemKeyspace.updateTokens(current);
@@ -74,7 +74,7 @@ public class SystemKeyspaceTest
public void testNonLocalToken() throws UnknownHostException
{
BytesToken token = new BytesToken(ByteBufferUtil.bytes("token3"));
- InetAddress address = InetAddress.getByName("127.0.0.2");
+ InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.2");
SystemKeyspace.updateTokens(address, Collections.<Token>singletonList(token));
assert SystemKeyspace.loadTokens().get(address).contains(token);
SystemKeyspace.removeEndpoint(address);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
index 0baad3b..3e38dfc 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.db.compaction;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
@@ -31,6 +30,7 @@ import org.junit.Ignore;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.cql3.QueryProcessor;
@@ -67,7 +67,7 @@ public class AbstractPendingRepairTest extends AbstractRepairTest
// cutoff messaging service
MessagingService.instance().addMessageSink(new IMessageSink()
{
- public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+ public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
{
return false;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 5f05fab..6e7e184 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction;
import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -35,6 +34,7 @@ import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
@@ -96,7 +96,7 @@ public class AntiCompactionTest
private void registerParentRepairSession(UUID sessionID, Collection<Range<Token>> ranges, long repairedAt, UUID pendingRepair) throws IOException
{
ActiveRepairService.instance.registerParentRepairSession(sessionID,
- InetAddress.getByName("10.0.0.1"),
+ InetAddressAndPort.getByName("10.0.0.1"),
Lists.newArrayList(cfs), ranges,
pendingRepair != null || repairedAt != UNREPAIRED_SSTABLE,
repairedAt, true, PreviewKind.NONE);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
index b9e3c17..8290adf 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db.compaction;
-import java.net.InetAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
@@ -33,6 +32,7 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.Schema;
@@ -58,7 +58,7 @@ public class CompactionManagerGetSSTablesForValidationTest
private String ks;
private static final String tbl = "tbl";
private ColumnFamilyStore cfs;
- private static InetAddress coordinator;
+ private static InetAddressAndPort coordinator;
private static Token MT;
@@ -73,7 +73,7 @@ public class CompactionManagerGetSSTablesForValidationTest
public static void setupClass() throws Exception
{
SchemaLoader.prepareServer();
- coordinator = InetAddress.getByName("10.0.0.1");
+ coordinator = InetAddressAndPort.getByName("10.0.0.1");
MT = DatabaseDescriptor.getPartitioner().getMinimumToken();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 624f119..567984d 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -195,7 +195,7 @@ public class LeveledCompactionStrategyTest
int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
UUID parentRepSession = UUID.randomUUID();
ActiveRepairService.instance.registerParentRepairSession(parentRepSession,
- FBUtilities.getBroadcastAddress(),
+ FBUtilities.getBroadcastAddressAndPort(),
Arrays.asList(cfs),
Arrays.asList(range),
false,
@@ -203,7 +203,7 @@ public class LeveledCompactionStrategyTest
true,
PreviewKind.NONE);
RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range));
- Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore, PreviewKind.NONE);
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddressAndPort(), gcBefore, PreviewKind.NONE);
CompactionManager.instance.submitValidation(cfs, validator).get();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
index 658b87a..423ad28 100644
--- a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
+++ b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db.view;
-import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -28,6 +27,7 @@ import org.junit.Test;
import junit.framework.Assert;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken;
@@ -59,12 +59,12 @@ public class ViewUtilsTest
metadata.clearUnsafe();
// DC1
- metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
- metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+ metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+ metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
// DC2
- metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
- metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+ metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+ metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
Map<String, String> replicationMap = new HashMap<>();
replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName());
@@ -76,12 +76,12 @@ public class ViewUtilsTest
KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
Schema.instance.load(meta);
- Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
- new StringToken("CA"),
- new StringToken("BB"));
+ Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+ new StringToken("CA"),
+ new StringToken("BB"));
Assert.assertTrue(naturalEndpoint.isPresent());
- Assert.assertEquals(InetAddress.getByName("127.0.0.2"), naturalEndpoint.get());
+ Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.2"), naturalEndpoint.get());
}
@@ -92,12 +92,12 @@ public class ViewUtilsTest
metadata.clearUnsafe();
// DC1
- metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
- metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+ metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+ metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
// DC2
- metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
- metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+ metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+ metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
Map<String, String> replicationMap = new HashMap<>();
replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName());
@@ -109,12 +109,12 @@ public class ViewUtilsTest
KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
Schema.instance.load(meta);
- Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
- new StringToken("CA"),
- new StringToken("BB"));
+ Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+ new StringToken("CA"),
+ new StringToken("BB"));
Assert.assertTrue(naturalEndpoint.isPresent());
- Assert.assertEquals(InetAddress.getByName("127.0.0.1"), naturalEndpoint.get());
+ Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.1"), naturalEndpoint.get());
}
@Test
@@ -124,12 +124,12 @@ public class ViewUtilsTest
metadata.clearUnsafe();
// DC1
- metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
- metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+ metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+ metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
// DC2
- metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
- metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+ metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+ metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
Map<String, String> replicationMap = new HashMap<>();
replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName());
@@ -141,9 +141,9 @@ public class ViewUtilsTest
KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
Schema.instance.load(meta);
- Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
- new StringToken("AB"),
- new StringToken("BB"));
+ Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+ new StringToken("AB"),
+ new StringToken("BB"));
Assert.assertFalse(naturalEndpoint.isPresent());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index a1054bb..f11cb62 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.dht;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashSet;
@@ -41,6 +40,7 @@ import org.junit.runner.RunWith;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.tokenallocator.TokenAllocation;
@@ -96,32 +96,32 @@ public class BootStrapperTest
generateFakeEndpoints(numOldNodes);
Token myToken = tmd.partitioner.getRandomToken();
- InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
+ InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1");
assertEquals(numOldNodes, tmd.sortedTokens().size());
RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false, 1);
IFailureDetector mockFailureDetector = new IFailureDetector()
{
- public boolean isAlive(InetAddress ep)
+ public boolean isAlive(InetAddressAndPort ep)
{
return true;
}
- public void interpret(InetAddress ep) { throw new UnsupportedOperationException(); }
- public void report(InetAddress ep) { throw new UnsupportedOperationException(); }
+ public void interpret(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
+ public void report(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
- public void remove(InetAddress ep) { throw new UnsupportedOperationException(); }
- public void forceConviction(InetAddress ep) { throw new UnsupportedOperationException(); }
+ public void remove(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
+ public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
};
s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector));
s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
- Collection<Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = s.toFetch().get(keyspaceName);
+ Collection<Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch = s.toFetch().get(keyspaceName);
// Check we get RF new ranges in total
Set<Range<Token>> ranges = new HashSet<>();
- for (Map.Entry<InetAddress, Collection<Range<Token>>> e : toFetch)
+ for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> e : toFetch)
ranges.addAll(e.getValue());
assertEquals(replicationFactor, ranges.size());
@@ -151,7 +151,7 @@ public class BootStrapperTest
for (int i = 1; i <= numOldNodes; i++)
{
// leave .1 for myEndpoint
- InetAddress addr = InetAddress.getByName("127." + dc + "." + rack + "." + (i + 1));
+ InetAddressAndPort addr = InetAddressAndPort.getByName("127." + dc + "." + rack + "." + (i + 1));
List<Token> tokens = Lists.newArrayListWithCapacity(numVNodes);
for (int j = 0; j < numVNodes; ++j)
tokens.add(p.getRandomToken());
@@ -167,7 +167,7 @@ public class BootStrapperTest
String ks = "BootStrapperTestKeyspace3";
TokenMetadata tm = new TokenMetadata();
generateFakeEndpoints(tm, 10, vn);
- InetAddress addr = FBUtilities.getBroadcastAddress();
+ InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
allocateTokensForNode(vn, ks, tm, addr);
}
@@ -184,15 +184,15 @@ public class BootStrapperTest
// Register peers with expected DC for NetworkTopologyStrategy.
TokenMetadata metadata = StorageService.instance.getTokenMetadata();
metadata.clearUnsafe();
- metadata.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.1.0.99"));
- metadata.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.15.0.99"));
+ metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.1.0.99"));
+ metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.15.0.99"));
SchemaLoader.createKeyspace(ks, KeyspaceParams.nts(dc, replicas, "15", 15), SchemaLoader.standardCFMD(ks, "Standard1"));
TokenMetadata tm = StorageService.instance.getTokenMetadata();
tm.clearUnsafe();
for (int i = 0; i < rackCount; ++i)
generateFakeEndpoints(tm, 10, vn, dc, Integer.toString(i));
- InetAddress addr = InetAddress.getByName("127." + dc + ".0.99");
+ InetAddressAndPort addr = InetAddressAndPort.getByName("127." + dc + ".0.99");
allocateTokensForNode(vn, ks, tm, addr);
// Note: Not matching replication factor in second datacentre, but this should not affect us.
} finally {
@@ -230,7 +230,7 @@ public class BootStrapperTest
testAllocateTokensNetworkStrategy(1, 1);
}
- private void allocateTokensForNode(int vn, String ks, TokenMetadata tm, InetAddress addr)
+ private void allocateTokensForNode(int vn, String ks, TokenMetadata tm, InetAddressAndPort addr)
{
SummaryStatistics os = TokenAllocation.replicatedOwnershipStats(tm.cloneOnlyTokenMap(), Keyspace.open(ks).getReplicationStrategy(), addr);
Collection<Token> tokens = BootStrapper.allocateTokens(tm, addr, ks, vn, 0);
@@ -260,14 +260,14 @@ public class BootStrapperTest
TokenMetadata tm = new TokenMetadata();
generateFakeEndpoints(tm, 10, vn);
- InetAddress dcaddr = FBUtilities.getBroadcastAddress();
+ InetAddressAndPort dcaddr = FBUtilities.getBroadcastAddressAndPort();
SummaryStatistics os3 = TokenAllocation.replicatedOwnershipStats(tm, Keyspace.open(ks3).getReplicationStrategy(), dcaddr);
SummaryStatistics os2 = TokenAllocation.replicatedOwnershipStats(tm, Keyspace.open(ks2).getReplicationStrategy(), dcaddr);
String cks = ks3;
String nks = ks2;
for (int i=11; i<=20; ++i)
{
- allocateTokensForNode(vn, cks, tm, InetAddress.getByName("127.0.0." + (i + 1)));
+ allocateTokensForNode(vn, cks, tm, InetAddressAndPort.getByName("127.0.0." + (i + 1)));
String t = cks; cks = nks; nks = t;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org