You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/01/25 20:12:06 UTC
[09/19] cassandra git commit: Allow storage port to be configurable
per node
http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 89d5358..cda575a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -158,23 +158,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Collection<Range<Token>> getLocalRanges(String keyspaceName)
{
- return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddress());
+ return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort());
}
public Collection<Range<Token>> getPrimaryRanges(String keyspace)
{
- return getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddress());
+ return getPrimaryRangesForEndpoint(keyspace, FBUtilities.getBroadcastAddressAndPort());
}
public Collection<Range<Token>> getPrimaryRangesWithinDC(String keyspace)
{
- return getPrimaryRangeForEndpointWithinDC(keyspace, FBUtilities.getBroadcastAddress());
+ return getPrimaryRangeForEndpointWithinDC(keyspace, FBUtilities.getBroadcastAddressAndPort());
}
- private final Set<InetAddress> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddress>());
+ private final Set<InetAddressAndPort> replicatingNodes = Collections.synchronizedSet(new HashSet<InetAddressAndPort>());
private CassandraDaemon daemon;
- private InetAddress removingNode;
+ private InetAddressAndPort removingNode;
/* Are we starting this node in bootstrap mode? */
private volatile boolean isBootstrapMode;
@@ -225,7 +225,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
SystemKeyspace.updateTokens(tokens);
Collection<Token> localTokens = getLocalTokens();
setGossipTokens(localTokens);
- tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
+ tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddressAndPort());
setMode(Mode.NORMAL, false);
}
@@ -233,6 +233,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
+ states.add(Pair.create(ApplicationState.STATUS_WITH_PORT, valueFactory.normal(tokens)));
states.add(Pair.create(ApplicationState.STATUS, valueFactory.normal(tokens)));
Gossiper.instance.addLocalApplicationStates(states);
}
@@ -407,7 +408,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* they get the Gossip shutdown message, so even if
* we don't get time to broadcast this, it is not a problem.
*
- * See {@link Gossiper#markAsShutdown(InetAddress)}
+ * See {@link Gossiper#markAsShutdown(InetAddressAndPort)}
*/
private void shutdownClientServers()
{
@@ -463,9 +464,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
"To perform this operation, please restart with " +
"-Dcassandra.allow_unsafe_replace=true");
- InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
+ InetAddressAndPort replaceAddress = DatabaseDescriptor.getReplaceAddress();
logger.info("Gathering node replacement information for {}", replaceAddress);
- Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+ Map<InetAddressAndPort, EndpointState> epStates = Gossiper.instance.doShadowRound();
// as we've completed the shadow round of gossip, we should be able to find the node we're replacing
if (epStates.get(replaceAddress) == null)
throw new RuntimeException(String.format("Cannot replace_address %s because it doesn't exist in gossip", replaceAddress));
@@ -503,25 +504,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
logger.debug("Starting shadow gossip round to check for endpoint collision");
- Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+ Map<InetAddressAndPort, EndpointState> epStates = Gossiper.instance.doShadowRound();
// If bootstrapping, check whether any previously known status for the endpoint makes it unsafe to do so.
// If not bootstrapping, compare the host id for this endpoint learned from gossip (if any) with the local
// one, which was either read from system.local or generated at startup. If a learned id is present &
// doesn't match the local, then the node needs replacing
- if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap(), epStates))
+ if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddressAndPort(), localHostId, shouldBootstrap(), epStates))
{
throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
"Use cassandra.replace_address if you want to replace this node.",
- FBUtilities.getBroadcastAddress()));
+ FBUtilities.getBroadcastAddressAndPort()));
}
if (shouldBootstrap() && useStrictConsistency && !allowSimultaneousMoves())
{
- for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
+ for (Map.Entry<InetAddressAndPort, EndpointState> entry : epStates.entrySet())
{
// ignore local node or empty status
- if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
+ if (entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()) || (entry.getValue().getApplicationState(ApplicationState.STATUS_WITH_PORT) == null && entry.getValue().getApplicationState(ApplicationState.STATUS) == null))
continue;
+
+ VersionedValue value = entry.getValue().getApplicationState(ApplicationState.STATUS_WITH_PORT);
+ if (value == null)
+ {
+ value = entry.getValue().getApplicationState(ApplicationState.STATUS);
+ }
+
- String[] pieces = splitValue(entry.getValue().getApplicationState(ApplicationState.STATUS));
+ String[] pieces = splitValue(value); // value is STATUS_WITH_PORT when present, otherwise the legacy STATUS
assert (pieces.length > 0);
String state = pieces[0];
@@ -553,10 +561,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
{
logger.info("Populating token metadata from system tables");
- Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens();
+ Multimap<InetAddressAndPort, Token> loadedTokens = SystemKeyspace.loadTokens();
if (!shouldBootstrap()) // if we have not completed bootstrapping, we should not add ourselves as a normal token
- loadedTokens.putAll(FBUtilities.getBroadcastAddress(), SystemKeyspace.getSavedTokens());
- for (InetAddress ep : loadedTokens.keySet())
+ loadedTokens.putAll(FBUtilities.getBroadcastAddressAndPort(), SystemKeyspace.getSavedTokens());
+ for (InetAddressAndPort ep : loadedTokens.keySet())
tokenMetadata.updateNormalTokens(loadedTokens.get(ep), ep);
logger.info("Token metadata: {}", tokenMetadata);
@@ -640,10 +648,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Collection<Token> tokens = SystemKeyspace.getSavedTokens();
if (!tokens.isEmpty())
{
- tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
+ tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddressAndPort());
// order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa.
List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
+ states.add(Pair.create(ApplicationState.STATUS_WITH_PORT, valueFactory.hibernate(true)));
states.add(Pair.create(ApplicationState.STATUS, valueFactory.hibernate(true)));
Gossiper.instance.addLocalApplicationStates(states);
}
@@ -659,11 +668,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
{
logger.info("Loading persisted ring state");
- Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens();
- Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
- for (InetAddress ep : loadedTokens.keySet())
+ Multimap<InetAddressAndPort, Token> loadedTokens = SystemKeyspace.loadTokens();
+ Map<InetAddressAndPort, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
+ for (InetAddressAndPort ep : loadedTokens.keySet())
{
- if (ep.equals(FBUtilities.getBroadcastAddress()))
+ if (ep.equals(FBUtilities.getBroadcastAddressAndPort()))
{
// entry has been mistakenly added, delete it
SystemKeyspace.removeEndpoint(ep);
@@ -707,7 +716,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public static boolean isSeed()
{
- return DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress());
+ return DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort());
}
private void prepareToJoin() throws ConfigurationException
@@ -753,6 +762,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
"the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " +
"repair must be run after the replacement process in order to make this node consistent.",
DatabaseDescriptor.getReplaceAddress());
+ appStates.put(ApplicationState.STATUS_WITH_PORT, valueFactory.hibernate(true));
appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
}
}
@@ -765,10 +775,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a counterId to our state, below.)
// Seed the host ID-to-endpoint map with our own ID.
- getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddress());
+ getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddressAndPort());
appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(localHostId));
- appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress()));
+ appStates.put(ApplicationState.NATIVE_ADDRESS_AND_PORT, valueFactory.nativeaddressAndPort(FBUtilities.getBroadcastNativeAddressAndPort()));
+ appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getJustBroadcastNativeAddress()));
appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
// load the persisted ring state. This used to be done earlier in the init process,
@@ -826,16 +837,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
//
// We attempted to replace this with a schema-presence check, but you need a meaningful sleep
// to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details.
- Set<InetAddress> current = new HashSet<>();
+ Set<InetAddressAndPort> current = new HashSet<>();
if (logger.isDebugEnabled())
{
logger.debug("Bootstrap variables: {} {} {} {}",
DatabaseDescriptor.isAutoBootstrap(),
SystemKeyspace.bootstrapInProgress(),
SystemKeyspace.bootstrapComplete(),
- DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()));
+ DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort()));
}
- if (DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
+ if (DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort()))
{
logger.info("This node will not auto bootstrap because it is configured to be a seed node.");
}
@@ -873,13 +884,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// get bootstrap tokens
if (!replacing)
{
- if (tokenMetadata.isMember(FBUtilities.getBroadcastAddress()))
+ if (tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()))
{
String s = "This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)";
throw new UnsupportedOperationException(s);
}
setMode(Mode.JOINING, "getting bootstrap token", true);
- bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddress(), delay);
+ bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddressAndPort(), delay);
}
else
{
@@ -899,7 +910,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// check for operator errors...
for (Token token : bootstrapTokens)
{
- InetAddress existing = tokenMetadata.getEndpoint(token);
+ InetAddressAndPort existing = tokenMetadata.getEndpoint(token);
if (existing != null)
{
long nanoDelay = delay * 1000000L;
@@ -935,7 +946,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
bootstrapTokens = SystemKeyspace.getSavedTokens();
if (bootstrapTokens.isEmpty())
{
- bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddress(), delay);
+ bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, FBUtilities.getBroadcastAddressAndPort(), delay);
}
else
{
@@ -958,7 +969,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// remove the existing info about the replaced node.
if (!current.isEmpty())
{
- for (InetAddress existing : current)
+ for (InetAddressAndPort existing : current)
Gossiper.instance.replacedEndpoint(existing);
}
}
@@ -975,15 +986,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public static boolean isReplacingSameAddress()
{
- InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
- return replaceAddress != null && replaceAddress.equals(FBUtilities.getBroadcastAddress());
+ InetAddressAndPort replaceAddress = DatabaseDescriptor.getReplaceAddress();
+ return replaceAddress != null && replaceAddress.equals(FBUtilities.getBroadcastAddressAndPort());
}
public void gossipSnitchInfo()
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- String dc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
- String rack = snitch.getRack(FBUtilities.getBroadcastAddress());
+ String dc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+ String rack = snitch.getRack(FBUtilities.getBroadcastAddressAndPort());
Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.instance.valueFactory.datacenter(dc));
Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.instance.valueFactory.rack(rack));
}
@@ -1111,7 +1122,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public boolean isJoined()
{
- return tokenMetadata.isMember(FBUtilities.getBroadcastAddress()) && !isSurveyMode;
+ return tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()) && !isSurveyMode;
}
public void rebuild(String sourceDc)
@@ -1141,7 +1152,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
RangeStreamer streamer = new RangeStreamer(tokenMetadata,
null,
- FBUtilities.getBroadcastAddress(),
+ FBUtilities.getBroadcastAddressAndPort(),
StreamOperation.REBUILD,
useStrictConsistency && !replacing,
DatabaseDescriptor.getEndpointSnitch(),
@@ -1202,13 +1213,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (specificSources != null)
{
String[] stringHosts = specificSources.split(",");
- Set<InetAddress> sources = new HashSet<>(stringHosts.length);
+ Set<InetAddressAndPort> sources = new HashSet<>(stringHosts.length);
for (String stringHost : stringHosts)
{
try
{
- InetAddress endpoint = InetAddress.getByName(stringHost);
- if (FBUtilities.getBroadcastAddress().equals(endpoint))
+ InetAddressAndPort endpoint = InetAddressAndPort.getByName(stringHost);
+ if (FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
{
throw new IllegalArgumentException("This host was specified as a source for rebuilding. Sources for a rebuild can only be other nodes in the cluster.");
}
@@ -1449,8 +1460,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// if not an existing token then bootstrap
List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<>();
states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
+ states.add(Pair.create(ApplicationState.STATUS_WITH_PORT, replacing?
+ valueFactory.bootReplacingWithPort(DatabaseDescriptor.getReplaceAddress()) :
+ valueFactory.bootstrapping(tokens)));
states.add(Pair.create(ApplicationState.STATUS, replacing?
- valueFactory.bootReplacing(DatabaseDescriptor.getReplaceAddress()) :
+ valueFactory.bootReplacing(DatabaseDescriptor.getReplaceAddress().address) :
valueFactory.bootstrapping(tokens)));
Gossiper.instance.addLocalApplicationStates(states);
setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
@@ -1459,7 +1473,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
else
{
// Dont set any state for the node which is bootstrapping the existing token...
- tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
+ tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddressAndPort());
SystemKeyspace.removeEndpoint(DatabaseDescriptor.getReplaceAddress());
}
if (!Gossiper.instance.seenAnySeed())
@@ -1475,7 +1489,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
invalidateDiskBoundaries();
setMode(Mode.JOINING, "Starting to bootstrap...", true);
- BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata);
+ BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddressAndPort(), tokens, tokenMetadata);
bootstrapper.addProgressListener(progressSupport);
ListenableFuture<StreamState> bootstrapStream = bootstrapper.bootstrap(streamStateStore, useStrictConsistency && !replacing); // handles token update
Futures.addCallback(bootstrapStream, new FutureCallback<StreamState>()
@@ -1547,7 +1561,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// get bootstrap tokens saved in system keyspace
final Collection<Token> tokens = SystemKeyspace.getSavedTokens();
// already bootstrapped ranges are filtered during bootstrap
- BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata);
+ BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddressAndPort(), tokens, tokenMetadata);
bootstrapper.addProgressListener(progressSupport);
ListenableFuture<StreamState> bootstrapStream = bootstrapper.bootstrap(streamStateStore, useStrictConsistency && !replacing); // handles token update
Futures.addCallback(bootstrapStream, new FutureCallback<StreamState>()
@@ -1608,35 +1622,67 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return tokenMetadata;
}
+ public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace)
+ {
+ return getRangeToEndpointMap(keyspace, false);
+ }
+
+ public Map<List<String>, List<String>> getRangeToEndpointWithPortMap(String keyspace)
+ {
+ return getRangeToEndpointMap(keyspace, true);
+ }
+
/**
* for a keyspace, return the ranges and corresponding listen addresses.
* @param keyspace
* @return the endpoint map
*/
- public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace)
+ public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace, boolean withPort)
{
/* All the ranges for the tokens */
Map<List<String>, List<String>> map = new HashMap<>();
- for (Map.Entry<Range<Token>,List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet())
+ for (Map.Entry<Range<Token>,List<InetAddressAndPort>> entry : getRangeToAddressMap(keyspace).entrySet())
{
- map.put(entry.getKey().asList(), stringify(entry.getValue()));
+ map.put(entry.getKey().asList(), stringify(entry.getValue(), withPort));
}
return map;
}
/**
- * Return the rpc address associated with an endpoint as a string.
+ * Return the native address associated with an endpoint as a string.
* @param endpoint The endpoint to get rpc address for
- * @return the rpc address
+ * @return the native address
*/
- public String getRpcaddress(InetAddress endpoint)
+ public String getNativeaddress(InetAddressAndPort endpoint, boolean withPort)
{
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
- return FBUtilities.getBroadcastRpcAddress().getHostAddress();
+ if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+ return FBUtilities.getBroadcastNativeAddressAndPort().toString(withPort);
+ else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT) != null)
+ {
+ try
+ {
+ InetAddressAndPort address = InetAddressAndPort.getByName(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT).value);
+ return address.getHostAddress(withPort);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS) == null)
- return endpoint.getHostAddress();
+ return endpoint.address.getHostAddress() + ":" + DatabaseDescriptor.getNativeTransportPort();
else
- return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value;
+ return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value + ":" + DatabaseDescriptor.getNativeTransportPort();
+ }
+
+ public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace)
+ {
+ return getRangeToNativeaddressMap(keyspace, false);
+ }
+
+ public Map<List<String>, List<String>> getRangeToNativeaddressWithPortMap(String keyspace)
+ {
+ return getRangeToNativeaddressMap(keyspace, true);
}
/**
@@ -1644,16 +1690,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param keyspace
* @return the endpoint map
*/
- public Map<List<String>, List<String>> getRangeToRpcaddressMap(String keyspace)
+ private Map<List<String>, List<String>> getRangeToNativeaddressMap(String keyspace, boolean withPort)
{
/* All the ranges for the tokens */
Map<List<String>, List<String>> map = new HashMap<>();
- for (Map.Entry<Range<Token>, List<InetAddress>> entry : getRangeToAddressMap(keyspace).entrySet())
+ for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : getRangeToAddressMap(keyspace).entrySet())
{
List<String> rpcaddrs = new ArrayList<>(entry.getValue().size());
- for (InetAddress endpoint: entry.getValue())
+ for (InetAddressAndPort endpoint: entry.getValue())
{
- rpcaddrs.add(getRpcaddress(endpoint));
+ rpcaddrs.add(getNativeaddress(endpoint, withPort));
}
map.put(entry.getKey().asList(), rpcaddrs);
}
@@ -1662,40 +1708,50 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace)
{
+ return getPendingRangeToEndpointMap(keyspace, false);
+ }
+
+ public Map<List<String>, List<String>> getPendingRangeToEndpointWithPortMap(String keyspace)
+ {
+ return getPendingRangeToEndpointMap(keyspace, true);
+ }
+
+ private Map<List<String>, List<String>> getPendingRangeToEndpointMap(String keyspace, boolean withPort)
+ {
// some people just want to get a visual representation of things. Allow null and set it to the first
// non-system keyspace.
if (keyspace == null)
keyspace = Schema.instance.getNonLocalStrategyKeyspaces().get(0);
Map<List<String>, List<String>> map = new HashMap<>();
- for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : tokenMetadata.getPendingRangesMM(keyspace).asMap().entrySet())
+ for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : tokenMetadata.getPendingRangesMM(keyspace).asMap().entrySet())
{
- List<InetAddress> l = new ArrayList<>(entry.getValue());
- map.put(entry.getKey().asList(), stringify(l));
+ List<InetAddressAndPort> l = new ArrayList<>(entry.getValue());
+ map.put(entry.getKey().asList(), stringify(l, withPort));
}
return map;
}
- public Map<Range<Token>, List<InetAddress>> getRangeToAddressMap(String keyspace)
+ public Map<Range<Token>, List<InetAddressAndPort>> getRangeToAddressMap(String keyspace)
{
return getRangeToAddressMap(keyspace, tokenMetadata.sortedTokens());
}
- public Map<Range<Token>, List<InetAddress>> getRangeToAddressMapInLocalDC(String keyspace)
+ public Map<Range<Token>, List<InetAddressAndPort>> getRangeToAddressMapInLocalDC(String keyspace)
{
- Predicate<InetAddress> isLocalDC = new Predicate<InetAddress>()
+ Predicate<InetAddressAndPort> isLocalDC = new Predicate<InetAddressAndPort>()
{
- public boolean apply(InetAddress address)
+ public boolean apply(InetAddressAndPort address)
{
return isLocalDC(address);
}
};
- Map<Range<Token>, List<InetAddress>> origMap = getRangeToAddressMap(keyspace, getTokensInLocalDC());
- Map<Range<Token>, List<InetAddress>> filteredMap = Maps.newHashMap();
- for (Map.Entry<Range<Token>, List<InetAddress>> entry : origMap.entrySet())
+ Map<Range<Token>, List<InetAddressAndPort>> origMap = getRangeToAddressMap(keyspace, getTokensInLocalDC());
+ Map<Range<Token>, List<InetAddressAndPort>> filteredMap = Maps.newHashMap();
+ for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : origMap.entrySet())
{
- List<InetAddress> endpointsInLocalDC = Lists.newArrayList(Collections2.filter(entry.getValue(), isLocalDC));
+ List<InetAddressAndPort> endpointsInLocalDC = Lists.newArrayList(Collections2.filter(entry.getValue(), isLocalDC));
filteredMap.put(entry.getKey(), endpointsInLocalDC);
}
@@ -1707,21 +1763,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
List<Token> filteredTokens = Lists.newArrayList();
for (Token token : tokenMetadata.sortedTokens())
{
- InetAddress endpoint = tokenMetadata.getEndpoint(token);
+ InetAddressAndPort endpoint = tokenMetadata.getEndpoint(token);
if (isLocalDC(endpoint))
filteredTokens.add(token);
}
return filteredTokens;
}
- private boolean isLocalDC(InetAddress targetHost)
+ private boolean isLocalDC(InetAddressAndPort targetHost)
{
String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost);
- String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+ String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
return remoteDC.equals(localDC);
}
- private Map<Range<Token>, List<InetAddress>> getRangeToAddressMap(String keyspace, List<Token> sortedTokens)
+ private Map<Range<Token>, List<InetAddressAndPort>> getRangeToAddressMap(String keyspace, List<Token> sortedTokens)
{
// some people just want to get a visual representation of things. Allow null and set it to the first
// non-system keyspace.
@@ -1733,6 +1789,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
+ public List<String> describeRingJMX(String keyspace) throws IOException
+ {
+ return describeRingJMX(keyspace, false);
+ }
+
+ public List<String> describeRingWithPortJMX(String keyspace) throws IOException
+ {
+ return describeRingJMX(keyspace,true);
+ }
+
/**
* The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility
*
@@ -1740,12 +1806,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*
* @return a List of TokenRange(s) converted to String for the given keyspace
*/
- public List<String> describeRingJMX(String keyspace) throws IOException
+ private List<String> describeRingJMX(String keyspace, boolean withPort) throws IOException
{
List<TokenRange> tokenRanges;
try
{
- tokenRanges = describeRing(keyspace);
+ tokenRanges = describeRing(keyspace, false, withPort);
}
catch (InvalidRequestException e)
{
@@ -1754,7 +1820,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
List<String> result = new ArrayList<>(tokenRanges.size());
for (TokenRange tokenRange : tokenRanges)
- result.add(tokenRange.toString());
+ result.add(tokenRange.toString(withPort));
return result;
}
@@ -1770,7 +1836,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
public List<TokenRange> describeRing(String keyspace) throws InvalidRequestException
{
- return describeRing(keyspace, false);
+ return describeRing(keyspace, false, false);
}
/**
@@ -1778,10 +1844,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
public List<TokenRange> describeLocalRing(String keyspace) throws InvalidRequestException
{
- return describeRing(keyspace, true);
+ return describeRing(keyspace, true, false);
}
- private List<TokenRange> describeRing(String keyspace, boolean includeOnlyLocalDC) throws InvalidRequestException
+ private List<TokenRange> describeRing(String keyspace, boolean includeOnlyLocalDC, boolean withPort) throws InvalidRequestException
{
if (!Schema.instance.getKeyspaces().contains(keyspace))
throw new InvalidRequestException("No such keyspace: " + keyspace);
@@ -1792,39 +1858,49 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
List<TokenRange> ranges = new ArrayList<>();
Token.TokenFactory tf = getTokenFactory();
- Map<Range<Token>, List<InetAddress>> rangeToAddressMap =
+ Map<Range<Token>, List<InetAddressAndPort>> rangeToAddressMap =
includeOnlyLocalDC
? getRangeToAddressMapInLocalDC(keyspace)
: getRangeToAddressMap(keyspace);
- for (Map.Entry<Range<Token>, List<InetAddress>> entry : rangeToAddressMap.entrySet())
- ranges.add(TokenRange.create(tf, entry.getKey(), entry.getValue()));
+ for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : rangeToAddressMap.entrySet())
+ ranges.add(TokenRange.create(tf, entry.getKey(), entry.getValue(), withPort));
return ranges;
}
public Map<String, String> getTokenToEndpointMap()
{
- Map<Token, InetAddress> mapInetAddress = tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap();
+ return getTokenToEndpointMap(false);
+ }
+
+ public Map<String, String> getTokenToEndpointWithPortMap()
+ {
+ return getTokenToEndpointMap(true);
+ }
+
+ private Map<String, String> getTokenToEndpointMap(boolean withPort)
+ {
+ Map<Token, InetAddressAndPort> mapInetAddress = tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap();
// in order to preserve tokens in ascending order, we use LinkedHashMap here
Map<String, String> mapString = new LinkedHashMap<>(mapInetAddress.size());
List<Token> tokens = new ArrayList<>(mapInetAddress.keySet());
Collections.sort(tokens);
for (Token token : tokens)
{
- mapString.put(token.toString(), mapInetAddress.get(token).getHostAddress());
+ mapString.put(token.toString(), mapInetAddress.get(token).getHostAddress(withPort));
}
return mapString;
}
public String getLocalHostId()
{
- return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddress()).toString();
+ return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddressAndPort()).toString();
}
public UUID getLocalHostUUID()
{
- return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddress());
+ return getTokenMetadata().getHostId(FBUtilities.getBroadcastAddressAndPort());
}
public Map<String, String> getHostIdMap()
@@ -1832,19 +1908,40 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return getEndpointToHostId();
}
+
public Map<String, String> getEndpointToHostId()
{
+ return getEndpointToHostId(false);
+ }
+
+ public Map<String, String> getEndpointWithPortToHostId()
+ {
+ return getEndpointToHostId(true);
+ }
+
+ private Map<String, String> getEndpointToHostId(boolean withPort)
+ {
Map<String, String> mapOut = new HashMap<>();
- for (Map.Entry<InetAddress, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet())
- mapOut.put(entry.getKey().getHostAddress(), entry.getValue().toString());
+ for (Map.Entry<InetAddressAndPort, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet())
+ mapOut.put(entry.getKey().getHostAddress(withPort), entry.getValue().toString());
return mapOut;
}
public Map<String, String> getHostIdToEndpoint()
{
+ return getHostIdToEndpoint(false);
+ }
+
+ public Map<String, String> getHostIdToEndpointWithPort()
+ {
+ return getHostIdToEndpoint(true);
+ }
+
+ private Map<String, String> getHostIdToEndpoint(boolean withPort)
+ {
Map<String, String> mapOut = new HashMap<>();
- for (Map.Entry<InetAddress, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet())
- mapOut.put(entry.getValue().toString(), entry.getKey().getHostAddress());
+ for (Map.Entry<InetAddressAndPort, UUID> entry : getTokenMetadata().getEndpointToHostIdMapForReading().entrySet())
+ mapOut.put(entry.getValue().toString(), entry.getKey().getHostAddress(withPort));
return mapOut;
}
@@ -1854,9 +1951,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param ranges
* @return mapping of ranges to the replicas responsible for them.
*/
- private Map<Range<Token>, List<InetAddress>> constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges)
+ private Map<Range<Token>, List<InetAddressAndPort>> constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges)
{
- Map<Range<Token>, List<InetAddress>> rangeToEndpointMap = new HashMap<>(ranges.size());
+ Map<Range<Token>, List<InetAddressAndPort>> rangeToEndpointMap = new HashMap<>(ranges.size());
for (Range<Token> range : ranges)
{
rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right));
@@ -1864,7 +1961,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return rangeToEndpointMap;
}
- public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
+ public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
{
// no-op
}
@@ -1901,9 +1998,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
* you should never bootstrap a new node during a removenode, decommission or move.
*/
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+ public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
{
- if (state == ApplicationState.STATUS)
+ if (state == ApplicationState.STATUS || state == ApplicationState.STATUS_WITH_PORT)
{
String[] pieces = splitValue(value);
assert (pieces.length > 0);
@@ -1973,6 +2070,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
throw new RuntimeException(e);
}
break;
+ case NATIVE_ADDRESS_AND_PORT:
+ try
+ {
+ InetAddressAndPort address = InetAddressAndPort.getByName(value.value);
+ SystemKeyspace.updatePeerNativeAddress(endpoint, address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ break;
case SCHEMA:
SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value));
MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
@@ -1996,7 +2104,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return value.value.split(VersionedValue.DELIMITER_STR, -1);
}
- private void updateNetVersion(InetAddress endpoint, VersionedValue value)
+ private void updateNetVersion(InetAddressAndPort endpoint, VersionedValue value)
{
try
{
@@ -2008,7 +2116,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
- public void updateTopology(InetAddress endpoint)
+ public void updateTopology(InetAddressAndPort endpoint)
{
if (getTokenMetadata().isMember(endpoint))
{
@@ -2021,9 +2129,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
getTokenMetadata().updateTopology();
}
- private void updatePeerInfo(InetAddress endpoint)
+ private void updatePeerInfo(InetAddressAndPort endpoint)
{
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+ InetAddress native_address = null;
+ int native_port = DatabaseDescriptor.getNativeTransportPort();
+
for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
{
switch (entry.getKey())
@@ -2040,7 +2151,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
case RPC_ADDRESS:
try
{
- SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value));
+ native_address = InetAddress.getByName(entry.getValue().value);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ break;
+ case NATIVE_ADDRESS_AND_PORT:
+ try
+ {
+ InetAddressAndPort address = InetAddressAndPort.getByName(entry.getValue().value);
+ native_address = address.address;
+ native_port = address.port;
}
catch (UnknownHostException e)
{
@@ -2055,9 +2178,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
break;
}
}
+
+ //Some tests won't set all the states
+ if (native_address != null)
+ {
+ SystemKeyspace.updatePeerNativeAddress(endpoint,
+ InetAddressAndPort.getByAddressOverrideDefaults(native_address,
+ native_port));
+ }
}
- private void notifyRpcChange(InetAddress endpoint, boolean ready)
+ private void notifyRpcChange(InetAddressAndPort endpoint, boolean ready)
{
if (ready)
notifyUp(endpoint);
@@ -2065,7 +2196,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
notifyDown(endpoint);
}
- private void notifyUp(InetAddress endpoint)
+ private void notifyUp(InetAddressAndPort endpoint)
{
if (!isRpcReady(endpoint) || !Gossiper.instance.isAlive(endpoint))
return;
@@ -2074,13 +2205,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
subscriber.onUp(endpoint);
}
- private void notifyDown(InetAddress endpoint)
+ private void notifyDown(InetAddressAndPort endpoint)
{
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
subscriber.onDown(endpoint);
}
- private void notifyJoined(InetAddress endpoint)
+ private void notifyJoined(InetAddressAndPort endpoint)
{
if (!isStatus(endpoint, VersionedValue.STATUS_NORMAL))
return;
@@ -2089,25 +2220,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
subscriber.onJoinCluster(endpoint);
}
- private void notifyMoved(InetAddress endpoint)
+ private void notifyMoved(InetAddressAndPort endpoint)
{
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
subscriber.onMove(endpoint);
}
- private void notifyLeft(InetAddress endpoint)
+ private void notifyLeft(InetAddressAndPort endpoint)
{
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
subscriber.onLeaveCluster(endpoint);
}
- private boolean isStatus(InetAddress endpoint, String status)
+ private boolean isStatus(InetAddressAndPort endpoint, String status)
{
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
return state != null && state.getStatus().equals(status);
}
- public boolean isRpcReady(InetAddress endpoint)
+ public boolean isRpcReady(InetAddressAndPort endpoint)
{
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
return state != null && state.isRpcReady();
@@ -2123,7 +2254,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
public void setRpcReady(boolean value)
{
- EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress());
+ EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort());
// if value is false we're OK with a null state, if it is true we are not.
assert !value || state != null;
@@ -2131,7 +2262,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_READY, valueFactory.rpcReady(value));
}
- private Collection<Token> getTokensFor(InetAddress endpoint)
+ private Collection<Token> getTokensFor(InetAddressAndPort endpoint)
{
try
{
@@ -2156,7 +2287,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*
* @param endpoint bootstrapping node
*/
- private void handleStateBootstrap(InetAddress endpoint)
+ private void handleStateBootstrap(InetAddressAndPort endpoint)
{
Collection<Token> tokens;
// explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified
@@ -2186,12 +2317,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
}
- private void handleStateBootreplacing(InetAddress newNode, String[] pieces)
+ private void handleStateBootreplacing(InetAddressAndPort newNode, String[] pieces)
{
- InetAddress oldNode;
+ InetAddressAndPort oldNode;
try
{
- oldNode = InetAddress.getByName(pieces[1]);
+ oldNode = InetAddressAndPort.getByName(pieces[1]);
}
catch (Exception e)
{
@@ -2204,7 +2335,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
throw new RuntimeException(String.format("Node %s is trying to replace alive node %s.", newNode, oldNode));
}
- Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(newNode);
+ Optional<InetAddressAndPort> replacingNode = tokenMetadata.getReplacingNode(newNode);
if (replacingNode.isPresent() && !replacingNode.get().equals(oldNode))
{
throw new RuntimeException(String.format("Node %s is already replacing %s but is trying to replace %s.",
@@ -2228,12 +2359,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*
* @param endpoint node
*/
- private void handleStateNormal(final InetAddress endpoint, final String status)
+ private void handleStateNormal(final InetAddressAndPort endpoint, final String status)
{
Collection<Token> tokens = getTokensFor(endpoint);
Set<Token> tokensToUpdateInMetadata = new HashSet<>();
Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
- Set<InetAddress> endpointsToRemove = new HashSet<>();
+ Set<InetAddressAndPort> endpointsToRemove = new HashSet<>();
if (logger.isDebugEnabled())
logger.debug("Node {} state {}, token {}", endpoint, status, tokens);
@@ -2246,7 +2377,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
endpoint,
Gossiper.instance.getEndpointStateForEndpoint(endpoint));
- Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(endpoint);
+ Optional<InetAddressAndPort> replacingNode = tokenMetadata.getReplacingNode(endpoint);
if (replacingNode.isPresent())
{
assert !endpoint.equals(replacingNode.get()) : "Pending replacement endpoint with same address is not supported";
@@ -2259,7 +2390,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
endpointsToRemove.add(replacingNode.get());
}
- Optional<InetAddress> replacementNode = tokenMetadata.getReplacementNode(endpoint);
+ Optional<InetAddressAndPort> replacementNode = tokenMetadata.getReplacementNode(endpoint);
if (replacementNode.isPresent())
{
logger.warn("Node {} is currently being replaced by node {}.", endpoint, replacementNode.get());
@@ -2268,7 +2399,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
updatePeerInfo(endpoint);
// Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
UUID hostId = Gossiper.instance.getHostId(endpoint);
- InetAddress existing = tokenMetadata.getEndpointForHostId(hostId);
+ InetAddressAndPort existing = tokenMetadata.getEndpointForHostId(hostId);
if (replacing && isReplacingSameAddress() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null
&& (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
@@ -2276,7 +2407,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
if (existing != null && !existing.equals(endpoint))
{
- if (existing.equals(FBUtilities.getBroadcastAddress()))
+ if (existing.equals(FBUtilities.getBroadcastAddressAndPort()))
{
logger.warn("Not updating host ID {} for {} because it's mine", hostId, endpoint);
tokenMetadata.removeEndpoint(endpoint);
@@ -2303,7 +2434,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
for (final Token token : tokens)
{
// we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
- InetAddress currentOwner = tokenMetadata.getEndpoint(token);
+ InetAddressAndPort currentOwner = tokenMetadata.getEndpoint(token);
if (currentOwner == null)
{
logger.debug("New node {} at token {}", endpoint, token);
@@ -2323,7 +2454,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// currentOwner is no longer current, endpoint is. Keep track of these moves, because when
// a host no longer has any tokens, we'll want to remove it.
- Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
+ Multimap<InetAddressAndPort, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
epToTokenCopy.get(currentOwner).remove(token);
if (epToTokenCopy.get(currentOwner).size() < 1)
endpointsToRemove.add(currentOwner);
@@ -2348,7 +2479,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
boolean isMember = tokenMetadata.isMember(endpoint);
boolean isMoving = tokenMetadata.isMoving(endpoint);
tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
- for (InetAddress ep : endpointsToRemove)
+ for (InetAddressAndPort ep : endpointsToRemove)
{
removeEndpoint(ep);
if (replacing && DatabaseDescriptor.getReplaceAddress().equals(ep))
@@ -2375,7 +2506,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*
* @param endpoint node
*/
- private void handleStateLeaving(InetAddress endpoint)
+ private void handleStateLeaving(InetAddressAndPort endpoint)
{
Collection<Token> tokens = getTokensFor(endpoint);
@@ -2408,7 +2539,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param endpoint If reason for leaving is decommission, endpoint is the leaving node.
* @param pieces STATE_LEFT,token
*/
- private void handleStateLeft(InetAddress endpoint, String[] pieces)
+ private void handleStateLeft(InetAddressAndPort endpoint, String[] pieces)
{
assert pieces.length >= 2;
Collection<Token> tokens = getTokensFor(endpoint);
@@ -2425,7 +2556,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param endpoint moving endpoint address
* @param pieces STATE_MOVING, token
*/
- private void handleStateMoving(InetAddress endpoint, String[] pieces)
+ private void handleStateMoving(InetAddressAndPort endpoint, String[] pieces)
{
assert pieces.length >= 2;
Token token = getTokenFactory().fromString(pieces[1]);
@@ -2444,11 +2575,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param endpoint node
* @param pieces either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
*/
- private void handleStateRemoving(InetAddress endpoint, String[] pieces)
+ private void handleStateRemoving(InetAddressAndPort endpoint, String[] pieces)
{
assert (pieces.length > 0);
- if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+ if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
{
logger.info("Received removenode gossip about myself. Is this node rejoining after an explicit removenode?");
try
@@ -2494,7 +2625,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
- private void excise(Collection<Token> tokens, InetAddress endpoint)
+ private void excise(Collection<Token> tokens, InetAddressAndPort endpoint)
{
logger.info("Removing tokens {} for {}", tokens, endpoint);
@@ -2510,20 +2641,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
PendingRangeCalculatorService.instance.update();
}
- private void excise(Collection<Token> tokens, InetAddress endpoint, long expireTime)
+ private void excise(Collection<Token> tokens, InetAddressAndPort endpoint, long expireTime)
{
addExpireTimeIfFound(endpoint, expireTime);
excise(tokens, endpoint);
}
/** unlike excise we just need this endpoint gone without going through any notifications **/
- private void removeEndpoint(InetAddress endpoint)
+ private void removeEndpoint(InetAddressAndPort endpoint)
{
Gossiper.instance.removeEndpoint(endpoint);
SystemKeyspace.removeEndpoint(endpoint);
}
- protected void addExpireTimeIfFound(InetAddress endpoint, long expireTime)
+ protected void addExpireTimeIfFound(InetAddressAndPort endpoint, long expireTime)
{
if (expireTime != 0L)
{
@@ -2543,23 +2674,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param ranges the ranges to find sources for
* @return multimap of addresses to ranges the address is responsible for
*/
- private Multimap<InetAddress, Range<Token>> getNewSourceRanges(String keyspaceName, Set<Range<Token>> ranges)
+ private Multimap<InetAddressAndPort, Range<Token>> getNewSourceRanges(String keyspaceName, Set<Range<Token>> ranges)
{
- InetAddress myAddress = FBUtilities.getBroadcastAddress();
- Multimap<Range<Token>, InetAddress> rangeAddresses = Keyspace.open(keyspaceName).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap());
- Multimap<InetAddress, Range<Token>> sourceRanges = HashMultimap.create();
+ InetAddressAndPort myAddress = FBUtilities.getBroadcastAddressAndPort();
+ Multimap<Range<Token>, InetAddressAndPort> rangeAddresses = Keyspace.open(keyspaceName).getReplicationStrategy().getRangeAddresses(tokenMetadata.cloneOnlyTokenMap());
+ Multimap<InetAddressAndPort, Range<Token>> sourceRanges = HashMultimap.create();
IFailureDetector failureDetector = FailureDetector.instance;
// find alive sources for our new ranges
for (Range<Token> range : ranges)
{
- Collection<InetAddress> possibleRanges = rangeAddresses.get(range);
+ Collection<InetAddressAndPort> possibleRanges = rangeAddresses.get(range);
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- List<InetAddress> sources = snitch.getSortedListByProximity(myAddress, possibleRanges);
+ List<InetAddressAndPort> sources = snitch.getSortedListByProximity(myAddress, possibleRanges);
assert (!sources.contains(myAddress));
- for (InetAddress source : sources)
+ for (InetAddressAndPort source : sources)
{
if (failureDetector.isAlive(source))
{
@@ -2576,7 +2707,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*
* @param remote node to send notification to
*/
- private void sendReplicationNotification(InetAddress remote)
+ private void sendReplicationNotification(InetAddressAndPort remote)
{
// notify the remote token
MessageOut msg = new MessageOut(MessagingService.Verb.REPLICATION_FINISHED);
@@ -2608,23 +2739,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*
* @param endpoint the node that left
*/
- private void restoreReplicaCount(InetAddress endpoint, final InetAddress notifyEndpoint)
+ private void restoreReplicaCount(InetAddressAndPort endpoint, final InetAddressAndPort notifyEndpoint)
{
- Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> rangesToFetch = HashMultimap.create();
+ Multimap<String, Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> rangesToFetch = HashMultimap.create();
- InetAddress myAddress = FBUtilities.getBroadcastAddress();
+ InetAddressAndPort myAddress = FBUtilities.getBroadcastAddressAndPort();
for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
{
- Multimap<Range<Token>, InetAddress> changedRanges = getChangedRangesForLeaving(keyspaceName, endpoint);
+ Multimap<Range<Token>, InetAddressAndPort> changedRanges = getChangedRangesForLeaving(keyspaceName, endpoint);
Set<Range<Token>> myNewRanges = new HashSet<>();
- for (Map.Entry<Range<Token>, InetAddress> entry : changedRanges.entries())
+ for (Map.Entry<Range<Token>, InetAddressAndPort> entry : changedRanges.entries())
{
if (entry.getValue().equals(myAddress))
myNewRanges.add(entry.getKey());
}
- Multimap<InetAddress, Range<Token>> sourceRanges = getNewSourceRanges(keyspaceName, myNewRanges);
- for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : sourceRanges.asMap().entrySet())
+ Multimap<InetAddressAndPort, Range<Token>> sourceRanges = getNewSourceRanges(keyspaceName, myNewRanges);
+ for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : sourceRanges.asMap().entrySet())
{
rangesToFetch.put(keyspaceName, entry);
}
@@ -2633,10 +2764,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
StreamPlan stream = new StreamPlan(StreamOperation.RESTORE_REPLICA_COUNT);
for (String keyspaceName : rangesToFetch.keySet())
{
- for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName))
+ for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName))
{
- InetAddress source = entry.getKey();
- InetAddress preferred = SystemKeyspace.getPreferredIP(source);
+ InetAddressAndPort source = entry.getKey();
+ InetAddressAndPort preferred = SystemKeyspace.getPreferredIP(source);
Collection<Range<Token>> ranges = entry.getValue();
if (logger.isDebugEnabled())
logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
@@ -2661,7 +2792,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
// needs to be modified to accept either a keyspace or ARS.
- private Multimap<Range<Token>, InetAddress> getChangedRangesForLeaving(String keyspaceName, InetAddress endpoint)
+ private Multimap<Range<Token>, InetAddressAndPort> getChangedRangesForLeaving(String keyspaceName, InetAddressAndPort endpoint)
{
// First get all ranges the leaving endpoint is responsible for
Collection<Range<Token>> ranges = getRangesForEndpoint(keyspaceName, endpoint);
@@ -2669,7 +2800,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (logger.isDebugEnabled())
logger.debug("Node {} ranges [{}]", endpoint, StringUtils.join(ranges, ", "));
- Map<Range<Token>, List<InetAddress>> currentReplicaEndpoints = new HashMap<>(ranges.size());
+ Map<Range<Token>, List<InetAddressAndPort>> currentReplicaEndpoints = new HashMap<>(ranges.size());
// Find (for each range) all nodes that store replicas for these ranges as well
TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap(); // don't do this in the loop! #7758
@@ -2683,7 +2814,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (temp.isMember(endpoint))
temp.removeEndpoint(endpoint);
- Multimap<Range<Token>, InetAddress> changedRanges = HashMultimap.create();
+ Multimap<Range<Token>, InetAddressAndPort> changedRanges = HashMultimap.create();
// Go through the ranges and for each range check who will be
// storing replicas for these ranges when the leaving endpoint
@@ -2692,7 +2823,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// range.
for (Range<Token> range : ranges)
{
- Collection<InetAddress> newReplicaEndpoints = Keyspace.open(keyspaceName).getReplicationStrategy().calculateNaturalEndpoints(range.right, temp);
+ Collection<InetAddressAndPort> newReplicaEndpoints = Keyspace.open(keyspaceName).getReplicationStrategy().calculateNaturalEndpoints(range.right, temp);
newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
if (logger.isDebugEnabled())
if (newReplicaEndpoints.isEmpty())
@@ -2705,7 +2836,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return changedRanges;
}
- public void onJoin(InetAddress endpoint, EndpointState epState)
+ public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
{
for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
{
@@ -2714,7 +2845,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
}
- public void onAlive(InetAddress endpoint, EndpointState state)
+ public void onAlive(InetAddressAndPort endpoint, EndpointState state)
{
MigrationManager.instance.scheduleSchemaPull(endpoint, state);
@@ -2722,19 +2853,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
notifyUp(endpoint);
}
- public void onRemove(InetAddress endpoint)
+ public void onRemove(InetAddressAndPort endpoint)
{
tokenMetadata.removeEndpoint(endpoint);
PendingRangeCalculatorService.instance.update();
}
- public void onDead(InetAddress endpoint, EndpointState state)
+ public void onDead(InetAddressAndPort endpoint, EndpointState state)
{
MessagingService.instance().convict(endpoint);
notifyDown(endpoint);
}
- public void onRestart(InetAddress endpoint, EndpointState state)
+ public void onRestart(InetAddressAndPort endpoint, EndpointState state)
{
// If we have restarted before the node was even marked down, we need to reset the connection pool
if (state.isAlive())
@@ -2753,15 +2884,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return FileUtils.stringifyFileSize(StorageMetrics.load.getCount());
}
+ public Map<String, String> getLoadMapWithPort()
+ {
+ return getLoadMap(true);
+ }
+
public Map<String, String> getLoadMap()
{
+ return getLoadMap(false);
+ }
+
+ private Map<String, String> getLoadMap(boolean withPort)
+ {
Map<String, String> map = new HashMap<>();
- for (Map.Entry<InetAddress,Double> entry : LoadBroadcaster.instance.getLoadInfo().entrySet())
+ for (Map.Entry<InetAddressAndPort,Double> entry : LoadBroadcaster.instance.getLoadInfo().entrySet())
{
- map.put(entry.getKey().getHostAddress(), FileUtils.stringifyFileSize(entry.getValue()));
+ map.put(entry.getKey().getHostAddress(withPort), FileUtils.stringifyFileSize(entry.getValue()));
}
// gossiper doesn't see its own updates, so we need to special-case the local node
- map.put(FBUtilities.getBroadcastAddress().getHostAddress(), getLoadString());
+ map.put(FBUtilities.getBroadcastAddressAndPort().getHostAddress(withPort), getLoadString());
return map;
}
@@ -2779,13 +2920,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
@Nullable
- public InetAddress getEndpointForHostId(UUID hostId)
+ public InetAddressAndPort getEndpointForHostId(UUID hostId)
{
return tokenMetadata.getEndpointForHostId(hostId);
}
@Nullable
- public UUID getHostIdForEndpoint(InetAddress address)
+ public UUID getHostIdForEndpoint(InetAddressAndPort address)
{
return tokenMetadata.getHostId(address);
}
@@ -2794,15 +2935,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public List<String> getTokens()
{
- return getTokens(FBUtilities.getBroadcastAddress());
+ return getTokens(FBUtilities.getBroadcastAddressAndPort());
}
public List<String> getTokens(String endpoint) throws UnknownHostException
{
- return getTokens(InetAddress.getByName(endpoint));
+ return getTokens(InetAddressAndPort.getByName(endpoint));
}
- private List<String> getTokens(InetAddress endpoint)
+ private List<String> getTokens(InetAddressAndPort endpoint)
{
List<String> strTokens = new ArrayList<>();
for (Token tok : getTokenMetadata().getTokens(endpoint))
@@ -2820,42 +2961,74 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return Schema.instance.getVersion().toString();
}
+ @Deprecated
public List<String> getLeavingNodes()
{
- return stringify(tokenMetadata.getLeavingEndpoints());
+ return stringify(tokenMetadata.getLeavingEndpoints(), false);
+ }
+
+ public List<String> getLeavingNodesWithPort()
+ {
+ return stringify(tokenMetadata.getLeavingEndpoints(), true);
}
+ @Deprecated
public List<String> getMovingNodes()
{
List<String> endpoints = new ArrayList<>();
- for (Pair<Token, InetAddress> node : tokenMetadata.getMovingEndpoints())
+ for (Pair<Token, InetAddressAndPort> node : tokenMetadata.getMovingEndpoints())
+ {
+ endpoints.add(node.right.address.getHostAddress());
+ }
+
+ return endpoints;
+ }
+
+ public List<String> getMovingNodesWithPort()
+ {
+ List<String> endpoints = new ArrayList<>();
+
+ for (Pair<Token, InetAddressAndPort> node : tokenMetadata.getMovingEndpoints())
{
- endpoints.add(node.right.getHostAddress());
+ endpoints.add(node.right.getHostAddress(true));
}
return endpoints;
}
+
+ @Deprecated
public List<String> getJoiningNodes()
{
- return stringify(tokenMetadata.getBootstrapTokens().valueSet());
+ return stringify(tokenMetadata.getBootstrapTokens().valueSet(), false);
+ }
+
+ public List<String> getJoiningNodesWithPort()
+ {
+ return stringify(tokenMetadata.getBootstrapTokens().valueSet(), true);
}
+ @Deprecated
public List<String> getLiveNodes()
{
- return stringify(Gossiper.instance.getLiveMembers());
+ return stringify(Gossiper.instance.getLiveMembers(), false);
+ }
+
+ public List<String> getLiveNodesWithPort()
+ {
+ return stringify(Gossiper.instance.getLiveMembers(), true);
}
- public Set<InetAddress> getLiveRingMembers()
+ public Set<InetAddressAndPort> getLiveRingMembers()
{
return getLiveRingMembers(false);
}
- public Set<InetAddress> getLiveRingMembers(boolean excludeDeadStates)
+ public Set<InetAddressAndPort> getLiveRingMembers(boolean excludeDeadStates)
{
- Set<InetAddress> ret = new HashSet<>();
- for (InetAddress ep : Gossiper.instance.getLiveMembers())
+ Set<InetAddressAndPort> ret = new HashSet<>();
+ for (InetAddressAndPort ep : Gossiper.instance.getLiveMembers())
{
if (excludeDeadStates)
{
@@ -2871,9 +3044,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
+ @Deprecated
public List<String> getUnreachableNodes()
{
- return stringify(Gossiper.instance.getUnreachableMembers());
+ return stringify(Gossiper.instance.getUnreachableMembers(), false);
+ }
+
+ public List<String> getUnreachableNodesWithPort()
+ {
+ return stringify(Gossiper.instance.getUnreachableMembers(), true);
}
public String[] getAllDataFileLocations()
@@ -2894,19 +3073,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return FileUtils.getCanonicalPath(DatabaseDescriptor.getSavedCachesLocation());
}
- private List<String> stringify(Iterable<InetAddress> endpoints)
+ private List<String> stringify(Iterable<InetAddressAndPort> endpoints, boolean withPort)
{
List<String> stringEndpoints = new ArrayList<>();
- for (InetAddress ep : endpoints)
+ for (InetAddressAndPort ep : endpoints)
{
- stringEndpoints.add(ep.getHostAddress());
+ stringEndpoints.add(ep.getHostAddress(withPort));
}
return stringEndpoints;
}
public int getCurrentGenerationNumber()
{
- return Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getBroadcastAddress());
+ return Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getBroadcastAddressAndPort()); // asks the Gossiper about this node's own broadcast address and port
}
public int forceKeyspaceCleanup(String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException
@@ -3436,14 +3615,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param ep endpoint we are interested in.
* @return primary ranges for the specified endpoint.
*/
- public Collection<Range<Token>> getPrimaryRangesForEndpoint(String keyspace, InetAddress ep)
+ public Collection<Range<Token>> getPrimaryRangesForEndpoint(String keyspace, InetAddressAndPort ep)
{
AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
Collection<Range<Token>> primaryRanges = new HashSet<>();
TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
for (Token token : metadata.sortedTokens())
{
- List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
+ List<InetAddressAndPort> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
if (endpoints.size() > 0 && endpoints.get(0).equals(ep))
primaryRanges.add(new Range<>(metadata.getPredecessor(token), token));
}
@@ -3453,23 +3632,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
/**
* Get the "primary ranges" within local DC for the specified keyspace and endpoint.
*
- * @see #getPrimaryRangesForEndpoint(String, java.net.InetAddress)
+ * @see #getPrimaryRangesForEndpoint(String, InetAddressAndPort)
* @param keyspace Keyspace name to check primary ranges
* @param referenceEndpoint endpoint we are interested in.
* @return primary ranges within local DC for the specified endpoint.
*/
- public Collection<Range<Token>> getPrimaryRangeForEndpointWithinDC(String keyspace, InetAddress referenceEndpoint)
+ public Collection<Range<Token>> getPrimaryRangeForEndpointWithinDC(String keyspace, InetAddressAndPort referenceEndpoint)
{
TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(referenceEndpoint);
- Collection<InetAddress> localDcNodes = metadata.getTopology().getDatacenterEndpoints().get(localDC);
+ Collection<InetAddressAndPort> localDcNodes = metadata.getTopology().getDatacenterEndpoints().get(localDC);
AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
Collection<Range<Token>> localDCPrimaryRanges = new HashSet<>();
for (Token token : metadata.sortedTokens())
{
- List<InetAddress> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
- for (InetAddress endpoint : endpoints)
+ List<InetAddressAndPort> endpoints = strategy.calculateNaturalEndpoints(token, metadata);
+ for (InetAddressAndPort endpoint : endpoints)
{
if (localDcNodes.contains(endpoint))
{
@@ -3490,7 +3669,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param ep endpoint we are interested in.
* @return ranges for the specified endpoint.
*/
- Collection<Range<Token>> getRangesForEndpoint(String keyspaceName, InetAddress ep)
+ Collection<Range<Token>> getRangesForEndpoint(String keyspaceName, InetAddressAndPort ep) // looks ep up in the address-to-ranges map of the keyspace's replication strategy
{
return Keyspace.open(keyspaceName).getReplicationStrategy().getAddressRanges().get(ep);
}
@@ -3530,6 +3709,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param key key for which we need to find the endpoint
* @return the endpoint responsible for this key
*/
+ @Deprecated
public List<InetAddress> getNaturalEndpoints(String keyspaceName, String cf, String key)
{
KeyspaceMetadata ksMetaData = Schema.instance.getKeyspaceMetadata(keyspaceName);
@@ -3540,12 +3720,32 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (metadata == null)
throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'");
- return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key)));
+ return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))).stream().map(i -> i.address).collect(toList());
+ }
+
+ public List<String> getNaturalEndpointsWithPort(String keyspaceName, String cf, String key) // string-keyed lookup; result is stringified with withPort = true
+ {
+ KeyspaceMetadata ksMetaData = Schema.instance.getKeyspaceMetadata(keyspaceName);
+ if (ksMetaData == null) // an unknown keyspace is rejected as a bad argument
+ throw new IllegalArgumentException("Unknown keyspace '" + keyspaceName + "'");
+
+ TableMetadata metadata = ksMetaData.getTableOrViewNullable(cf); // cf is resolved against both tables and views
+ if (metadata == null) // an unknown table/view is rejected as a bad argument
+ throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'");
+
+ return stringify(getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key))), true); // key text is parsed with the table's partition key type before being tokenized
}
+
+ @Deprecated // legacy InetAddress-returning form; getNaturalEndpointsWithPort(String, ByteBuffer) is the port-aware counterpart
public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key)
{
- return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(key));
+ return getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(key)).stream().map(i -> i.address).collect(toList()); // keeps the old return type by projecting each InetAddressAndPort to its address field
+ }
+
+ public List<String> getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer key) // same lookup, returned as strings with withPort = true
+ {
+ return stringify(getNaturalEndpoints(keyspaceName, tokenMetadata.partitioner.getToken(key)), true); // withPort = true
}
/**
@@ -3556,7 +3756,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param pos position for which we need to find the endpoint
* @return the endpoint responsible for this token
*/
- public List<InetAddress> getNaturalEndpoints(String keyspaceName, RingPosition pos)
+ public List<InetAddressAndPort> getNaturalEndpoints(String keyspaceName, RingPosition pos) // delegates to the replication strategy of the opened keyspace
{
return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalEndpoints(pos);
}
@@ -3564,7 +3764,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
/**
* Returns the endpoints currently responsible for storing the token plus pending ones
*/
- public Iterable<InetAddress> getNaturalAndPendingEndpoints(String keyspaceName, Token token)
+ public Iterable<InetAddressAndPort> getNaturalAndPendingEndpoints(String keyspaceName, Token token) // concatenation: natural endpoints first, then tokenMetadata's pending endpoints for the token
{
return Iterables.concat(getNaturalEndpoints(keyspaceName, token), tokenMetadata.pendingEndpointsFor(token, keyspaceName));
}
@@ -3577,14 +3777,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param key key for which we need to find the endpoint
* @return the endpoint responsible for this key
*/
- public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, ByteBuffer key)
+ public List<InetAddressAndPort> getLiveNaturalEndpoints(Keyspace keyspace, ByteBuffer key) // decorates the raw key via tokenMetadata, then defers to the position-based overload
{
return getLiveNaturalEndpoints(keyspace, tokenMetadata.decorateKey(key));
}
- public List<InetAddress> getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos)
+ public List<InetAddressAndPort> getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos) // allocates the result list and lets the three-argument overload fill it
{
- List<InetAddress> liveEps = new ArrayList<>();
+ List<InetAddressAndPort> liveEps = new ArrayList<>(); // mutated in place by the call below
getLiveNaturalEndpoints(keyspace, pos, liveEps);
return liveEps;
}
@@ -3597,11 +3797,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* @param pos position for which we need to find the endpoint
* @param liveEps the list of endpoints to mutate
*/
- public void getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos, List<InetAddress> liveEps)
+ public void getLiveNaturalEndpoints(Keyspace keyspace, RingPosition pos, List<InetAddressAndPort> liveEps)
{
- List<InetAddress> endpoints = keyspace.getReplicationStrategy().getNaturalEndpoints(pos);
+ List<InetAddressAndPort> endpoints = keyspace.getReplicationStrategy().getNaturalEndpoints(pos);
- for (InetAddress endpoint : endpoints)
+ for (InetAddressAndPort endpoint : endpoints)
{
if (FailureDetector.instance.isAlive(endpoint))
liveEps.add(endpoint);
@@ -3718,8 +3918,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
private void startLeaving()
{
+ Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, valueFactory.leaving(getLocalTokens())); // publish the leaving value under the port-aware key in addition to the legacy STATUS key set on the next line
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalTokens()));
- tokenMetadata.addLeavingEndpoint(FBUtilities.getBroadcastAddress());
+ tokenMetadata.addLeavingEndpoint(FBUtilities.getBroadcastAddressAndPort()); // record this node (broadcast address and port) as leaving before pending ranges are recalculated
PendingRangeCalculatorService.instance.update();
}
@@ -3728,7 +3929,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
TokenMetadata metadata = tokenMetadata.cloneAfterAllLeft();
if (operationMode != Mode.LEAVING)
{
- if (!tokenMetadata.isMember(FBUtilities.getBroadcastAddress()))
+ if (!tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()))
throw new UnsupportedOperationException("local node is not a member of the token ring yet");
if (metadata.getAllEndpoints().size() < 2)
throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless");
@@ -3745,7 +3946,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
PendingRangeCalculatorService.instance.blockUntilFinished();
- String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+ String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
if (operationMode != Mode.LEAVING) // If we're already decommissioning there is no point checking RF/pending ranges
{
@@ -3772,7 +3973,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
+ keyspaceName + " (RF = " + rf + ", N = " + numNodes + ")."
+ " Perform a forceful decommission to ignore.");
}
- if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddress()).size() > 0)
+ if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddressAndPort()).size() > 0)
throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
}
}
@@ -3822,9 +4023,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void leaveRing()
{
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.NEEDS_BOOTSTRAP);
- tokenMetadata.removeEndpoint(FBUtilities.getBroadcastAddress());
+ tokenMetadata.removeEndpoint(FBUtilities.getBroadcastAddressAndPort());
PendingRangeCalculatorService.instance.update();
+ Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime()));
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime()));
int delay = Math.max(RING_DELAY, Gossiper.intervalInMillis * 2);
logger.info("Announcing that I have left the ring for {}ms", delay);
@@ -3833,11 +4035,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void unbootstrap(Runnable onFinish) throws ExecutionException, InterruptedException
{
- Map<String, Multimap<Range<Token>, InetAddress>> rangesToStream = new HashMap<>();
+ Map<String, Multimap<Range<Token>, InetAddressAndPort>> rangesToStream = new HashMap<>();
for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
{
- Multimap<Range<Token>, InetAddress> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddress());
+ Multimap<Range<Token>, InetAddressAndPort> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddressAndPort());
if (logger.isDebugEnabled())
logger.debug("Ranges needing transfer are [{}]", StringUtils.join(rangesMM.keySet(), ","));
@@ -3878,11 +4080,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
private UUID getPreferredHintsStreamTarget()
{
- List<InetAddress> candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
-
<TRUNCATED>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org