You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:01 UTC
[04/19] cassandra git commit: Allow storage port to be configurable per node
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java b/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java
index 522b118..df81a9b 100644
--- a/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java
@@ -18,7 +18,7 @@
package org.apache.cassandra.locator;
import java.io.IOException;
-import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -32,8 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-
-import com.google.common.net.InetAddresses;
+import java.util.regex.Matcher;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
@@ -61,7 +60,7 @@ public class PropertyFileSnitchTest
private Path backupFile;
private VersionedValue.VersionedValueFactory valueFactory;
- private Map<InetAddress, Set<Token>> tokenMap;
+ private Map<InetAddressAndPort, Set<Token>> tokenMap;
@BeforeClass
public static void setupDD()
@@ -78,17 +77,17 @@ public class PropertyFileSnitchTest
restoreOrigConfigFile();
- InetAddress[] hosts = {
- InetAddress.getByName("127.0.0.1"), // this exists in the config file
- InetAddress.getByName("127.0.0.2"), // this exists in the config file
- InetAddress.getByName("127.0.0.9"), // this does not exist in the config file
+ InetAddressAndPort[] hosts = {
+ InetAddressAndPort.getByName("127.0.0.1"), // this exists in the config file
+ InetAddressAndPort.getByName("127.0.0.2"), // this exists in the config file
+ InetAddressAndPort.getByName("127.0.0.9"), // this does not exist in the config file
};
IPartitioner partitioner = new RandomPartitioner();
valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
tokenMap = new HashMap<>();
- for (InetAddress host : hosts)
+ for (InetAddressAndPort host : hosts)
{
Set<Token> tokens = Collections.singleton(partitioner.getRandomToken());
Gossiper.instance.initializeNodeUnsafe(host, UUID.randomUUID(), 1);
@@ -117,13 +116,21 @@ public class PropertyFileSnitchTest
for (String line : lines)
{
String[] info = line.split("=");
- if (info.length == 2 && replacements.containsKey(info[0]))
+ if (info.length == 2 && !line.startsWith("#") && !line.startsWith("default="))
{
- String replacement = replacements.get(info[0]);
- if (!replacement.isEmpty()) // empty means remove this line
- newLines.add(info[0] + '=' + replacement);
-
- replaced.add(info[0]);
+ InetAddressAndPort address = InetAddressAndPort.getByName(info[0].replaceAll(Matcher.quoteReplacement("\\:"), ":"));
+ String replacement = replacements.get(address.toString());
+ if (replacement != null)
+ {
+ if (!replacement.isEmpty()) // empty means remove this line
+ newLines.add(info[0] + '=' + replacement);
+
+ replaced.add(address.toString());
+ }
+ else
+ {
+ newLines.add(line);
+ }
}
else
{
@@ -138,21 +145,26 @@ public class PropertyFileSnitchTest
continue;
if (!replacement.getValue().isEmpty()) // empty means remove this line so do nothing here
- newLines.add(replacement.getKey() + '=' + replacement.getValue());
+ {
+ String escaped = replacement.getKey().replaceAll(Matcher.quoteReplacement(":"), "\\\\:");
+ newLines.add(escaped + '=' + replacement.getValue());
+ }
}
Files.write(effectiveFile, newLines, StandardCharsets.UTF_8, StandardOpenOption.TRUNCATE_EXISTING);
}
- private void setNodeShutdown(InetAddress host)
+ private void setNodeShutdown(InetAddressAndPort host)
{
StorageService.instance.getTokenMetadata().removeEndpoint(host);
+ Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS_WITH_PORT, valueFactory.shutdown(true));
Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.shutdown(true));
Gossiper.instance.markDead(host, Gossiper.instance.getEndpointStateForEndpoint(host));
}
- private void setNodeLive(InetAddress host)
+ private void setNodeLive(InetAddressAndPort host)
{
+ Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS_WITH_PORT, valueFactory.normal(tokenMap.get(host)));
Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.normal(tokenMap.get(host)));
Gossiper.instance.realMarkAlive(host, Gossiper.instance.getEndpointStateForEndpoint(host));
StorageService.instance.getTokenMetadata().updateNormalTokens(tokenMap.get(host), host);
@@ -160,9 +172,9 @@ public class PropertyFileSnitchTest
private static void checkEndpoint(final AbstractNetworkTopologySnitch snitch,
final String endpointString, final String expectedDatacenter,
- final String expectedRack)
+ final String expectedRack) throws UnknownHostException
{
- final InetAddress endpoint = InetAddresses.forString(endpointString);
+ final InetAddressAndPort endpoint = InetAddressAndPort.getByName(endpointString);
assertEquals(expectedDatacenter, snitch.getDatacenter(endpoint));
assertEquals(expectedRack, snitch.getRack(endpoint));
}
@@ -174,25 +186,25 @@ public class PropertyFileSnitchTest
@Test
public void testChangeHostRack() throws Exception
{
- final InetAddress host = InetAddress.getByName("127.0.0.1");
+ final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.1");
final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
- checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+ checkEndpoint(snitch, host.toString(), "DC1", "RAC1");
try
{
setNodeLive(host);
Files.copy(effectiveFile, backupFile);
- replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC1:RAC2"));
+ replaceConfigFile(Collections.singletonMap(host.toString(), "DC1:RAC2"));
Thread.sleep(1500);
- checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+ checkEndpoint(snitch, host.toString(), "DC1", "RAC1");
setNodeShutdown(host);
- replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC1:RAC2"));
+ replaceConfigFile(Collections.singletonMap(host.toString(), "DC1:RAC2"));
Thread.sleep(1500);
- checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2");
+ checkEndpoint(snitch, host.toString(), "DC1", "RAC2");
}
finally
{
@@ -208,25 +220,25 @@ public class PropertyFileSnitchTest
@Test
public void testChangeHostDc() throws Exception
{
- final InetAddress host = InetAddress.getByName("127.0.0.1");
+ final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.1");
final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
- checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+ checkEndpoint(snitch, host.toString(), "DC1", "RAC1");
try
{
setNodeLive(host);
Files.copy(effectiveFile, backupFile);
- replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC1"));
+ replaceConfigFile(Collections.singletonMap(host.toString(), "DC2:RAC1"));
Thread.sleep(1500);
- checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+ checkEndpoint(snitch, host.toString(), "DC1", "RAC1");
setNodeShutdown(host);
- replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC1"));
+ replaceConfigFile(Collections.singletonMap(host.toString(), "DC2:RAC1"));
Thread.sleep(1500);
- checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC1");
+ checkEndpoint(snitch, host.toString(), "DC2", "RAC1");
}
finally
{
@@ -243,25 +255,25 @@ public class PropertyFileSnitchTest
@Test
public void testAddHost() throws Exception
{
- final InetAddress host = InetAddress.getByName("127.0.0.9");
+ final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.9");
final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
- checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default
+ checkEndpoint(snitch, host.toString(), "DC1", "r1"); // default
try
{
setNodeLive(host);
Files.copy(effectiveFile, backupFile);
- replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC2")); // add this line if not yet there
+ replaceConfigFile(Collections.singletonMap(host.toString(), "DC2:RAC2")); // add this line if not yet there
Thread.sleep(1500);
- checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged
+ checkEndpoint(snitch, host.toString(), "DC1", "r1"); // unchanged
setNodeShutdown(host);
- replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC2")); // add this line if not yet there
+ replaceConfigFile(Collections.singletonMap(host.toString(), "DC2:RAC2")); // add this line if not yet there
Thread.sleep(1500);
- checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC2"); // changed
+ checkEndpoint(snitch, host.toString(), "DC2", "RAC2"); // changed
}
finally
{
@@ -278,25 +290,25 @@ public class PropertyFileSnitchTest
@Test
public void testRemoveHost() throws Exception
{
- final InetAddress host = InetAddress.getByName("127.0.0.2");
+ final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.2");
final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
- checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2");
+ checkEndpoint(snitch, host.toString(), "DC1", "RAC2");
try
{
setNodeLive(host);
Files.copy(effectiveFile, backupFile);
- replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "")); // removes line if found
+ replaceConfigFile(Collections.singletonMap(host.toString(), "")); // removes line if found
Thread.sleep(1500);
- checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); // unchanged
+ checkEndpoint(snitch, host.toString(), "DC1", "RAC2"); // unchanged
setNodeShutdown(host);
- replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "")); // removes line if found
+ replaceConfigFile(Collections.singletonMap(host.toString(), "")); // removes line if found
Thread.sleep(1500);
- checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default
+ checkEndpoint(snitch, host.toString(), "DC1", "r1"); // default
}
finally
{
@@ -313,9 +325,9 @@ public class PropertyFileSnitchTest
@Test
public void testChangeDefault() throws Exception
{
- final InetAddress host = InetAddress.getByName("127.0.0.9");
+ final InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.9");
final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
- checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default
+ checkEndpoint(snitch, host.toString(), "DC1", "r1"); // default
try
{
@@ -325,13 +337,13 @@ public class PropertyFileSnitchTest
replaceConfigFile(Collections.singletonMap("default", "DC2:r2")); // change default
Thread.sleep(1500);
- checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged
+ checkEndpoint(snitch, host.toString(), "DC1", "r1"); // unchanged
setNodeShutdown(host);
replaceConfigFile(Collections.singletonMap("default", "DC2:r2")); // change default again (refresh file update)
Thread.sleep(1500);
- checkEndpoint(snitch, host.getHostAddress(), "DC2", "r2"); // default updated
+ checkEndpoint(snitch, host.toString(), "DC2", "r2"); // default updated
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
index 232865a..b1c3775 100644
--- a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
@@ -50,7 +50,7 @@ public class ReconnectableSnitchHelperTest
public void failedAuthentication() throws Exception
{
DatabaseDescriptor.setInternodeAuthenticator(MessagingServiceTest.ALLOW_NOTHING_AUTHENTICATOR);
- InetAddress address = InetAddress.getByName("127.0.0.250");
+ InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.250");
//Should tolerate null returns by MS for the connection
ReconnectableSnitchHelper.reconnect(address, address, null, null);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
index b83194b..a8caa72 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -55,14 +54,14 @@ public class ReplicationStrategyEndpointCacheTest
strategy = getStrategyWithNewTokenMetadata(Keyspace.open(KEYSPACE).getReplicationStrategy(), tmd);
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddress.getByName("127.0.0.1"));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddress.getByName("127.0.0.2"));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddress.getByName("127.0.0.3"));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(40)), InetAddress.getByName("127.0.0.4"));
- //tmd.updateNormalToken(new BigIntegerToken(String.valueOf(50)), InetAddress.getByName("127.0.0.5"));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(60)), InetAddress.getByName("127.0.0.6"));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(70)), InetAddress.getByName("127.0.0.7"));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(80)), InetAddress.getByName("127.0.0.8"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddressAndPort.getByName("127.0.0.1"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddressAndPort.getByName("127.0.0.2"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddressAndPort.getByName("127.0.0.3"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(40)), InetAddressAndPort.getByName("127.0.0.4"));
+ //tmd.updateNormalToken(new BigIntegerToken(String.valueOf(50)), InetAddressAndPort.getByName("127.0.0.5", null, null));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(60)), InetAddressAndPort.getByName("127.0.0.6"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(70)), InetAddressAndPort.getByName("127.0.0.7"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(80)), InetAddressAndPort.getByName("127.0.0.8"));
}
@Test
@@ -90,31 +89,31 @@ public class ReplicationStrategyEndpointCacheTest
public void runCacheRespectsTokenChangesTest(Class stratClass, Map<String, String> configOptions) throws Exception
{
setup(stratClass, configOptions);
- ArrayList<InetAddress> initial;
- ArrayList<InetAddress> endpoints;
+ ArrayList<InetAddressAndPort> initial;
+ ArrayList<InetAddressAndPort> endpoints;
endpoints = strategy.getNaturalEndpoints(searchToken);
assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
// test token addition, in DC2 before existing token
initial = strategy.getNaturalEndpoints(searchToken);
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), InetAddress.getByName("127.0.0.5"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(35)), InetAddressAndPort.getByName("127.0.0.5"));
endpoints = strategy.getNaturalEndpoints(searchToken);
assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
assert !endpoints.equals(initial);
// test token removal, newly created token
initial = strategy.getNaturalEndpoints(searchToken);
- tmd.removeEndpoint(InetAddress.getByName("127.0.0.5"));
+ tmd.removeEndpoint(InetAddressAndPort.getByName("127.0.0.5"));
endpoints = strategy.getNaturalEndpoints(searchToken);
assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
- assert !endpoints.contains(InetAddress.getByName("127.0.0.5"));
+ assert !endpoints.contains(InetAddressAndPort.getByName("127.0.0.5"));
assert !endpoints.equals(initial);
// test token change
initial = strategy.getNaturalEndpoints(searchToken);
//move .8 after search token but before other DC3
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), InetAddress.getByName("127.0.0.8"));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(25)), InetAddressAndPort.getByName("127.0.0.8"));
endpoints = strategy.getNaturalEndpoints(searchToken);
assert endpoints.size() == 5 : StringUtils.join(endpoints, ",");
assert !endpoints.equals(initial);
@@ -129,7 +128,7 @@ public class ReplicationStrategyEndpointCacheTest
super(keyspaceName, tokenMetadata, snitch, configOptions);
}
- public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+ public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
{
assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
called = true;
@@ -146,7 +145,7 @@ public class ReplicationStrategyEndpointCacheTest
super(keyspaceName, tokenMetadata, snitch, configOptions);
}
- public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+ public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
{
assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
called = true;
@@ -163,7 +162,7 @@ public class ReplicationStrategyEndpointCacheTest
super(keyspaceName, tokenMetadata, snitch, configOptions);
}
- public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+ public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
{
assert !called : "calculateNaturalEndpoints was already called, result should have been cached";
called = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
index f97a6e5..fe77b0e 100644
--- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
@@ -98,22 +97,22 @@ public class SimpleStrategyTest
{
tmd = new TokenMetadata();
strategy = getStrategy(keyspaceName, tmd);
- List<InetAddress> hosts = new ArrayList<InetAddress>();
+ List<InetAddressAndPort> hosts = new ArrayList<>();
for (int i = 0; i < endpointTokens.length; i++)
{
- InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
+ InetAddressAndPort ep = InetAddressAndPort.getByName("127.0.0." + String.valueOf(i + 1));
tmd.updateNormalToken(endpointTokens[i], ep);
hosts.add(ep);
}
for (int i = 0; i < keyTokens.length; i++)
{
- List<InetAddress> endpoints = strategy.getNaturalEndpoints(keyTokens[i]);
+ List<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(keyTokens[i]);
assertEquals(strategy.getReplicationFactor(), endpoints.size());
- List<InetAddress> correctEndpoints = new ArrayList<InetAddress>();
+ List<InetAddressAndPort> correctEndpoints = new ArrayList<>();
for (int j = 0; j < endpoints.size(); j++)
correctEndpoints.add(hosts.get((i + j + 1) % hosts.size()));
- assertEquals(new HashSet<InetAddress>(correctEndpoints), new HashSet<InetAddress>(endpoints));
+ assertEquals(new HashSet<>(correctEndpoints), new HashSet<>(endpoints));
}
}
}
@@ -135,17 +134,17 @@ public class SimpleStrategyTest
keyTokens[i] = new BigIntegerToken(String.valueOf(RING_SIZE * 2 * i + RING_SIZE));
}
- List<InetAddress> hosts = new ArrayList<InetAddress>();
+ List<InetAddressAndPort> hosts = new ArrayList<>();
for (int i = 0; i < endpointTokens.length; i++)
{
- InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
+ InetAddressAndPort ep = InetAddressAndPort.getByName("127.0.0." + String.valueOf(i + 1));
tmd.updateNormalToken(endpointTokens[i], ep);
hosts.add(ep);
}
// bootstrap at the end of the ring
Token bsToken = new BigIntegerToken(String.valueOf(210));
- InetAddress bootstrapEndpoint = InetAddress.getByName("127.0.0.11");
+ InetAddressAndPort bootstrapEndpoint = InetAddressAndPort.getByName("127.0.0.11");
tmd.addBootstrapToken(bsToken, bootstrapEndpoint);
AbstractReplicationStrategy strategy = null;
@@ -159,7 +158,7 @@ public class SimpleStrategyTest
for (int i = 0; i < keyTokens.length; i++)
{
- Collection<InetAddress> endpoints = tmd.getWriteEndpoints(keyTokens[i], keyspaceName, strategy.getNaturalEndpoints(keyTokens[i]));
+ Collection<InetAddressAndPort> endpoints = tmd.getWriteEndpoints(keyTokens[i], keyspaceName, strategy.getNaturalEndpoints(keyTokens[i]));
assertTrue(endpoints.size() >= replicationFactor);
for (int j = 0; j < replicationFactor; j++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index e5a86fd..b589d2d 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Map;
@@ -56,8 +55,8 @@ public class TokenMetadataTest
{
DatabaseDescriptor.daemonInitialization();
tmd = StorageService.instance.getTokenMetadata();
- tmd.updateNormalToken(token(ONE), InetAddress.getByName("127.0.0.1"));
- tmd.updateNormalToken(token(SIX), InetAddress.getByName("127.0.0.6"));
+ tmd.updateNormalToken(token(ONE), InetAddressAndPort.getByName("127.0.0.1"));
+ tmd.updateNormalToken(token(SIX), InetAddressAndPort.getByName("127.0.0.6"));
}
private static void testRingIterator(ArrayList<Token> ring, String start, boolean includeMin, String... expected)
@@ -98,8 +97,8 @@ public class TokenMetadataTest
@Test
public void testTopologyUpdate_RackConsolidation() throws UnknownHostException
{
- final InetAddress first = InetAddress.getByName("127.0.0.1");
- final InetAddress second = InetAddress.getByName("127.0.0.6");
+ final InetAddressAndPort first = InetAddressAndPort.getByName("127.0.0.1");
+ final InetAddressAndPort second = InetAddressAndPort.getByName("127.0.0.6");
final String DATA_CENTER = "datacenter1";
final String RACK1 = "rack1";
final String RACK2 = "rack2";
@@ -107,19 +106,19 @@ public class TokenMetadataTest
DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
{
@Override
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
return endpoint.equals(first) ? RACK1 : RACK2;
}
@Override
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
return DATA_CENTER;
}
@Override
- public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
{
return 0;
}
@@ -134,14 +133,14 @@ public class TokenMetadataTest
TokenMetadata.Topology topology = tokenMetadata.getTopology();
assertNotNull(topology);
- Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+ Multimap<String, InetAddressAndPort> allEndpoints = topology.getDatacenterEndpoints();
assertNotNull(allEndpoints);
assertTrue(allEndpoints.size() == 2);
assertTrue(allEndpoints.containsKey(DATA_CENTER));
assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
- Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+ Map<String, Multimap<String, InetAddressAndPort>> racks = topology.getDatacenterRacks();
assertNotNull(racks);
assertTrue(racks.size() == 1);
assertTrue(racks.containsKey(DATA_CENTER));
@@ -154,19 +153,19 @@ public class TokenMetadataTest
DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
{
@Override
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
return RACK1;
}
@Override
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
return DATA_CENTER;
}
@Override
- public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
{
return 0;
}
@@ -196,8 +195,8 @@ public class TokenMetadataTest
@Test
public void testTopologyUpdate_RackExpansion() throws UnknownHostException
{
- final InetAddress first = InetAddress.getByName("127.0.0.1");
- final InetAddress second = InetAddress.getByName("127.0.0.6");
+ final InetAddressAndPort first = InetAddressAndPort.getByName("127.0.0.1");
+ final InetAddressAndPort second = InetAddressAndPort.getByName("127.0.0.6");
final String DATA_CENTER = "datacenter1";
final String RACK1 = "rack1";
final String RACK2 = "rack2";
@@ -205,19 +204,19 @@ public class TokenMetadataTest
DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
{
@Override
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
return RACK1;
}
@Override
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
return DATA_CENTER;
}
@Override
- public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
{
return 0;
}
@@ -232,14 +231,14 @@ public class TokenMetadataTest
TokenMetadata.Topology topology = tokenMetadata.getTopology();
assertNotNull(topology);
- Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+ Multimap<String, InetAddressAndPort> allEndpoints = topology.getDatacenterEndpoints();
assertNotNull(allEndpoints);
assertTrue(allEndpoints.size() == 2);
assertTrue(allEndpoints.containsKey(DATA_CENTER));
assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
- Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+ Map<String, Multimap<String, InetAddressAndPort>> racks = topology.getDatacenterRacks();
assertNotNull(racks);
assertTrue(racks.size() == 1);
assertTrue(racks.containsKey(DATA_CENTER));
@@ -252,19 +251,19 @@ public class TokenMetadataTest
DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
{
@Override
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
return endpoint.equals(first) ? RACK1 : RACK2;
}
@Override
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
return DATA_CENTER;
}
@Override
- public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
{
return 0;
}
@@ -293,8 +292,8 @@ public class TokenMetadataTest
@Test
public void testEndpointSizes() throws UnknownHostException
{
- final InetAddress first = InetAddress.getByName("127.0.0.1");
- final InetAddress second = InetAddress.getByName("127.0.0.6");
+ final InetAddressAndPort first = InetAddressAndPort.getByName("127.0.0.1");
+ final InetAddressAndPort second = InetAddressAndPort.getByName("127.0.0.6");
tmd.updateNormalToken(token(ONE), first);
tmd.updateNormalToken(token(SIX), second);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java b/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
index 2394e0c..2b13715 100644
--- a/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/HintedHandOffMetricsTest.java
@@ -20,7 +20,6 @@
*/
package org.apache.cassandra.metrics;
-import java.net.InetAddress;
import java.util.Map;
import java.util.UUID;
@@ -35,6 +34,7 @@ import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.locator.InetAddressAndPort;
import static org.junit.Assert.assertEquals;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
@@ -53,11 +53,15 @@ public class HintedHandOffMetricsTest
DatabaseDescriptor.getHintsDirectory().mkdirs();
for (int i = 0; i < 99; i++)
- HintsService.instance.metrics.incrPastWindow(InetAddress.getByName("127.0.0.1"));
+ HintsService.instance.metrics.incrPastWindow(InetAddressAndPort.getLocalHost());
HintsService.instance.metrics.log();
- UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS);
+ UntypedResultSet rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.PEER_EVENTS_V2);
Map<UUID, Integer> returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance);
assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99);
+
+ rows = executeInternal("SELECT hints_dropped FROM system." + SystemKeyspace.LEGACY_PEER_EVENTS);
+ returned = rows.one().getMap("hints_dropped", UUIDType.instance, Int32Type.instance);
+ assertEquals(Iterators.getLast(returned.values().iterator()).intValue(), 99);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/CompactEndpointSerializationHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/CompactEndpointSerializationHelperTest.java b/test/unit/org/apache/cassandra/net/CompactEndpointSerializationHelperTest.java
new file mode 100644
index 0000000..3232455
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/CompactEndpointSerializationHelperTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+import static org.junit.Assert.assertEquals;
+
+public class CompactEndpointSerializationHelperTest
+{
+
+ @Test
+ public void testRoundtrip() throws Exception
+ {
+ InetAddressAndPort ipv4 = InetAddressAndPort.getByName("127.0.0.1:42");
+ InetAddressAndPort ipv6 = InetAddressAndPort.getByName("[2001:db8:0:0:0:ff00:42:8329]:42");
+
+ testAddress(ipv4, MessagingService.VERSION_30);
+ testAddress(ipv6, MessagingService.VERSION_30);
+ testAddress(ipv4, MessagingService.current_version);
+ testAddress(ipv6, MessagingService.current_version);
+ }
+
+ private void testAddress(InetAddressAndPort address, int version) throws Exception
+ {
+ ByteBuffer out;
+ try (DataOutputBuffer dob = new DataOutputBuffer())
+ {
+ CompactEndpointSerializationHelper.instance.serialize(address, dob, version);
+ out = dob.buffer();
+ }
+ assertEquals(out.remaining(), CompactEndpointSerializationHelper.instance.serializedSize(address, version));
+
+ InetAddressAndPort roundtripped;
+ try (DataInputBuffer dib = new DataInputBuffer(out, false))
+ {
+ roundtripped = CompactEndpointSerializationHelper.instance.deserialize(dib, version);
+ }
+
+ if (version >= MessagingService.VERSION_40)
+ {
+ assertEquals(address, roundtripped);
+ }
+ else
+ {
+ assertEquals(roundtripped.address, address.address);
+ assertEquals(7000, roundtripped.port);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/ForwardToContainerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/ForwardToContainerTest.java b/test/unit/org/apache/cassandra/net/ForwardToContainerTest.java
new file mode 100644
index 0000000..195d734
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/ForwardToContainerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ForwardToContainerTest
+{
+ @Test
+ public void testCurrent() throws Exception
+ {
+ testVersion(MessagingService.current_version);
+ }
+
+ @Test
+ public void test30() throws Exception
+ {
+ testVersion(MessagingService.VERSION_30);
+ }
+
+ private void testVersion(int version) throws Exception
+ {
+ InetAddressAndPort.initializeDefaultPort(65532);
+ List<InetAddressAndPort> addresses = ImmutableList.of(InetAddressAndPort.getByNameOverrideDefaults("127.0.0.1", 42),
+ InetAddressAndPort.getByName("127.0.0.1"),
+ InetAddressAndPort.getByName("127.0.0.1:7000"),
+ InetAddressAndPort.getByNameOverrideDefaults("2001:0db8:0000:0000:0000:ff00:0042:8329", 42),
+ InetAddressAndPort.getByName("2001:0db8:0000:0000:0000:ff00:0042:8329"),
+ InetAddressAndPort.getByName("[2001:0db8:0000:0000:0000:ff00:0042:8329]:7000"));
+
+ ForwardToContainer ftc = new ForwardToContainer(addresses, new int[] { 44, 45, 46, 47, 48, 49 });
+ ByteBuffer buffer;
+ try (DataOutputBuffer dob = new DataOutputBuffer())
+ {
+ ForwardToSerializer.instance.serialize(ftc, dob, version);
+ buffer = dob.buffer();
+ }
+
+ assertEquals(buffer.remaining(), ForwardToSerializer.instance.serializedSize(ftc, version));
+
+ ForwardToContainer deserialized;
+ try (DataInputBuffer dib = new DataInputBuffer(buffer, false))
+ {
+ deserialized = ForwardToSerializer.instance.deserialize(dib, version);
+ }
+
+ assertTrue(Arrays.equals(ftc.messageIds, deserialized.messageIds));
+
+ Iterator<InetAddressAndPort> iterator = deserialized.targets.iterator();
+ if (version >= MessagingService.VERSION_40)
+ {
+ for (int ii = 0; ii < addresses.size(); ii++)
+ {
+ InetAddressAndPort original = addresses.get(ii);
+ InetAddressAndPort roundtripped = iterator.next();
+ assertEquals(original, roundtripped);
+ }
+ }
+ else
+ {
+ for (int ii = 0; ii < addresses.size(); ii++)
+ {
+ InetAddressAndPort original = addresses.get(ii);
+ InetAddressAndPort roundtripped = iterator.next();
+ assertEquals(original.address, roundtripped.address);
+ //3.0 can't send port numbers so you get the defaults
+ assertEquals(65532, roundtripped.port);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/Matcher.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/Matcher.java b/test/unit/org/apache/cassandra/net/Matcher.java
index cd1b667..27b685f 100644
--- a/test/unit/org/apache/cassandra/net/Matcher.java
+++ b/test/unit/org/apache/cassandra/net/Matcher.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.net;
-import java.net.InetAddress;
+import org.apache.cassandra.locator.InetAddressAndPort;
/**
* Predicate based on intercepted, outgoing message and the message's destination address.
@@ -28,5 +28,5 @@ public interface Matcher<T>
* @param obj intercepted outgoing message
* @param to destination address
*/
- public boolean matches(MessageOut<T> obj, InetAddress to);
+ public boolean matches(MessageOut<T> obj, InetAddressAndPort to);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/MatcherResponse.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MatcherResponse.java b/test/unit/org/apache/cassandra/net/MatcherResponse.java
index 6cd8085..7a1772a 100644
--- a/test/unit/org/apache/cassandra/net/MatcherResponse.java
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.net;
-import java.net.InetAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Queue;
@@ -27,6 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
/**
* Sends a response for an incoming message with a matching {@link Matcher}.
* The actual behavior by any instance of this class can be inspected by
@@ -76,7 +77,7 @@ public class MatcherResponse
* Respond with the message created by the provided function that will be called with each intercepted outbound message.
* @param fnResponse function to call for creating reply based on intercepted message and target address
*/
- public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse)
+ public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddressAndPort, MessageIn<S>> fnResponse)
{
return respondN(fnResponse, Integer.MAX_VALUE);
}
@@ -101,7 +102,7 @@ public class MatcherResponse
*/
public <T, S> MockMessagingSpy respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb, int limit)
{
- return respondN((MessageOut<T> msg, InetAddress to) -> {
+ return respondN((MessageOut<T> msg, InetAddressAndPort to) -> {
S payload = fnResponse.apply(msg);
if (payload == null)
return null;
@@ -147,7 +148,7 @@ public class MatcherResponse
* each intercepted outbound message.
* @param fnResponse function to call for creating reply based on intercepted message and target address
*/
- public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse, int limit)
+ public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddressAndPort, MessageIn<S>> fnResponse, int limit)
{
limitCounter.set(limit);
@@ -155,7 +156,7 @@ public class MatcherResponse
sink = new IMessageSink()
{
- public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+ public boolean allowOutgoingMessage(MessageOut message, int id, InetAddressAndPort to)
{
// prevent outgoing message from being sent in case matcher indicates a match
// and instead send the mocked response
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index f0a959e..4ce3422 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -48,6 +48,10 @@ import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.monitoring.ApproximateTime;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService.ServerChannel;
import org.apache.cassandra.net.async.NettyFactory;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
@@ -81,7 +85,7 @@ public class MessagingServiceTest
};
private static IInternodeAuthenticator originalAuthenticator;
private static ServerEncryptionOptions originalServerEncryptionOptions;
- private static InetAddress originalListenAddress;
+ private static InetAddressAndPort originalListenAddress;
private final MessagingService messagingService = MessagingService.test();
@@ -93,7 +97,7 @@ public class MessagingServiceTest
DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator();
originalServerEncryptionOptions = DatabaseDescriptor.getServerEncryptionOptions();
- originalListenAddress = DatabaseDescriptor.getListenAddress();
+ originalListenAddress = InetAddressAndPort.getByAddressOverrideDefaults(DatabaseDescriptor.getListenAddress(), DatabaseDescriptor.getStoragePort());
}
private static int metricScopeId = 0;
@@ -103,8 +107,8 @@ public class MessagingServiceTest
{
messagingService.resetDroppedMessagesMap(Integer.toString(metricScopeId++));
MockBackPressureStrategy.applied = false;
- messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.2"));
- messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.3"));
+ messagingService.destroyConnectionPool(InetAddressAndPort.getByName("127.0.0.2"));
+ messagingService.destroyConnectionPool(InetAddressAndPort.getByName("127.0.0.3"));
}
@After
@@ -113,7 +117,7 @@ public class MessagingServiceTest
DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator);
DatabaseDescriptor.setServerEncryptionOptions(originalServerEncryptionOptions);
DatabaseDescriptor.setShouldListenOnBroadcastAddress(false);
- DatabaseDescriptor.setListenAddress(originalListenAddress);
+ DatabaseDescriptor.setListenAddress(originalListenAddress.address);
FBUtilities.reset();
}
@@ -221,44 +225,44 @@ public class MessagingServiceTest
@Test
public void testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws UnknownHostException
{
- MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2"));
+ MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2"));
IAsyncCallback bpCallback = new BackPressureCallback();
IAsyncCallback noCallback = new NoBackPressureCallback();
MessageOut<?> ignored = null;
DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), noCallback, ignored);
+ messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), noCallback, ignored);
assertFalse(backPressureState.onSend);
DatabaseDescriptor.setBackPressureEnabled(false);
- messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), bpCallback, ignored);
+ messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, ignored);
assertFalse(backPressureState.onSend);
DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), bpCallback, ignored);
+ messagingService.updateBackPressureOnSend(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, ignored);
assertTrue(backPressureState.onSend);
}
@Test
public void testUpdatesBackPressureOnReceiveWhenEnabledAndWithSupportedCallback() throws UnknownHostException
{
- MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2"));
+ MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2"));
IAsyncCallback bpCallback = new BackPressureCallback();
IAsyncCallback noCallback = new NoBackPressureCallback();
boolean timeout = false;
DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), noCallback, timeout);
+ messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), noCallback, timeout);
assertFalse(backPressureState.onReceive);
assertFalse(backPressureState.onTimeout);
DatabaseDescriptor.setBackPressureEnabled(false);
- messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout);
+ messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
assertFalse(backPressureState.onReceive);
assertFalse(backPressureState.onTimeout);
DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout);
+ messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
assertTrue(backPressureState.onReceive);
assertFalse(backPressureState.onTimeout);
}
@@ -266,23 +270,23 @@ public class MessagingServiceTest
@Test
public void testUpdatesBackPressureOnTimeoutWhenEnabledAndWithSupportedCallback() throws UnknownHostException
{
- MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2"));
+ MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getBackPressureState(InetAddressAndPort.getByName("127.0.0.2"));
IAsyncCallback bpCallback = new BackPressureCallback();
IAsyncCallback noCallback = new NoBackPressureCallback();
boolean timeout = true;
DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), noCallback, timeout);
+ messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), noCallback, timeout);
assertFalse(backPressureState.onReceive);
assertFalse(backPressureState.onTimeout);
DatabaseDescriptor.setBackPressureEnabled(false);
- messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout);
+ messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
assertFalse(backPressureState.onReceive);
assertFalse(backPressureState.onTimeout);
DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout);
+ messagingService.updateBackPressureOnReceive(InetAddressAndPort.getByName("127.0.0.2"), bpCallback, timeout);
assertFalse(backPressureState.onReceive);
assertTrue(backPressureState.onTimeout);
}
@@ -291,11 +295,11 @@ public class MessagingServiceTest
public void testAppliesBackPressureWhenEnabled() throws UnknownHostException
{
DatabaseDescriptor.setBackPressureEnabled(false);
- messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.2")), ONE_SECOND);
+ messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.2")), ONE_SECOND);
assertFalse(MockBackPressureStrategy.applied);
DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.2")), ONE_SECOND);
+ messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.2")), ONE_SECOND);
assertTrue(MockBackPressureStrategy.applied);
}
@@ -303,13 +307,13 @@ public class MessagingServiceTest
public void testDoesntApplyBackPressureToBroadcastAddress() throws UnknownHostException
{
DatabaseDescriptor.setBackPressureEnabled(true);
- messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.1")), ONE_SECOND);
+ messagingService.applyBackPressure(Arrays.asList(InetAddressAndPort.getByName("127.0.0.1")), ONE_SECOND);
assertFalse(MockBackPressureStrategy.applied);
}
private static void addDCLatency(long sentAt, long nowTime) throws IOException
{
- MessageIn.deriveConstructionTime(InetAddress.getLocalHost(), (int)sentAt, nowTime);
+ MessageIn.deriveConstructionTime(InetAddressAndPort.getLocalHost(), (int)sentAt, nowTime);
}
public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState>
@@ -328,19 +332,19 @@ public class MessagingServiceTest
}
@Override
- public MockBackPressureState newState(InetAddress host)
+ public MockBackPressureState newState(InetAddressAndPort host)
{
return new MockBackPressureState(host);
}
public static class MockBackPressureState implements BackPressureState
{
- private final InetAddress host;
+ private final InetAddressAndPort host;
public volatile boolean onSend = false;
public volatile boolean onReceive = false;
public volatile boolean onTimeout = false;
- private MockBackPressureState(InetAddress host)
+ private MockBackPressureState(InetAddressAndPort host)
{
this.host = host;
}
@@ -370,7 +374,7 @@ public class MessagingServiceTest
}
@Override
- public InetAddress getHost()
+ public InetAddressAndPort getHost()
{
return host;
}
@@ -429,7 +433,7 @@ public class MessagingServiceTest
{
MessagingService ms = MessagingService.instance();
DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR);
- InetAddress address = InetAddress.getByName("127.0.0.250");
+ InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.250");
//Should return null
MessageOut messageOut = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK);
@@ -444,20 +448,20 @@ public class MessagingServiceTest
public void testOutboundMessagingConnectionCleansUp() throws Exception
{
MessagingService ms = MessagingService.instance();
- InetSocketAddress local = new InetSocketAddress("127.0.0.1", 9876);
- InetSocketAddress remote = new InetSocketAddress("127.0.0.2", 9876);
+ InetAddressAndPort local = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.1", 9876);
+ InetAddressAndPort remote = InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", 9876);
- OutboundMessagingPool pool = new OutboundMessagingPool(remote, local, null, new MockBackPressureStrategy(null).newState(remote.getAddress()), ALLOW_NOTHING_AUTHENTICATOR);
- ms.channelManagers.put(remote.getAddress(), pool);
+ OutboundMessagingPool pool = new OutboundMessagingPool(remote, local, null, new MockBackPressureStrategy(null).newState(remote), ALLOW_NOTHING_AUTHENTICATOR);
+ ms.channelManagers.put(remote, pool);
pool.sendMessage(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), 0);
- assertFalse(ms.channelManagers.containsKey(remote.getAddress()));
+ assertFalse(ms.channelManagers.containsKey(remote));
}
@Test
- public void reconnectWithNewIp() throws UnknownHostException
+ public void reconnectWithNewIp() throws Exception
{
- InetAddress publicIp = InetAddress.getByName("127.0.0.2");
- InetAddress privateIp = InetAddress.getByName("127.0.0.3");
+ InetAddressAndPort publicIp = InetAddressAndPort.getByName("127.0.0.2");
+ InetAddressAndPort privateIp = InetAddressAndPort.getByName("127.0.0.3");
// reset the preferred IP value, for good test hygiene
SystemKeyspace.updatePreferredIP(publicIp, publicIp);
@@ -485,8 +489,8 @@ public class MessagingServiceTest
Assert.assertEquals(0, serverChannel.size());
// now, create a connection and make sure it's in a channel group
- InetSocketAddress server = new InetSocketAddress(FBUtilities.getBroadcastAddress(), DatabaseDescriptor.getStoragePort());
- OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 0), server);
+ InetAddressAndPort server = FBUtilities.getBroadcastAddressAndPort();
+ OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByNameOverrideDefaults("127.0.0.2", 0), server);
CountDownLatch latch = new CountDownLatch(1);
OutboundConnectionParams params = OutboundConnectionParams.builder()
@@ -596,7 +600,7 @@ public class MessagingServiceTest
if (listenOnBroadcastAddr)
{
DatabaseDescriptor.setShouldListenOnBroadcastAddress(true);
- listenAddress = InetAddresses.increment(FBUtilities.getBroadcastAddress());
+ listenAddress = InetAddresses.increment(FBUtilities.getBroadcastAddressAndPort().address);
DatabaseDescriptor.setListenAddress(listenAddress);
FBUtilities.reset();
}
@@ -627,7 +631,7 @@ public class MessagingServiceTest
if (serverEncryptionOptions.enable_legacy_ssl_storage_port)
{
- if (legacySslPort == serverChannel.getAddress().getPort())
+ if (legacySslPort == serverChannel.getAddress().port)
{
foundLegacyListenSslAddress = true;
Assert.assertEquals(ServerChannel.SecurityLevel.REQUIRED, serverChannel.getSecurityLevel());
@@ -646,7 +650,7 @@ public class MessagingServiceTest
int found = 0;
for (ServerChannel serverChannel : messagingService.serverChannels)
{
- if (serverChannel.getAddress().getAddress().equals(listenAddress))
+ if (serverChannel.getAddress().address.equals(listenAddress))
found++;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/MockMessagingService.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingService.java b/test/unit/org/apache/cassandra/net/MockMessagingService.java
index 0412759..4ea1bf5 100644
--- a/test/unit/org/apache/cassandra/net/MockMessagingService.java
+++ b/test/unit/org/apache/cassandra/net/MockMessagingService.java
@@ -17,14 +17,15 @@
*/
package org.apache.cassandra.net;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.function.Predicate;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
/**
* Starting point for mocking {@link MessagingService} interactions. Outgoing messages can be
* intercepted by first creating a {@link MatcherResponse} by calling {@link MockMessagingService#when(Matcher)}.
- * Alternatively {@link Matcher}s can be created by using helper methods such as {@link #to(InetAddress)},
+ * Alternatively {@link Matcher}s can be created by using helper methods such as {@link #to(InetAddressAndPort)},
* {@link #verb(MessagingService.Verb)} or {@link #payload(Predicate)} and may also be
* nested using {@link MockMessagingService#all(Matcher[])} or {@link MockMessagingService#any(Matcher[])}.
* After each test, {@link MockMessagingService#cleanup()} must be called for free listeners registered
@@ -58,11 +59,11 @@ public class MockMessagingService
* Creates a matcher that will indicate if the target address of the outgoing message equals the
* provided address.
*/
- public static Matcher<InetAddress> to(String address)
+ public static Matcher<InetAddressAndPort> to(String address)
{
try
{
- return to(InetAddress.getByName(address));
+ return to(InetAddressAndPort.getByName(address));
}
catch (UnknownHostException e)
{
@@ -74,7 +75,7 @@ public class MockMessagingService
* Creates a matcher that will indicate if the target address of the outgoing message equals the
* provided address.
*/
- public static Matcher<InetAddress> to(InetAddress address)
+ public static Matcher<InetAddressAndPort> to(InetAddressAndPort address)
{
return (in, to) -> to == address || to.equals(address);
}
@@ -117,7 +118,7 @@ public class MockMessagingService
*/
public static <T> Matcher<?> all(Matcher<?>... matchers)
{
- return (MessageOut<T> out, InetAddress to) -> {
+ return (MessageOut<T> out, InetAddressAndPort to) -> {
for (Matcher matcher : matchers)
{
if (!matcher.matches(out, to))
@@ -132,7 +133,7 @@ public class MockMessagingService
*/
public static <T> Matcher<?> any(Matcher<?>... matchers)
{
- return (MessageOut<T> out, InetAddress to) -> {
+ return (MessageOut<T> out, InetAddressAndPort to) -> {
for (Matcher matcher : matchers)
{
if (matcher.matches(out, to))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
index 3f6564e..8d0f91b 100644
--- a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
@@ -55,7 +55,7 @@ public class MockMessagingServiceTest
public void testRequestResponse() throws InterruptedException, ExecutionException
{
// echo message that we like to mock as incoming reply for outgoing echo message
- MessageIn<EchoMessage> echoMessageIn = MessageIn.create(FBUtilities.getBroadcastAddress(),
+ MessageIn<EchoMessage> echoMessageIn = MessageIn.create(FBUtilities.getBroadcastAddressAndPort(),
EchoMessage.instance,
Collections.emptyMap(),
MessagingService.Verb.ECHO,
@@ -63,14 +63,14 @@ public class MockMessagingServiceTest
MockMessagingSpy spy = MockMessagingService
.when(
all(
- to(FBUtilities.getBroadcastAddress()),
+ to(FBUtilities.getBroadcastAddressAndPort()),
verb(MessagingService.Verb.ECHO)
)
)
.respond(echoMessageIn);
MessageOut<EchoMessage> echoMessageOut = new MessageOut<>(MessagingService.Verb.ECHO, EchoMessage.instance, EchoMessage.serializer);
- MessagingService.instance().sendRR(echoMessageOut, FBUtilities.getBroadcastAddress(), new IAsyncCallback()
+ MessagingService.instance().sendRR(echoMessageOut, FBUtilities.getBroadcastAddressAndPort(), new IAsyncCallback()
{
public void response(MessageIn msg)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
index 93fe32e..4d1ae01 100644
--- a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
+++ b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.cassandra.net;
-import java.net.InetAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -29,6 +28,7 @@ import com.google.common.util.concurrent.RateLimiter;
import org.junit.Test;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.TestTimeSource;
import org.apache.cassandra.utils.TimeSource;
@@ -94,17 +94,17 @@ public class RateBasedBackPressureTest
TestTimeSource timeSource = new TestTimeSource();
RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
- RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+ RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
state.onMessageSent(null);
assertEquals(0, state.incomingRate.size());
assertEquals(0, state.outgoingRate.size());
- state = strategy.newState(InetAddress.getLoopbackAddress());
+ state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
state.onResponseReceived();
assertEquals(1, state.incomingRate.size());
assertEquals(1, state.outgoingRate.size());
- state = strategy.newState(InetAddress.getLoopbackAddress());
+ state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
state.onResponseTimeout();
assertEquals(0, state.incomingRate.size());
assertEquals(1, state.outgoingRate.size());
@@ -116,7 +116,7 @@ public class RateBasedBackPressureTest
long windowSize = 6000;
TestTimeSource timeSource = new TestTimeSource();
RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
- RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+ RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
// Get initial rate:
double initialRate = state.rateLimiter.getRate();
@@ -140,7 +140,7 @@ public class RateBasedBackPressureTest
long windowSize = 6000;
TestTimeSource timeSource = new TestTimeSource();
RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
- RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+ RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
// Get initial time:
long current = state.getLastIntervalAcquire();
@@ -174,7 +174,7 @@ public class RateBasedBackPressureTest
long windowSize = 6000;
TestTimeSource timeSource = new TestTimeSource();
RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
- RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+ RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
// Update incoming and outgoing rate so that the ratio is 0.5:
state.incomingRate.update(50);
@@ -194,7 +194,7 @@ public class RateBasedBackPressureTest
long windowSize = 6000;
TestTimeSource timeSource = new TestTimeSource();
RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
- RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+ RateBasedBackPressureState state = strategy.newState(InetAddressAndPort.getLoopbackAddress());
// Update incoming and outgoing rate so that the ratio is 0.5:
state.incomingRate.update(50);
@@ -236,9 +236,9 @@ public class RateBasedBackPressureTest
long windowSize = 6000;
TestTimeSource timeSource = new TestTimeSource();
TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
- RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
- RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
- RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+ RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
+ RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
+ RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
// Update incoming and outgoing rates:
state1.incomingRate.update(50);
@@ -265,9 +265,9 @@ public class RateBasedBackPressureTest
long windowSize = 6000;
TestTimeSource timeSource = new TestTimeSource();
TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
- RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
- RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
- RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+ RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
+ RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
+ RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
// Update incoming and outgoing rates:
state1.incomingRate.update(50);
@@ -294,10 +294,10 @@ public class RateBasedBackPressureTest
long windowSize = 6000;
TestTimeSource timeSource = new TestTimeSource();
TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
- RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
- RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
- RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
- RateBasedBackPressureState state4 = strategy.newState(InetAddress.getByName("127.0.0.4"));
+ RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
+ RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
+ RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
+ RateBasedBackPressureState state4 = strategy.newState(InetAddressAndPort.getByName("127.0.0.4"));
// Update incoming and outgoing rates:
state1.incomingRate.update(50); // this
@@ -333,9 +333,9 @@ public class RateBasedBackPressureTest
long windowSize = 10000;
TestTimeSource timeSource = new TestTimeSource();
TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
- RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
- RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
- RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+ RateBasedBackPressureState state1 = strategy.newState(InetAddressAndPort.getByName("127.0.0.1"));
+ RateBasedBackPressureState state2 = strategy.newState(InetAddressAndPort.getByName("127.0.0.2"));
+ RateBasedBackPressureState state3 = strategy.newState(InetAddressAndPort.getByName("127.0.0.3"));
// Update incoming and outgoing rates:
state1.incomingRate.update(5); // slow
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
index 2d12baf..c8469a8 100644
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.cassandra.net;
-import java.net.InetAddress;
import java.util.UUID;
import org.junit.BeforeClass;
@@ -32,6 +31,7 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService.Verb;
import org.apache.cassandra.schema.MockSchema;
import org.apache.cassandra.schema.TableMetadata;
@@ -65,7 +65,7 @@ public class WriteCallbackInfoTest
? new Commit(UUID.randomUUID(), new PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, RegularAndStaticColumns.NONE, 1).build())
: new Mutation(PartitionUpdate.simpleBuilder(metadata, "").build());
- WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddress.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints);
+ WriteCallbackInfo wcbi = new WriteCallbackInfo(InetAddressAndPort.getByName("192.168.1.1"), null, new MessageOut(verb, payload, null), null, cl, allowHints);
Assert.assertEquals(expectHint, wcbi.shouldHint());
if (expectHint)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java b/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java
index 128fe4b..0211512 100644
--- a/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java
+++ b/test/unit/org/apache/cassandra/net/async/ChannelWriterTest.java
@@ -24,6 +24,7 @@ import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.net.InetAddresses;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -37,6 +38,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.async.ChannelWriter.CoalescingChannelWriter;
@@ -68,8 +70,8 @@ public class ChannelWriterTest
@Before
public void setup()
{
- OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress("127.0.0.1", 0),
- new InetSocketAddress("127.0.0.2", 0));
+ OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 0),
+ InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0));
channel = new EmbeddedChannel();
omc = new NonSendingOutboundMessagingConnection(id, null, Optional.empty());
channelWriter = ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java b/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java
index 100e1e0..f92ce5a 100644
--- a/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java
+++ b/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Optional;
+import com.google.common.net.InetAddresses;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -38,6 +39,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -50,8 +52,8 @@ public class HandshakeHandlersTest
private static final String KEYSPACE1 = "NettyPipilineTest";
private static final String STANDARD1 = "Standard1";
- private static final InetSocketAddress LOCAL_ADDR = new InetSocketAddress("127.0.0.1", 9999);
- private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 9999);
+ private static final InetAddressAndPort LOCAL_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.1"), 9999);
+ private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 9999);
private static final int MESSAGING_VERSION = MessagingService.current_version;
private static final OutboundConnectionIdentifier connectionId = OutboundConnectionIdentifier.small(LOCAL_ADDR, REMOTE_ADDR);
@@ -179,7 +181,7 @@ public class HandshakeHandlersTest
InboundHandshakeHandler handler = new InboundHandshakeHandler(new TestAuthenticator(true));
EmbeddedChannel inboundChannel = new EmbeddedChannel(handler);
- handler.setupMessagingPipeline(inboundChannel.pipeline(), REMOTE_ADDR.getAddress(), compress, MESSAGING_VERSION);
+ handler.setupMessagingPipeline(inboundChannel.pipeline(), REMOTE_ADDR, compress, MESSAGING_VERSION);
return new TestChannels(outboundChannel, inboundChannel);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java b/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java
index a3d646d..af48636 100644
--- a/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java
+++ b/test/unit/org/apache/cassandra/net/async/HandshakeProtocolTest.java
@@ -85,7 +85,7 @@ public class HandshakeProtocolTest
@Test
public void thirdMessageTest() throws Exception
{
- ThirdHandshakeMessage before = new ThirdHandshakeMessage(MessagingService.current_version, FBUtilities.getBroadcastAddress());
+ ThirdHandshakeMessage before = new ThirdHandshakeMessage(MessagingService.current_version, FBUtilities.getBroadcastAddressAndPort());
buf = before.encode(PooledByteBufAllocator.DEFAULT);
ThirdHandshakeMessage after = ThirdHandshakeMessage.maybeDecode(buf);
assertEquals(before, after);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org