You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:12 UTC
[15/19] cassandra git commit: Allow storage port to be configurable
per node
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
index cfeccc4..92307a3 100644
--- a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
+++ b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
@@ -26,10 +26,9 @@ import org.apache.cassandra.dht.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetAddress;
import java.util.*;
-public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddress>>>
+public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddressAndPort>>>
{
private static final Logger logger = LoggerFactory.getLogger(PendingRangeMaps.class);
@@ -39,7 +38,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
* First two are for non-wrap-around ranges, and the last two are for wrap-around ranges.
*/
// ascendingMap will sort the ranges by the ascending order of right token
- final NavigableMap<Range<Token>, List<InetAddress>> ascendingMap;
+ final NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMap;
/**
* sorting end ascending, if ends are same, sorting begin descending, so that token (end, end) will
* come before (begin, end] with the same end, and (begin, end) will be selected in the tailMap.
@@ -58,7 +57,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
};
// descendingMap will sort the ranges by the descending order of left token
- final NavigableMap<Range<Token>, List<InetAddress>> descendingMap;
+ final NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMap;
/**
* sorting begin descending, if begins are same, sorting end descending, so that token (begin, begin) will
* come after (begin, end] with the same begin, and (begin, end) won't be selected in the tailMap.
@@ -78,7 +77,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
};
// these two maps are for wrap-around ranges.
- final NavigableMap<Range<Token>, List<InetAddress>> ascendingMapForWrapAround;
+ final NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMapForWrapAround;
/**
* for wrap around range (begin, end], which begin > end.
* Sorting end ascending, if ends are same, sorting begin ascending,
@@ -98,7 +97,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
}
};
- final NavigableMap<Range<Token>, List<InetAddress>> descendingMapForWrapAround;
+ final NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMapForWrapAround;
/**
* for wrap around ranges, which begin > end.
* Sorting end ascending, so that token (begin, begin) will come after (begin, end] with the same begin,
@@ -118,28 +117,28 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
public PendingRangeMaps()
{
- this.ascendingMap = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparator);
- this.descendingMap = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparator);
- this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparatorForWrapAround);
- this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparatorForWrapAround);
+ this.ascendingMap = new TreeMap<Range<Token>, List<InetAddressAndPort>>(ascendingComparator);
+ this.descendingMap = new TreeMap<Range<Token>, List<InetAddressAndPort>>(descendingComparator);
+ this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddressAndPort>>(ascendingComparatorForWrapAround);
+ this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddressAndPort>>(descendingComparatorForWrapAround);
}
static final void addToMap(Range<Token> range,
- InetAddress address,
- NavigableMap<Range<Token>, List<InetAddress>> ascendingMap,
- NavigableMap<Range<Token>, List<InetAddress>> descendingMap)
+ InetAddressAndPort address,
+ NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMap,
+ NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMap)
{
- List<InetAddress> addresses = ascendingMap.get(range);
+ List<InetAddressAndPort> addresses = ascendingMap.get(range);
if (addresses == null)
{
- addresses = new ArrayList<InetAddress>(1);
+ addresses = new ArrayList<>(1);
ascendingMap.put(range, addresses);
descendingMap.put(range, addresses);
}
addresses.add(address);
}
- public void addPendingRange(Range<Token> range, InetAddress address)
+ public void addPendingRange(Range<Token> range, InetAddressAndPort address)
{
if (Range.isWrapAround(range.left, range.right))
{
@@ -151,14 +150,14 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
}
}
- static final void addIntersections(Set<InetAddress> endpointsToAdd,
- NavigableMap<Range<Token>, List<InetAddress>> smallerMap,
- NavigableMap<Range<Token>, List<InetAddress>> biggerMap)
+ static final void addIntersections(Set<InetAddressAndPort> endpointsToAdd,
+ NavigableMap<Range<Token>, List<InetAddressAndPort>> smallerMap,
+ NavigableMap<Range<Token>, List<InetAddressAndPort>> biggerMap)
{
// find the intersection of two sets
for (Range<Token> range : smallerMap.keySet())
{
- List<InetAddress> addresses = biggerMap.get(range);
+ List<InetAddressAndPort> addresses = biggerMap.get(range);
if (addresses != null)
{
endpointsToAdd.addAll(addresses);
@@ -166,15 +165,15 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
}
}
- public Collection<InetAddress> pendingEndpointsFor(Token token)
+ public Collection<InetAddressAndPort> pendingEndpointsFor(Token token)
{
- Set<InetAddress> endpoints = new HashSet<>();
+ Set<InetAddressAndPort> endpoints = new HashSet<>();
Range searchRange = new Range(token, token);
// search for non-wrap-around maps
- NavigableMap<Range<Token>, List<InetAddress>> ascendingTailMap = ascendingMap.tailMap(searchRange, true);
- NavigableMap<Range<Token>, List<InetAddress>> descendingTailMap = descendingMap.tailMap(searchRange, false);
+ NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingTailMap = ascendingMap.tailMap(searchRange, true);
+ NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingTailMap = descendingMap.tailMap(searchRange, false);
// add intersections of two maps
if (ascendingTailMap.size() < descendingTailMap.size())
@@ -191,11 +190,11 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
descendingTailMap = descendingMapForWrapAround.tailMap(searchRange, false);
// add them since they are all necessary.
- for (Map.Entry<Range<Token>, List<InetAddress>> entry : ascendingTailMap.entrySet())
+ for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : ascendingTailMap.entrySet())
{
endpoints.addAll(entry.getValue());
}
- for (Map.Entry<Range<Token>, List<InetAddress>> entry : descendingTailMap.entrySet())
+ for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : descendingTailMap.entrySet())
{
endpoints.addAll(entry.getValue());
}
@@ -207,11 +206,11 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
{
StringBuilder sb = new StringBuilder();
- for (Map.Entry<Range<Token>, List<InetAddress>> entry : this)
+ for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : this)
{
Range<Token> range = entry.getKey();
- for (InetAddress address : entry.getValue())
+ for (InetAddressAndPort address : entry.getValue())
{
sb.append(address).append(':').append(range);
sb.append(System.getProperty("line.separator"));
@@ -222,7 +221,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
}
@Override
- public Iterator<Map.Entry<Range<Token>, List<InetAddress>>> iterator()
+ public Iterator<Map.Entry<Range<Token>, List<InetAddressAndPort>>> iterator()
{
return Iterators.concat(ascendingMap.entrySet().iterator(), ascendingMapForWrapAround.entrySet().iterator());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
index 2908976..3a9b161 100644
--- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.locator;
import java.io.InputStream;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
@@ -55,7 +54,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
public static final String SNITCH_PROPERTIES_FILENAME = "cassandra-topology.properties";
private static final int DEFAULT_REFRESH_PERIOD_IN_SECONDS = 5;
- private static volatile Map<InetAddress, String[]> endpointMap;
+ private static volatile Map<InetAddressAndPort, String[]> endpointMap;
private static volatile String[] defaultDCRack;
private volatile boolean gossipStarted;
@@ -93,7 +92,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
* @param endpoint endpoint to process
* @return an array of strings with the first element being the data center and the second being the rack
*/
- public static String[] getEndpointInfo(InetAddress endpoint)
+ public static String[] getEndpointInfo(InetAddressAndPort endpoint)
{
String[] rawEndpointInfo = getRawEndpointInfo(endpoint);
if (rawEndpointInfo == null)
@@ -101,7 +100,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
return rawEndpointInfo;
}
- private static String[] getRawEndpointInfo(InetAddress endpoint)
+ private static String[] getRawEndpointInfo(InetAddressAndPort endpoint)
{
String[] value = endpointMap.get(endpoint);
if (value == null)
@@ -118,7 +117,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
* @param endpoint the endpoint to process
* @return string of data center
*/
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
String[] info = getEndpointInfo(endpoint);
assert info != null : "No location defined for endpoint " + endpoint;
@@ -131,7 +130,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
* @param endpoint the endpoint to process
* @return string of rack
*/
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
String[] info = getEndpointInfo(endpoint);
assert info != null : "No location defined for endpoint " + endpoint;
@@ -140,7 +139,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
public void reloadConfiguration(boolean isUpdate) throws ConfigurationException
{
- HashMap<InetAddress, String[]> reloadedMap = new HashMap<>();
+ HashMap<InetAddressAndPort, String[]> reloadedMap = new HashMap<>();
String[] reloadedDefaultDCRack = null;
Properties properties = new Properties();
@@ -168,11 +167,11 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
}
else
{
- InetAddress host;
+ InetAddressAndPort host;
String hostString = StringUtils.remove(key, '/');
try
{
- host = InetAddress.getByName(hostString);
+ host = InetAddressAndPort.getByName(hostString);
}
catch (UnknownHostException e)
{
@@ -186,7 +185,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
reloadedMap.put(host, token);
}
}
- InetAddress broadcastAddress = FBUtilities.getBroadcastAddress();
+ InetAddressAndPort broadcastAddress = FBUtilities.getBroadcastAddressAndPort();
String[] localInfo = reloadedMap.get(broadcastAddress);
if (reloadedDefaultDCRack == null && localInfo == null)
throw new ConfigurationException(String.format("Snitch definitions at %s do not define a location for " +
@@ -194,7 +193,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
SNITCH_PROPERTIES_FILENAME, broadcastAddress));
// internode messaging code converts our broadcast address to local,
// make sure we can be found at that as well.
- InetAddress localAddress = FBUtilities.getLocalAddress();
+ InetAddressAndPort localAddress = FBUtilities.getLocalAddressAndPort();
if (!localAddress.equals(broadcastAddress) && !reloadedMap.containsKey(localAddress))
reloadedMap.put(localAddress, localInfo);
@@ -204,7 +203,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
if (logger.isTraceEnabled())
{
StringBuilder sb = new StringBuilder();
- for (Map.Entry<InetAddress, String[]> entry : reloadedMap.entrySet())
+ for (Map.Entry<InetAddressAndPort, String[]> entry : reloadedMap.entrySet())
sb.append(entry.getKey()).append(':').append(Arrays.toString(entry.getValue())).append(", ");
logger.trace("Loaded network topology from property file: {}", StringUtils.removeEnd(sb.toString(), ", "));
}
@@ -231,17 +230,17 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
* @param reloadedDefaultDCRack - the default dc:rack or null if no default
* @return true if we can continue updating (no live host had dc or rack updated)
*/
- private static boolean livenessCheck(HashMap<InetAddress, String[]> reloadedMap, String[] reloadedDefaultDCRack)
+ private static boolean livenessCheck(HashMap<InetAddressAndPort, String[]> reloadedMap, String[] reloadedDefaultDCRack)
{
// If the default has changed we must check all live hosts but hopefully we will find a live
// host quickly and interrupt the loop. Otherwise we only check the live hosts that were either
// in the old set or in the new set
- Set<InetAddress> hosts = Arrays.equals(defaultDCRack, reloadedDefaultDCRack)
+ Set<InetAddressAndPort> hosts = Arrays.equals(defaultDCRack, reloadedDefaultDCRack)
? Sets.intersection(StorageService.instance.getLiveRingMembers(), // same default
Sets.union(endpointMap.keySet(), reloadedMap.keySet()))
: StorageService.instance.getLiveRingMembers(); // default updated
- for (InetAddress host : hosts)
+ for (InetAddressAndPort host : hosts)
{
String[] origValue = endpointMap.containsKey(host) ? endpointMap.get(host) : defaultDCRack;
String[] updateValue = reloadedMap.containsKey(host) ? reloadedMap.get(host) : reloadedDefaultDCRack;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RackInferringSnitch.java b/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
index a6ea1ab..6ae10cc 100644
--- a/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
+++ b/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
@@ -17,21 +17,19 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
-
/**
* A simple endpoint snitch implementation that assumes datacenter and rack information is encoded
* in the 2nd and 3rd octets of the ip address, respectively.
*/
public class RackInferringSnitch extends AbstractNetworkTopologySnitch
{
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
- return Integer.toString(endpoint.getAddress()[2] & 0xFF, 10);
+ return Integer.toString(endpoint.address.getAddress()[2] & 0xFF, 10);
}
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
- return Integer.toString(endpoint.getAddress()[1] & 0xFF, 10);
+ return Integer.toString(endpoint.address.getAddress()[1] & 0xFF, 10);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
index 0b344c9..5479010 100644
--- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import com.google.common.annotations.VisibleForTesting;
@@ -49,11 +48,11 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
this.preferLocal = preferLocal;
}
- private void reconnect(InetAddress publicAddress, VersionedValue localAddressValue)
+ private void reconnect(InetAddressAndPort publicAddress, VersionedValue localAddressValue)
{
try
{
- reconnect(publicAddress, InetAddress.getByName(localAddressValue.value), snitch, localDc);
+ reconnect(publicAddress, InetAddressAndPort.getByName(localAddressValue.value), snitch, localDc);
}
catch (UnknownHostException e)
{
@@ -62,9 +61,9 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
}
@VisibleForTesting
- static void reconnect(InetAddress publicAddress, InetAddress localAddress, IEndpointSnitch snitch, String localDc)
+ static void reconnect(InetAddressAndPort publicAddress, InetAddressAndPort localAddress, IEndpointSnitch snitch, String localDc)
{
- if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress, MessagingService.instance().portFor(publicAddress)))
+ if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress.address, MessagingService.instance().portFor(publicAddress)))
{
logger.debug("InternodeAuthenticator said don't reconnect to {} on {}", publicAddress, localAddress);
return;
@@ -78,40 +77,65 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
}
}
- public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
+ public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
{
// no-op
}
- public void onJoin(InetAddress endpoint, EndpointState epState)
+ /**
+ * When preferLocal is set and the joining endpoint is not in a dead state, reconnects to it using
+ * the internal address found in its gossip state: INTERNAL_ADDRESS_AND_PORT when present,
+ * otherwise the legacy INTERNAL_IP. Does nothing if neither state is present.
+ *
+ * @param endpoint the endpoint that joined
+ * @param epState the gossip state of the endpoint
+ */
+ public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
{
- if (preferLocal && !Gossiper.instance.isDeadState(epState) && epState.getApplicationState(ApplicationState.INTERNAL_IP) != null)
- reconnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP));
+ if (preferLocal && !Gossiper.instance.isDeadState(epState))
+ {
+ VersionedValue address = epState.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+ if (address == null)
+ {
+ //Only use INTERNAL_IP if INTERNAL_ADDRESS_AND_PORT is unavailable (same fallback as onChange/onAlive)
+ address = epState.getApplicationState(ApplicationState.INTERNAL_IP);
+ }
+ if (address != null)
+ {
+ reconnect(endpoint, address);
+ }
+ }
}
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+ //Skeptical this will always do the right thing all the time port wise. It will converge on the right thing
+ //eventually once INTERNAL_ADDRESS_AND_PORT is populated
+ public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
{
- if (preferLocal && state == ApplicationState.INTERNAL_IP && !Gossiper.instance.isDeadState(Gossiper.instance.getEndpointStateForEndpoint(endpoint)))
- reconnect(endpoint, value);
+ if (preferLocal && !Gossiper.instance.isDeadState(Gossiper.instance.getEndpointStateForEndpoint(endpoint)))
+ {
+ if (state == ApplicationState.INTERNAL_ADDRESS_AND_PORT)
+ {
+ reconnect(endpoint, value);
+ }
+ else if (state == ApplicationState.INTERNAL_IP &&
+ null == Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT))
+ {
+ //Only use INTERNAL_IP if INTERNAL_ADDRESS_AND_PORT is unavailable
+ reconnect(endpoint, value);
+ }
+ }
}
- public void onAlive(InetAddress endpoint, EndpointState state)
+ public void onAlive(InetAddressAndPort endpoint, EndpointState state)
{
- if (preferLocal && state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
- reconnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP));
+ VersionedValue internalIP = state.getApplicationState(ApplicationState.INTERNAL_IP);
+ VersionedValue internalIPAndPorts = state.getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
+ if (preferLocal && internalIP != null)
+ reconnect(endpoint, internalIPAndPorts != null ? internalIPAndPorts : internalIP);
}
- public void onDead(InetAddress endpoint, EndpointState state)
+ public void onDead(InetAddressAndPort endpoint, EndpointState state)
{
// do nothing.
}
- public void onRemove(InetAddress endpoint)
+ public void onRemove(InetAddressAndPort endpoint)
{
// do nothing.
}
- public void onRestart(InetAddress endpoint, EndpointState state)
+ public void onRestart(InetAddressAndPort endpoint, EndpointState state)
{
// do nothing.
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/SeedProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SeedProvider.java b/src/java/org/apache/cassandra/locator/SeedProvider.java
index a013fbb..7efa9e0 100644
--- a/src/java/org/apache/cassandra/locator/SeedProvider.java
+++ b/src/java/org/apache/cassandra/locator/SeedProvider.java
@@ -17,10 +17,9 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.util.List;
public interface SeedProvider
{
- List<InetAddress> getSeeds();
+ List<InetAddressAndPort> getSeeds();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java b/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java
index 665261d..47401a0 100644
--- a/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java
+++ b/src/java/org/apache/cassandra/locator/SimpleSeedProvider.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
@@ -26,6 +25,7 @@ import java.util.Map;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +35,7 @@ public class SimpleSeedProvider implements SeedProvider
public SimpleSeedProvider(Map<String, String> args) {}
- public List<InetAddress> getSeeds()
+ public List<InetAddressAndPort> getSeeds()
{
Config conf;
try
@@ -47,12 +47,12 @@ public class SimpleSeedProvider implements SeedProvider
throw new AssertionError(e);
}
String[] hosts = conf.seed_provider.parameters.get("seeds").split(",", -1);
- List<InetAddress> seeds = new ArrayList<InetAddress>(hosts.length);
+ List<InetAddressAndPort> seeds = new ArrayList<>(hosts.length);
for (String host : hosts)
{
try
{
- seeds.add(InetAddress.getByName(host.trim()));
+ seeds.add(InetAddressAndPort.getByName(host.trim()));
}
catch (UnknownHostException ex)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/SimpleSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleSnitch.java b/src/java/org/apache/cassandra/locator/SimpleSnitch.java
index 27648c8..e31fc6b 100644
--- a/src/java/org/apache/cassandra/locator/SimpleSnitch.java
+++ b/src/java/org/apache/cassandra/locator/SimpleSnitch.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.util.List;
/**
@@ -27,23 +26,23 @@ import java.util.List;
*/
public class SimpleSnitch extends AbstractEndpointSnitch
{
- public String getRack(InetAddress endpoint)
+ public String getRack(InetAddressAndPort endpoint)
{
return "rack1";
}
- public String getDatacenter(InetAddress endpoint)
+ public String getDatacenter(InetAddressAndPort endpoint)
{
return "datacenter1";
}
@Override
- public void sortByProximity(final InetAddress address, List<InetAddress> addresses)
+ public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
{
// Optimization to avoid walking the list
}
- public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
{
// Making all endpoints equal ensures we won't change the original ordering (since
// Collections.sort is guaranteed to be stable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/SimpleStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 9a5062b..545ad28 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
@@ -42,11 +41,11 @@ public class SimpleStrategy extends AbstractReplicationStrategy
super(keyspaceName, tokenMetadata, snitch, configOptions);
}
- public List<InetAddress> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+ public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
{
int replicas = getReplicationFactor();
ArrayList<Token> tokens = metadata.sortedTokens();
- List<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
+ List<InetAddressAndPort> endpoints = new ArrayList<InetAddressAndPort>(replicas);
if (tokens.isEmpty())
return endpoints;
@@ -55,7 +54,7 @@ public class SimpleStrategy extends AbstractReplicationStrategy
Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false);
while (endpoints.size() < replicas && iter.hasNext())
{
- InetAddress ep = metadata.getEndpoint(iter.next());
+ InetAddressAndPort ep = metadata.getEndpoint(iter.next());
if (!endpoints.contains(ep))
endpoints.add(ep);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 00f9536..e2c4628 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -52,12 +51,12 @@ public class TokenMetadata
* Each Token is associated with exactly one Address, but each Address may have
* multiple tokens. Hence, the BiMultiValMap collection.
*/
- private final BiMultiValMap<Token, InetAddress> tokenToEndpointMap;
+ private final BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap;
/** Maintains endpoint to host ID map of every node in the cluster */
- private final BiMap<InetAddress, UUID> endpointToHostIdMap;
+ private final BiMap<InetAddressAndPort, UUID> endpointToHostIdMap;
- // Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddress> pendingRanges<tt>,
+ // Prior to CASSANDRA-603, we just had <tt>Map<Range, InetAddressAndPort> pendingRanges</tt>,
// which was added to when a node began bootstrap and removed from when it finished.
//
// This is inadequate when multiple changes are allowed simultaneously. For example,
@@ -70,8 +69,8 @@ public class TokenMetadata
//
// So, we made two changes:
//
- // First, we changed pendingRanges to a <tt>Multimap<Range, InetAddress></tt> (now
- // <tt>Map<String, Multimap<Range, InetAddress>></tt>, because replication strategy
+ // First, we changed pendingRanges to a <tt>Multimap<Range, InetAddressAndPort></tt> (now
+ // <tt>Map<String, Multimap<Range, InetAddressAndPort>></tt>, because replication strategy
// and options are per-KeySpace).
//
// Second, we added the bootstrapTokens and leavingEndpoints collections, so we can
@@ -81,17 +80,17 @@ public class TokenMetadata
// Finally, note that recording the tokens of joining nodes in bootstrapTokens also
// means we can detect and reject the addition of multiple nodes at the same token
// before one becomes part of the ring.
- private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
+ private final BiMultiValMap<Token, InetAddressAndPort> bootstrapTokens = new BiMultiValMap<>();
- private final BiMap<InetAddress, InetAddress> replacementToOriginal = HashBiMap.create();
+ private final BiMap<InetAddressAndPort, InetAddressAndPort> replacementToOriginal = HashBiMap.create();
// (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
- private final Set<InetAddress> leavingEndpoints = new HashSet<>();
+ private final Set<InetAddressAndPort> leavingEndpoints = new HashSet<>();
// this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
private final ConcurrentMap<String, PendingRangeMaps> pendingRanges = new ConcurrentHashMap<String, PendingRangeMaps>();
// nodes which are migrating to the new tokens in the ring
- private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
+ private final Set<Pair<Token, InetAddressAndPort>> movingEndpoints = new HashSet<>();
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -100,26 +99,18 @@ public class TokenMetadata
private final Topology topology;
public final IPartitioner partitioner;
- private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
- {
- public int compare(InetAddress o1, InetAddress o2)
- {
- return ByteBuffer.wrap(o1.getAddress()).compareTo(ByteBuffer.wrap(o2.getAddress()));
- }
- };
-
// signals replication strategies that nodes have joined or left the ring and they need to recompute ownership
private volatile long ringVersion = 0;
public TokenMetadata()
{
- this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
- HashBiMap.<InetAddress, UUID>create(),
+ this(SortedBiMultiValMap.<Token, InetAddressAndPort>create(),
+ HashBiMap.create(),
new Topology(),
DatabaseDescriptor.getPartitioner());
}
- private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner)
+ private TokenMetadata(BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap, BiMap<InetAddressAndPort, UUID> endpointsMap, Topology topology, IPartitioner partitioner)
{
this.tokenToEndpointMap = tokenToEndpointMap;
this.topology = topology;
@@ -143,7 +134,7 @@ public class TokenMetadata
}
/** @return the number of nodes bootstrapping into source's primary range */
- public int pendingRangeChanges(InetAddress source)
+ public int pendingRangeChanges(InetAddressAndPort source)
{
int n = 0;
Collection<Range<Token>> sourceRanges = getPrimaryRangesFor(getTokens(source));
@@ -165,14 +156,14 @@ public class TokenMetadata
/**
* Update token map with a single token/endpoint pair in normal state.
*/
- public void updateNormalToken(Token token, InetAddress endpoint)
+ public void updateNormalToken(Token token, InetAddressAndPort endpoint)
{
updateNormalTokens(Collections.singleton(token), endpoint);
}
- public void updateNormalTokens(Collection<Token> tokens, InetAddress endpoint)
+ public void updateNormalTokens(Collection<Token> tokens, InetAddressAndPort endpoint)
{
- Multimap<InetAddress, Token> endpointTokens = HashMultimap.create();
+ Multimap<InetAddressAndPort, Token> endpointTokens = HashMultimap.create();
for (Token token : tokens)
endpointTokens.put(endpoint, token);
updateNormalTokens(endpointTokens);
@@ -184,7 +175,7 @@ public class TokenMetadata
* Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple)
* is expensive (CASSANDRA-3831).
*/
- public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens)
+ public void updateNormalTokens(Multimap<InetAddressAndPort, Token> endpointTokens)
{
if (endpointTokens.isEmpty())
return;
@@ -193,7 +184,7 @@ public class TokenMetadata
try
{
boolean shouldSortTokens = false;
- for (InetAddress endpoint : endpointTokens.keySet())
+ for (InetAddressAndPort endpoint : endpointTokens.keySet())
{
Collection<Token> tokens = endpointTokens.get(endpoint);
@@ -208,7 +199,7 @@ public class TokenMetadata
for (Token token : tokens)
{
- InetAddress prev = tokenToEndpointMap.put(token, endpoint);
+ InetAddressAndPort prev = tokenToEndpointMap.put(token, endpoint);
if (!endpoint.equals(prev))
{
if (prev != null)
@@ -231,7 +222,7 @@ public class TokenMetadata
* Store an end-point to host ID mapping. Each ID must be unique, and
* cannot be changed after the fact.
*/
- public void updateHostId(UUID hostId, InetAddress endpoint)
+ public void updateHostId(UUID hostId, InetAddressAndPort endpoint)
{
assert hostId != null;
assert endpoint != null;
@@ -239,7 +230,7 @@ public class TokenMetadata
lock.writeLock().lock();
try
{
- InetAddress storedEp = endpointToHostIdMap.inverse().get(hostId);
+ InetAddressAndPort storedEp = endpointToHostIdMap.inverse().get(hostId);
if (storedEp != null)
{
if (!storedEp.equals(endpoint) && (FailureDetector.instance.isAlive(storedEp)))
@@ -265,7 +256,7 @@ public class TokenMetadata
}
/** Return the unique host ID for an end-point. */
- public UUID getHostId(InetAddress endpoint)
+ public UUID getHostId(InetAddressAndPort endpoint)
{
lock.readLock().lock();
try
@@ -279,7 +270,7 @@ public class TokenMetadata
}
/** Return the end-point for a unique host ID */
- public InetAddress getEndpointForHostId(UUID hostId)
+ public InetAddressAndPort getEndpointForHostId(UUID hostId)
{
lock.readLock().lock();
try
@@ -293,12 +284,12 @@ public class TokenMetadata
}
/** @return a copy of the endpoint-to-id map for read-only operations */
- public Map<InetAddress, UUID> getEndpointToHostIdMapForReading()
+ public Map<InetAddressAndPort, UUID> getEndpointToHostIdMapForReading()
{
lock.readLock().lock();
try
{
- Map<InetAddress, UUID> readMap = new HashMap<>();
+ Map<InetAddressAndPort, UUID> readMap = new HashMap<>();
readMap.putAll(endpointToHostIdMap);
return readMap;
}
@@ -309,17 +300,17 @@ public class TokenMetadata
}
@Deprecated
- public void addBootstrapToken(Token token, InetAddress endpoint)
+ public void addBootstrapToken(Token token, InetAddressAndPort endpoint)
{
addBootstrapTokens(Collections.singleton(token), endpoint);
}
- public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint)
+ public void addBootstrapTokens(Collection<Token> tokens, InetAddressAndPort endpoint)
{
addBootstrapTokens(tokens, endpoint, null);
}
- private void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint, InetAddress original)
+ private void addBootstrapTokens(Collection<Token> tokens, InetAddressAndPort endpoint, InetAddressAndPort original)
{
assert tokens != null && !tokens.isEmpty();
assert endpoint != null;
@@ -328,7 +319,7 @@ public class TokenMetadata
try
{
- InetAddress oldEndpoint;
+ InetAddressAndPort oldEndpoint;
for (Token token : tokens)
{
@@ -352,7 +343,7 @@ public class TokenMetadata
}
}
- public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode)
+ public void addReplaceTokens(Collection<Token> replacingTokens, InetAddressAndPort newNode, InetAddressAndPort oldNode)
{
assert replacingTokens != null && !replacingTokens.isEmpty();
assert newNode != null && oldNode != null;
@@ -379,12 +370,12 @@ public class TokenMetadata
}
}
- public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
+ public Optional<InetAddressAndPort> getReplacementNode(InetAddressAndPort endpoint)
{
return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint));
}
- public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
+ public Optional<InetAddressAndPort> getReplacingNode(InetAddressAndPort endpoint)
{
return Optional.ofNullable((replacementToOriginal.get(endpoint)));
}
@@ -405,7 +396,7 @@ public class TokenMetadata
}
}
- public void addLeavingEndpoint(InetAddress endpoint)
+ public void addLeavingEndpoint(InetAddressAndPort endpoint)
{
assert endpoint != null;
@@ -425,7 +416,7 @@ public class TokenMetadata
* @param token token which is node moving to
* @param endpoint address of the moving node
*/
- public void addMovingEndpoint(Token token, InetAddress endpoint)
+ public void addMovingEndpoint(Token token, InetAddressAndPort endpoint)
{
assert endpoint != null;
@@ -441,7 +432,7 @@ public class TokenMetadata
}
}
- public void removeEndpoint(InetAddress endpoint)
+ public void removeEndpoint(InetAddressAndPort endpoint)
{
assert endpoint != null;
@@ -469,7 +460,7 @@ public class TokenMetadata
/**
* This is called when the snitch properties for this endpoint are updated, see CASSANDRA-10238.
*/
- public void updateTopology(InetAddress endpoint)
+ public void updateTopology(InetAddressAndPort endpoint)
{
assert endpoint != null;
@@ -509,14 +500,14 @@ public class TokenMetadata
* Remove pair of token/address from moving endpoints
* @param endpoint address of the moving node
*/
- public void removeFromMoving(InetAddress endpoint)
+ public void removeFromMoving(InetAddressAndPort endpoint)
{
assert endpoint != null;
lock.writeLock().lock();
try
{
- for (Pair<Token, InetAddress> pair : movingEndpoints)
+ for (Pair<Token, InetAddressAndPort> pair : movingEndpoints)
{
if (pair.right.equals(endpoint))
{
@@ -533,7 +524,7 @@ public class TokenMetadata
}
}
- public Collection<Token> getTokens(InetAddress endpoint)
+ public Collection<Token> getTokens(InetAddressAndPort endpoint)
{
assert endpoint != null;
assert isMember(endpoint); // don't want to return nulls
@@ -550,12 +541,12 @@ public class TokenMetadata
}
@Deprecated
- public Token getToken(InetAddress endpoint)
+ public Token getToken(InetAddressAndPort endpoint)
{
return getTokens(endpoint).iterator().next();
}
- public boolean isMember(InetAddress endpoint)
+ public boolean isMember(InetAddressAndPort endpoint)
{
assert endpoint != null;
@@ -570,7 +561,7 @@ public class TokenMetadata
}
}
- public boolean isLeaving(InetAddress endpoint)
+ public boolean isLeaving(InetAddressAndPort endpoint)
{
assert endpoint != null;
@@ -585,7 +576,7 @@ public class TokenMetadata
}
}
- public boolean isMoving(InetAddress endpoint)
+ public boolean isMoving(InetAddressAndPort endpoint)
{
assert endpoint != null;
@@ -593,7 +584,7 @@ public class TokenMetadata
try
{
- for (Pair<Token, InetAddress> pair : movingEndpoints)
+ for (Pair<Token, InetAddressAndPort> pair : movingEndpoints)
{
if (pair.right.equals(endpoint))
return true;
@@ -618,7 +609,7 @@ public class TokenMetadata
lock.readLock().lock();
try
{
- return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
+ return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap),
HashBiMap.create(endpointToHostIdMap),
new Topology(topology),
partitioner);
@@ -673,9 +664,9 @@ public class TokenMetadata
}
}
- private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddress> leavingEndpoints)
+ private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddressAndPort> leavingEndpoints)
{
- for (InetAddress endpoint : leavingEndpoints)
+ for (InetAddressAndPort endpoint : leavingEndpoints)
allLeftMetadata.removeEndpoint(endpoint);
return allLeftMetadata;
@@ -695,11 +686,11 @@ public class TokenMetadata
{
TokenMetadata metadata = cloneOnlyTokenMap();
- for (InetAddress endpoint : leavingEndpoints)
+ for (InetAddressAndPort endpoint : leavingEndpoints)
metadata.removeEndpoint(endpoint);
- for (Pair<Token, InetAddress> pair : movingEndpoints)
+ for (Pair<Token, InetAddressAndPort> pair : movingEndpoints)
metadata.updateNormalToken(pair.left, pair.right);
return metadata;
@@ -710,7 +701,7 @@ public class TokenMetadata
}
}
- public InetAddress getEndpoint(Token token)
+ public InetAddressAndPort getEndpoint(Token token)
{
lock.readLock().lock();
try
@@ -742,17 +733,17 @@ public class TokenMetadata
return sortedTokens;
}
- public Multimap<Range<Token>, InetAddress> getPendingRangesMM(String keyspaceName)
+ public Multimap<Range<Token>, InetAddressAndPort> getPendingRangesMM(String keyspaceName)
{
- Multimap<Range<Token>, InetAddress> map = HashMultimap.create();
+ Multimap<Range<Token>, InetAddressAndPort> map = HashMultimap.create();
PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName);
if (pendingRangeMaps != null)
{
- for (Map.Entry<Range<Token>, List<InetAddress>> entry : pendingRangeMaps)
+ for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : pendingRangeMaps)
{
Range<Token> range = entry.getKey();
- for (InetAddress address : entry.getValue())
+ for (InetAddressAndPort address : entry.getValue())
{
map.put(range, address);
}
@@ -768,10 +759,10 @@ public class TokenMetadata
return this.pendingRanges.get(keyspaceName);
}
- public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddress endpoint)
+ public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddressAndPort endpoint)
{
List<Range<Token>> ranges = new ArrayList<>();
- for (Map.Entry<Range<Token>, InetAddress> entry : getPendingRangesMM(keyspaceName).entries())
+ for (Map.Entry<Range<Token>, InetAddressAndPort> entry : getPendingRangesMM(keyspaceName).entries())
{
if (entry.getValue().equals(endpoint))
{
@@ -824,9 +815,9 @@ public class TokenMetadata
long startedAt = System.currentTimeMillis();
// create clone of current state
- BiMultiValMap<Token, InetAddress> bootstrapTokensClone = new BiMultiValMap<>();
- Set<InetAddress> leavingEndpointsClone = new HashSet<>();
- Set<Pair<Token, InetAddress>> movingEndpointsClone = new HashSet<>();
+ BiMultiValMap<Token, InetAddressAndPort> bootstrapTokensClone = new BiMultiValMap<>();
+ Set<InetAddressAndPort> leavingEndpointsClone = new HashSet<>();
+ Set<Pair<Token, InetAddressAndPort>> movingEndpointsClone = new HashSet<>();
TokenMetadata metadata;
lock.readLock().lock();
@@ -859,29 +850,29 @@ public class TokenMetadata
*/
private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy,
TokenMetadata metadata,
- BiMultiValMap<Token, InetAddress> bootstrapTokens,
- Set<InetAddress> leavingEndpoints,
- Set<Pair<Token, InetAddress>> movingEndpoints)
+ BiMultiValMap<Token, InetAddressAndPort> bootstrapTokens,
+ Set<InetAddressAndPort> leavingEndpoints,
+ Set<Pair<Token, InetAddressAndPort>> movingEndpoints)
{
PendingRangeMaps newPendingRanges = new PendingRangeMaps();
- Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
+ Multimap<InetAddressAndPort, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
// Copy of metadata reflecting the situation after all leave operations are finished.
TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints);
// get all ranges that will be affected by leaving nodes
Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
- for (InetAddress endpoint : leavingEndpoints)
+ for (InetAddressAndPort endpoint : leavingEndpoints)
affectedRanges.addAll(addressRanges.get(endpoint));
// for each of those ranges, find what new nodes will be responsible for the range when
// all leaving nodes are gone.
for (Range<Token> range : affectedRanges)
{
- Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
- Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
- for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
+ Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+ Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+ for (InetAddressAndPort address : Sets.difference(newEndpoints, currentEndpoints))
{
newPendingRanges.addPendingRange(range, address);
}
@@ -892,8 +883,8 @@ public class TokenMetadata
// For each of the bootstrapping nodes, simply add and remove them one by one to
// allLeftMetadata and check in between what their ranges would be.
- Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
- for (InetAddress endpoint : bootstrapAddresses.keySet())
+ Multimap<InetAddressAndPort, Token> bootstrapAddresses = bootstrapTokens.inverse();
+ for (InetAddressAndPort endpoint : bootstrapAddresses.keySet())
{
Collection<Token> tokens = bootstrapAddresses.get(endpoint);
@@ -910,11 +901,11 @@ public class TokenMetadata
// For each of the moving nodes, we do the same thing we did for bootstrapping:
// simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
- for (Pair<Token, InetAddress> moving : movingEndpoints)
+ for (Pair<Token, InetAddressAndPort> moving : movingEndpoints)
{
//Calculate all the ranges which will could be affected. This will include the ranges before and after the move.
Set<Range<Token>> moveAffectedRanges = new HashSet<>();
- InetAddress endpoint = moving.right; // address of the moving node
+ InetAddressAndPort endpoint = moving.right; // address of the moving node
//Add ranges before the move
for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
{
@@ -930,10 +921,10 @@ public class TokenMetadata
for(Range<Token> range : moveAffectedRanges)
{
- Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
- Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
- Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
- for(final InetAddress address : difference)
+ Set<InetAddressAndPort> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+ Set<InetAddressAndPort> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+ Set<InetAddressAndPort> difference = Sets.difference(newEndpoints, currentEndpoints);
+ for(final InetAddressAndPort address : difference)
{
Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
@@ -973,12 +964,12 @@ public class TokenMetadata
}
/** @return a copy of the bootstrapping tokens map */
- public BiMultiValMap<Token, InetAddress> getBootstrapTokens()
+ public BiMultiValMap<Token, InetAddressAndPort> getBootstrapTokens()
{
lock.readLock().lock();
try
{
- return new BiMultiValMap<Token, InetAddress>(bootstrapTokens);
+ return new BiMultiValMap<>(bootstrapTokens);
}
finally
{
@@ -986,7 +977,7 @@ public class TokenMetadata
}
}
- public Set<InetAddress> getAllEndpoints()
+ public Set<InetAddressAndPort> getAllEndpoints()
{
lock.readLock().lock();
try
@@ -1010,7 +1001,7 @@ public class TokenMetadata
}
/** caller should not modify leavingEndpoints */
- public Set<InetAddress> getLeavingEndpoints()
+ public Set<InetAddressAndPort> getLeavingEndpoints()
{
lock.readLock().lock();
try
@@ -1037,7 +1028,7 @@ public class TokenMetadata
* Endpoints which are migrating to the new tokens
* @return set of addresses of moving endpoints
*/
- public Set<Pair<Token, InetAddress>> getMovingEndpoints()
+ public Set<Pair<Token, InetAddressAndPort>> getMovingEndpoints()
{
lock.readLock().lock();
try
@@ -1148,14 +1139,14 @@ public class TokenMetadata
lock.readLock().lock();
try
{
- Multimap<InetAddress, Token> endpointToTokenMap = tokenToEndpointMap.inverse();
- Set<InetAddress> eps = endpointToTokenMap.keySet();
+ Multimap<InetAddressAndPort, Token> endpointToTokenMap = tokenToEndpointMap.inverse();
+ Set<InetAddressAndPort> eps = endpointToTokenMap.keySet();
if (!eps.isEmpty())
{
sb.append("Normal Tokens:");
sb.append(System.getProperty("line.separator"));
- for (InetAddress ep : eps)
+ for (InetAddressAndPort ep : eps)
{
sb.append(ep);
sb.append(':');
@@ -1168,7 +1159,7 @@ public class TokenMetadata
{
sb.append("Bootstrapping Tokens:" );
sb.append(System.getProperty("line.separator"));
- for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
+ for (Map.Entry<Token, InetAddressAndPort> entry : bootstrapTokens.entrySet())
{
sb.append(entry.getValue()).append(':').append(entry.getKey());
sb.append(System.getProperty("line.separator"));
@@ -1179,7 +1170,7 @@ public class TokenMetadata
{
sb.append("Leaving Endpoints:");
sb.append(System.getProperty("line.separator"));
- for (InetAddress ep : leavingEndpoints)
+ for (InetAddressAndPort ep : leavingEndpoints)
{
sb.append(ep);
sb.append(System.getProperty("line.separator"));
@@ -1213,7 +1204,7 @@ public class TokenMetadata
return sb.toString();
}
- public Collection<InetAddress> pendingEndpointsFor(Token token, String keyspaceName)
+ public Collection<InetAddressAndPort> pendingEndpointsFor(Token token, String keyspaceName)
{
PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName);
if (pendingRangeMaps == null)
@@ -1225,19 +1216,19 @@ public class TokenMetadata
/**
* @deprecated retained for benefit of old tests
*/
- public Collection<InetAddress> getWriteEndpoints(Token token, String keyspaceName, Collection<InetAddress> naturalEndpoints)
+ public Collection<InetAddressAndPort> getWriteEndpoints(Token token, String keyspaceName, Collection<InetAddressAndPort> naturalEndpoints)
{
return ImmutableList.copyOf(Iterables.concat(naturalEndpoints, pendingEndpointsFor(token, keyspaceName)));
}
/** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
- public Multimap<InetAddress, Token> getEndpointToTokenMapForReading()
+ public Multimap<InetAddressAndPort, Token> getEndpointToTokenMapForReading()
{
lock.readLock().lock();
try
{
- Multimap<InetAddress, Token> cloned = HashMultimap.create();
- for (Map.Entry<Token, InetAddress> entry : tokenToEndpointMap.entrySet())
+ Multimap<InetAddressAndPort, Token> cloned = HashMultimap.create();
+ for (Map.Entry<Token, InetAddressAndPort> entry : tokenToEndpointMap.entrySet())
cloned.put(entry.getValue(), entry.getKey());
return cloned;
}
@@ -1251,12 +1242,12 @@ public class TokenMetadata
* @return a (stable copy, won't be modified) Token to Endpoint map for all the normal and bootstrapping nodes
* in the cluster.
*/
- public Map<Token, InetAddress> getNormalAndBootstrappingTokenToEndpointMap()
+ public Map<Token, InetAddressAndPort> getNormalAndBootstrappingTokenToEndpointMap()
{
lock.readLock().lock();
try
{
- Map<Token, InetAddress> map = new HashMap<>(tokenToEndpointMap.size() + bootstrapTokens.size());
+ Map<Token, InetAddressAndPort> map = new HashMap<>(tokenToEndpointMap.size() + bootstrapTokens.size());
map.putAll(tokenToEndpointMap);
map.putAll(bootstrapTokens);
return map;
@@ -1302,11 +1293,11 @@ public class TokenMetadata
public static class Topology
{
/** multi-map of DC to endpoints in that DC */
- private final Multimap<String, InetAddress> dcEndpoints;
+ private final Multimap<String, InetAddressAndPort> dcEndpoints;
/** map of DC to multi-map of rack to endpoints in that rack */
- private final Map<String, Multimap<String, InetAddress>> dcRacks;
+ private final Map<String, Multimap<String, InetAddressAndPort>> dcRacks;
/** reverse-lookup map for endpoint to current known dc/rack assignment */
- private final Map<InetAddress, Pair<String, String>> currentLocations;
+ private final Map<InetAddressAndPort, Pair<String, String>> currentLocations;
Topology()
{
@@ -1337,7 +1328,7 @@ public class TokenMetadata
/**
* Stores current DC/rack assignment for ep
*/
- void addEndpoint(InetAddress ep)
+ void addEndpoint(InetAddressAndPort ep)
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
String dc = snitch.getDatacenter(ep);
@@ -1353,12 +1344,12 @@ public class TokenMetadata
doAddEndpoint(ep, dc, rack);
}
- private void doAddEndpoint(InetAddress ep, String dc, String rack)
+ private void doAddEndpoint(InetAddressAndPort ep, String dc, String rack)
{
dcEndpoints.put(dc, ep);
if (!dcRacks.containsKey(dc))
- dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
+ dcRacks.put(dc, HashMultimap.create());
dcRacks.get(dc).put(rack, ep);
currentLocations.put(ep, Pair.create(dc, rack));
@@ -1367,7 +1358,7 @@ public class TokenMetadata
/**
* Removes current DC/rack assignment for ep
*/
- void removeEndpoint(InetAddress ep)
+ void removeEndpoint(InetAddressAndPort ep)
{
if (!currentLocations.containsKey(ep))
return;
@@ -1375,13 +1366,13 @@ public class TokenMetadata
doRemoveEndpoint(ep, currentLocations.remove(ep));
}
- private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+ private void doRemoveEndpoint(InetAddressAndPort ep, Pair<String, String> current)
{
dcRacks.get(current.left).remove(current.right, ep);
dcEndpoints.remove(current.left, ep);
}
- void updateEndpoint(InetAddress ep)
+ void updateEndpoint(InetAddressAndPort ep)
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
if (snitch == null || !currentLocations.containsKey(ep))
@@ -1396,11 +1387,11 @@ public class TokenMetadata
if (snitch == null)
return;
- for (InetAddress ep : currentLocations.keySet())
+ for (InetAddressAndPort ep : currentLocations.keySet())
updateEndpoint(ep, snitch);
}
- private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
+ private void updateEndpoint(InetAddressAndPort ep, IEndpointSnitch snitch)
{
Pair<String, String> current = currentLocations.get(ep);
String dc = snitch.getDatacenter(ep);
@@ -1415,7 +1406,7 @@ public class TokenMetadata
/**
* @return multi-map of DC to endpoints in that DC
*/
- public Multimap<String, InetAddress> getDatacenterEndpoints()
+ public Multimap<String, InetAddressAndPort> getDatacenterEndpoints()
{
return dcEndpoints;
}
@@ -1423,7 +1414,7 @@ public class TokenMetadata
/**
* @return map of DC to multi-map of rack to endpoints in that rack
*/
- public Map<String, Multimap<String, InetAddress>> getDatacenterRacks()
+ public Map<String, Multimap<String, InetAddressAndPort>> getDatacenterRacks()
{
return dcRacks;
}
@@ -1431,7 +1422,7 @@ public class TokenMetadata
/**
* @return The DC and rack of the given endpoint.
*/
- public Pair<String, String> getLocation(InetAddress addr)
+ public Pair<String, String> getLocation(InetAddressAndPort addr)
{
return currentLocations.get(addr);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
index 7815784..3655a40 100644
--- a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
@@ -17,14 +17,14 @@
*/
package org.apache.cassandra.metrics;
-import java.net.InetAddress;
-
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import org.apache.cassandra.net.async.OutboundMessagingPool;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
/**
* Metrics for internode connections.
*/
@@ -65,10 +65,10 @@ public class ConnectionMetrics
*
* @param ip IP address to use for metrics label
*/
- public ConnectionMetrics(InetAddress ip, final OutboundMessagingPool messagingPool)
+ public ConnectionMetrics(InetAddressAndPort ip, final OutboundMessagingPool messagingPool)
{
// ipv6 addresses will contain colons, which are invalid in a JMX ObjectName
- address = ip.getHostAddress().replace(':', '.');
+ address = ip.toString().replace(':', '.');
factory = new DefaultNameFactory("Connection", address);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
index 052830a..56888da 100644
--- a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.metrics;
-import java.net.InetAddress;
import java.util.Map.Entry;
import com.google.common.util.concurrent.MoreExecutors;
@@ -27,6 +26,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.UUIDGen;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,28 +43,28 @@ public class HintedHandoffMetrics
private static final MetricNameFactory factory = new DefaultNameFactory("HintedHandOffManager");
/** Total number of hints which are not stored, This is not a cache. */
- private final LoadingCache<InetAddress, DifferencingCounter> notStored = Caffeine.newBuilder()
- .executor(MoreExecutors.directExecutor())
- .build(DifferencingCounter::new);
+ private final LoadingCache<InetAddressAndPort, DifferencingCounter> notStored = Caffeine.newBuilder()
+ .executor(MoreExecutors.directExecutor())
+ .build(DifferencingCounter::new);
/** Total number of hints that have been created, This is not a cache. */
- private final LoadingCache<InetAddress, Counter> createdHintCounts = Caffeine.newBuilder()
- .executor(MoreExecutors.directExecutor())
- .build(address -> Metrics.counter(factory.createMetricName("Hints_created-" + address.getHostAddress().replace(':', '.'))));
+ private final LoadingCache<InetAddressAndPort, Counter> createdHintCounts = Caffeine.newBuilder()
+ .executor(MoreExecutors.directExecutor())
+ .build(address -> Metrics.counter(factory.createMetricName("Hints_created-" + address.toString().replace(':', '.'))));
- public void incrCreatedHints(InetAddress address)
+ public void incrCreatedHints(InetAddressAndPort address)
{
createdHintCounts.get(address).inc();
}
- public void incrPastWindow(InetAddress address)
+ public void incrPastWindow(InetAddressAndPort address)
{
notStored.get(address).mark();
}
public void log()
{
- for (Entry<InetAddress, DifferencingCounter> entry : notStored.asMap().entrySet())
+ for (Entry<InetAddressAndPort, DifferencingCounter> entry : notStored.asMap().entrySet())
{
long difference = entry.getValue().difference();
if (difference == 0)
@@ -79,9 +79,10 @@ public class HintedHandoffMetrics
private final Counter meter;
private long reported = 0;
- public DifferencingCounter(InetAddress address)
+ public DifferencingCounter(InetAddressAndPort address)
{
- this.meter = Metrics.counter(factory.createMetricName("Hints_not_stored-" + address.getHostAddress().replace(':', '.')));
+ //This changes the name of the metric, people can update their monitoring when upgrading?
+ this.meter = Metrics.counter(factory.createMetricName("Hints_not_stored-" + address.toString().replace(':', '.')));
}
public long difference()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
index d6a75f7..424f502 100644
--- a/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/HintsServiceMetrics.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.metrics;
-import java.net.InetAddress;
-
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +25,7 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.cassandra.locator.InetAddressAndPort;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -47,11 +46,11 @@ public final class HintsServiceMetrics
private static final Histogram globalDelayHistogram = Metrics.histogram(factory.createMetricName("Hint_delays"), false);
/** Histograms per-endpoint of hint delivery delays, This is not a cache. */
- private static final LoadingCache<InetAddress, Histogram> delayByEndpoint = Caffeine.newBuilder()
- .executor(MoreExecutors.directExecutor())
- .build(address -> Metrics.histogram(factory.createMetricName("Hint_delays-"+address.getHostAddress().replace(':', '.')), false));
+ private static final LoadingCache<InetAddressAndPort, Histogram> delayByEndpoint = Caffeine.newBuilder()
+ .executor(MoreExecutors.directExecutor())
+ .build(address -> Metrics.histogram(factory.createMetricName("Hint_delays-"+address.toString().replace(':', '.')), false));
- public static void updateDelayMetrics(InetAddress endpoint, long delay)
+ public static void updateDelayMetrics(InetAddressAndPort endpoint, long delay)
{
if (delay <= 0)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
index 5f640b9..2f096f6 100644
--- a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.metrics;
-import java.net.InetAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -26,6 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
+import org.apache.cassandra.locator.InetAddressAndPort;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -47,7 +47,7 @@ public class MessagingMetrics
queueWaitLatency = new ConcurrentHashMap<>();
}
- public void addTimeTaken(InetAddress from, long timeTaken)
+ public void addTimeTaken(InetAddressAndPort from, long timeTaken)
{
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(from);
Timer timer = dcLatency.get(dc);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
index 72e9b23..d220ca5 100644
--- a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
@@ -17,11 +17,12 @@
*/
package org.apache.cassandra.metrics;
-import java.net.InetAddress;
import java.util.concurrent.ConcurrentMap;
import com.codahale.metrics.Counter;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -33,7 +34,7 @@ public class StreamingMetrics
{
public static final String TYPE_NAME = "Streaming";
- private static final ConcurrentMap<InetAddress, StreamingMetrics> instances = new NonBlockingHashMap<InetAddress, StreamingMetrics>();
+ private static final ConcurrentMap<InetAddressAndPort, StreamingMetrics> instances = new NonBlockingHashMap<>();
public static final Counter activeStreamsOutbound = Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "ActiveOutboundStreams", null));
public static final Counter totalIncomingBytes = Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "TotalIncomingBytes", null));
@@ -41,7 +42,7 @@ public class StreamingMetrics
public final Counter incomingBytes;
public final Counter outgoingBytes;
- public static StreamingMetrics get(InetAddress ip)
+ public static StreamingMetrics get(InetAddressAndPort ip)
{
StreamingMetrics metrics = instances.get(ip);
if (metrics == null)
@@ -52,9 +53,9 @@ public class StreamingMetrics
return metrics;
}
- public StreamingMetrics(final InetAddress peer)
+ public StreamingMetrics(final InetAddressAndPort peer)
{
- MetricNameFactory factory = new DefaultNameFactory("Streaming", peer.getHostAddress().replace(':', '.'));
+ MetricNameFactory factory = new DefaultNameFactory("Streaming", peer.toString().replace(':', '.'));
incomingBytes = Metrics.counter(factory.createMetricName("IncomingBytes"));
outgoingBytes= Metrics.counter(factory.createMetricName("OutgoingBytes"));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/BackPressureState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/BackPressureState.java b/src/java/org/apache/cassandra/net/BackPressureState.java
index 34fd0dd..886c075 100644
--- a/src/java/org/apache/cassandra/net/BackPressureState.java
+++ b/src/java/org/apache/cassandra/net/BackPressureState.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.net;
-import java.net.InetAddress;
+import org.apache.cassandra.locator.InetAddressAndPort;
/**
* Interface meant to track the back-pressure state per replica host.
@@ -47,5 +47,5 @@ public interface BackPressureState
/**
* Returns the host this state refers to.
*/
- InetAddress getHost();
+ InetAddressAndPort getHost();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/BackPressureStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/BackPressureStrategy.java b/src/java/org/apache/cassandra/net/BackPressureStrategy.java
index 78f748b..6b49495 100644
--- a/src/java/org/apache/cassandra/net/BackPressureStrategy.java
+++ b/src/java/org/apache/cassandra/net/BackPressureStrategy.java
@@ -17,10 +17,11 @@
*/
package org.apache.cassandra.net;
-import java.net.InetAddress;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
/**
* Back-pressure algorithm interface.
* <p>
@@ -39,5 +40,5 @@ public interface BackPressureStrategy<S extends BackPressureState>
/**
* Creates a new {@link BackPressureState} initialized as needed by the specific implementation.
*/
- S newState(InetAddress host);
+ S newState(InetAddressAndPort host);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index ea000ae..f2ed8a1 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -17,9 +17,8 @@
*/
package org.apache.cassandra.net;
-import java.net.InetAddress;
-
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.locator.InetAddressAndPort;
/**
* Encapsulates the callback information.
@@ -28,7 +27,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
*/
public class CallbackInfo
{
- protected final InetAddress target;
+ protected final InetAddressAndPort target;
protected final IAsyncCallback callback;
protected final IVersionedSerializer<?> serializer;
private final boolean failureCallback;
@@ -41,7 +40,7 @@ public class CallbackInfo
* @param serializer serializer to deserialize response message
* @param failureCallback True when we have a callback to handle failures
*/
- public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback)
+ public CallbackInfo(InetAddressAndPort target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback)
{
this.target = target;
this.callback = callback;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java b/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java
index 83bbbf3..b58ca47 100644
--- a/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java
+++ b/src/java/org/apache/cassandra/net/CompactEndpointSerializationHelper.java
@@ -21,28 +21,108 @@ import java.io.*;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
+import java.nio.ByteBuffer;
-public class CompactEndpointSerializationHelper
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+/*
+ * As of version 4.0 the endpoint description includes a port number as an unsigned short
+ */
+public class CompactEndpointSerializationHelper implements IVersionedSerializer<InetAddressAndPort>
{
- public static void serialize(InetAddress endpoint, DataOutput out) throws IOException
+ public static final IVersionedSerializer<InetAddressAndPort> instance = new CompactEndpointSerializationHelper();
+
+ /**
+ * Streaming uses its own version numbering so we need to ignore it and always use current version.
+ * There is no cross version streaming so it will always use the latest address serialization.
+ **/
+ public static final IVersionedSerializer<InetAddressAndPort> streamingInstance = new IVersionedSerializer<InetAddressAndPort>()
{
- byte[] buf = endpoint.getAddress();
- out.writeByte(buf.length);
- out.write(buf);
+ public void serialize(InetAddressAndPort inetAddressAndPort, DataOutputPlus out, int version) throws IOException
+ {
+ instance.serialize(inetAddressAndPort, out, MessagingService.current_version);
+ }
+
+ public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
+ {
+ return instance.deserialize(in, MessagingService.current_version);
+ }
+
+ public long serializedSize(InetAddressAndPort inetAddressAndPort, int version)
+ {
+ return instance.serializedSize(inetAddressAndPort, MessagingService.current_version);
+ }
+ };
+
+ private CompactEndpointSerializationHelper() {}
+
+ public void serialize(InetAddressAndPort endpoint, DataOutputPlus out, int version) throws IOException
+ {
+ if (version >= MessagingService.VERSION_40)
+ {
+ byte[] buf = endpoint.addressBytes;
+ out.writeByte(buf.length + 2);
+ out.write(buf);
+ out.writeShort(endpoint.port);
+ }
+ else
+ {
+ byte[] buf = endpoint.addressBytes;
+ out.writeByte(buf.length);
+ out.write(buf);
+ }
}
- public static InetAddress deserialize(DataInput in) throws IOException
+ public InetAddressAndPort deserialize(DataInputPlus in, int version) throws IOException
{
- byte[] bytes = new byte[in.readByte()];
- in.readFully(bytes, 0, bytes.length);
- return InetAddress.getByAddress(bytes);
+ int size = in.readByte() & 0xFF;
+ switch(size)
+ {
+ //The original pre-4.0 serialization of just an address
+ case 4:
+ case 16:
+ {
+ byte[] bytes = new byte[size];
+ in.readFully(bytes, 0, bytes.length);
+ return InetAddressAndPort.getByAddress(bytes);
+ }
+ //Address and one port
+ case 6:
+ case 18:
+ {
+ byte[] bytes = new byte[size - 2];
+ in.readFully(bytes);
+
+ int port = in.readShort() & 0xFFFF;
+ return InetAddressAndPort.getByAddressOverrideDefaults(InetAddress.getByAddress(bytes), bytes, port);
+ }
+ default:
+ throw new AssertionError("Unexpected size " + size);
+
+ }
}
- public static int serializedSize(InetAddress from)
+ public long serializedSize(InetAddressAndPort from, int version)
{
- if (from instanceof Inet4Address)
- return 1 + 4;
- assert from instanceof Inet6Address;
- return 1 + 16;
+ //4.0 includes a port number
+ if (version >= MessagingService.VERSION_40)
+ {
+ if (from.address instanceof Inet4Address)
+ return 1 + 4 + 2;
+ assert from.address instanceof Inet6Address;
+ return 1 + 16 + 2;
+ }
+ else
+ {
+ if (from.address instanceof Inet4Address)
+ return 1 + 4;
+ assert from.address instanceof Inet6Address;
+ return 1 + 16;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/net/ForwardToContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ForwardToContainer.java b/src/java/org/apache/cassandra/net/ForwardToContainer.java
new file mode 100644
index 0000000..ac9e725
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/ForwardToContainer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.Collection;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * Contains forward to information until it can be serialized as part of a message using a version
+ * specific serialization
+ */
+public class ForwardToContainer
+{
+ public final Collection<InetAddressAndPort> targets;
+ public final int[] messageIds;
+
+ public ForwardToContainer(Collection<InetAddressAndPort> targets,
+ int[] messageIds)
+ {
+ Preconditions.checkArgument(targets.size() == messageIds.length);
+ this.targets = targets;
+ this.messageIds = messageIds;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org