You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/11/27 13:58:09 UTC
cassandra git commit: Warn or fail when changing cluster topology live
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 8cd13f138 -> 7650fc196
Warn or fail when changing cluster topology live
patch by Stefania; reviewed by Paulo Motta for CASSANDRA-10243
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7650fc19
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7650fc19
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7650fc19
Branch: refs/heads/cassandra-2.1
Commit: 7650fc196341bd673626054593f2ce6e895d7783
Parents: 8cd13f1
Author: Stefania <st...@datastax.com>
Authored: Fri Nov 27 13:49:15 2015 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Fri Nov 27 13:49:15 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 6 +
build.xml | 1 +
src/java/org/apache/cassandra/gms/Gossiper.java | 28 +-
.../locator/GossipingPropertyFileSnitch.java | 107 ++-----
.../cassandra/locator/PropertyFileSnitch.java | 74 ++++-
.../cassandra/locator/SnitchProperties.java | 5 +
.../locator/YamlFileNetworkTopologySnitch.java | 111 +++++--
.../cassandra/service/MigrationManager.java | 4 +-
.../apache/cassandra/service/StorageProxy.java | 10 +-
.../cassandra/service/StorageService.java | 32 +-
.../GossipingPropertyFileSnitchTest.java | 35 +-
.../locator/PropertyFileSnitchTest.java | 321 +++++++++++++++++++
.../YamlFileNetworkTopologySnitchTest.java | 293 +++++++++++++++--
14 files changed, 823 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 111852c..a2f7b6e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Warn or fail when changing cluster topology live (CASSANDRA-10243)
* Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
* Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)
* DeletionTime.compareTo wrong in rare cases (CASSANDRA-10749)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 54a6b79..cae8dfb 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -32,6 +32,12 @@ New features
- a new validate(key, cf) method is added to PerRowSecondaryIndex. A default
implementation is provided, so no changes are required to custom implementations.
+Operations
+------------
+ - Changing rack or dc of live nodes is no longer possible for PropertyFileSnitch
+ and YamlFileNetworkTopologySnitch. Reloading the configuration file of
+ GossipingPropertyFileSnitch has been disabled, CASSANDRA-10243.
+
2.1.11
=====
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 28a8f74..45bb13f 100644
--- a/build.xml
+++ b/build.xml
@@ -1528,6 +1528,7 @@
<sourceFolder url="file://$MODULE_DIR$/test/long" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/pig" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/microbench" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/test/conf" type="java-test-resource" />
<sourceFolder url="file://$MODULE_DIR$/test/resources" type="java-test-resource" />
<excludeFolder url="file://$MODULE_DIR$/.idea" />
<excludeFolder url="file://$MODULE_DIR$/.settings" />
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 2f69d66..09851b2 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -256,24 +256,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
subscribers.remove(subscriber);
}
- public Set<InetAddress> getLiveMembers()
+ public Set<InetAddress> getLiveEndpoints()
{
- Set<InetAddress> liveMembers = new HashSet<InetAddress>(liveEndpoints);
- if (!liveMembers.contains(FBUtilities.getBroadcastAddress()))
- liveMembers.add(FBUtilities.getBroadcastAddress());
- return liveMembers;
- }
-
- public Set<InetAddress> getLiveTokenOwners()
- {
- Set<InetAddress> tokenOwners = new HashSet<InetAddress>();
- for (InetAddress member : getLiveMembers())
- {
- EndpointState epState = endpointStateMap.get(member);
- if (epState != null && !isDeadState(epState) && StorageService.instance.getTokenMetadata().isMember(member))
- tokenOwners.add(member);
- }
- return tokenOwners;
+ Set<InetAddress> liveEndpoints = new HashSet<InetAddress>(this.liveEndpoints);
+ if (!liveEndpoints.contains(FBUtilities.getBroadcastAddress()))
+ liveEndpoints.add(FBUtilities.getBroadcastAddress());
+ return liveEndpoints;
}
/**
@@ -981,7 +969,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
MessagingService.instance().sendRR(echoMessage, addr, echoHandler);
}
- private void realMarkAlive(final InetAddress addr, final EndpointState localState)
+ @VisibleForTesting
+ public void realMarkAlive(final InetAddress addr, final EndpointState localState)
{
if (logger.isTraceEnabled())
logger.trace("marking as alive {}", addr);
@@ -998,7 +987,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
logger.trace("Notified " + subscribers);
}
- private void markDead(InetAddress addr, EndpointState localState)
+ @VisibleForTesting
+ public void markDead(InetAddress addr, EndpointState localState)
{
if (logger.isTraceEnabled())
logger.trace("marking as down {}", addr);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
index f3f38a0..e2449ae 100644
--- a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -32,8 +32,6 @@ import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ResourceWatcher;
-import org.apache.cassandra.utils.WrappedRunnable;
public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch// implements IEndpointStateChangeSubscriber
@@ -42,28 +40,23 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch//
private PropertyFileSnitch psnitch;
- private volatile String myDC;
- private volatile String myRack;
- private volatile boolean preferLocal;
- private AtomicReference<ReconnectableSnitchHelper> snitchHelperReference;
- private volatile boolean gossipStarted;
+ private final String myDC;
+ private final String myRack;
+ private final boolean preferLocal;
+ private final AtomicReference<ReconnectableSnitchHelper> snitchHelperReference;
private Map<InetAddress, Map<String, String>> savedEndpoints;
private static final String DEFAULT_DC = "UNKNOWN_DC";
private static final String DEFAULT_RACK = "UNKNOWN_RACK";
- private static final int DEFAULT_REFRESH_PERIOD_IN_SECONDS = 60;
-
public GossipingPropertyFileSnitch() throws ConfigurationException
{
- this(DEFAULT_REFRESH_PERIOD_IN_SECONDS);
- }
-
- public GossipingPropertyFileSnitch(int refreshPeriodInSeconds) throws ConfigurationException
- {
- snitchHelperReference = new AtomicReference<ReconnectableSnitchHelper>();
+ SnitchProperties properties = loadConfiguration();
- reloadConfiguration(false);
+ myDC = properties.get("dc", DEFAULT_DC).trim();
+ myRack = properties.get("rack", DEFAULT_RACK).trim();
+ preferLocal = Boolean.parseBoolean(properties.get("prefer_local", "false"));
+ snitchHelperReference = new AtomicReference<>();
try
{
@@ -74,23 +67,15 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch//
{
logger.info("Unable to load {}; compatibility mode disabled", PropertyFileSnitch.SNITCH_PROPERTIES_FILENAME);
}
+ }
- try
- {
- FBUtilities.resourceToFile(SnitchProperties.RACKDC_PROPERTY_FILENAME);
- Runnable runnable = new WrappedRunnable()
- {
- protected void runMayThrow() throws ConfigurationException
- {
- reloadConfiguration(true);
- }
- };
- ResourceWatcher.watch(SnitchProperties.RACKDC_PROPERTY_FILENAME, runnable, refreshPeriodInSeconds * 1000);
- }
- catch (ConfigurationException ex)
- {
- logger.error("{} found, but does not look like a plain file. Will not watch it for changes", SnitchProperties.RACKDC_PROPERTY_FILENAME);
- }
+ private static SnitchProperties loadConfiguration() throws ConfigurationException
+ {
+ final SnitchProperties properties = new SnitchProperties();
+ if (!properties.contains("dc") || !properties.contains("rack"))
+ throw new ConfigurationException("DC or rack not found in snitch properties, check your configuration in: " + SnitchProperties.RACKDC_PROPERTY_FILENAME);
+
+ return properties;
}
/**
@@ -156,56 +141,18 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch//
Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP,
StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress()));
- reloadGossiperState();
-
- gossipStarted = true;
+ loadGossiperState();
}
-
- private void reloadConfiguration(boolean isUpdate) throws ConfigurationException
- {
- final SnitchProperties properties = new SnitchProperties();
-
- String newDc = properties.get("dc", null);
- String newRack = properties.get("rack", null);
- if (newDc == null || newRack == null)
- throw new ConfigurationException("DC or rack not found in snitch properties, check your configuration in: " + SnitchProperties.RACKDC_PROPERTY_FILENAME);
-
- newDc = newDc.trim();
- newRack = newRack.trim();
- final boolean newPreferLocal = Boolean.parseBoolean(properties.get("prefer_local", "false"));
- if (!newDc.equals(myDC) || !newRack.equals(myRack) || (preferLocal != newPreferLocal))
- {
- myDC = newDc;
- myRack = newRack;
- preferLocal = newPreferLocal;
-
- reloadGossiperState();
-
- if (StorageService.instance != null)
- {
- if (isUpdate)
- StorageService.instance.updateTopology(FBUtilities.getBroadcastAddress());
- else
- StorageService.instance.getTokenMetadata().invalidateCachedRings();
- }
+ private void loadGossiperState()
+ {
+ assert Gossiper.instance != null;
- if (gossipStarted)
- StorageService.instance.gossipSnitchInfo();
- }
- }
+ ReconnectableSnitchHelper pendingHelper = new ReconnectableSnitchHelper(this, myDC, preferLocal);
+ Gossiper.instance.register(pendingHelper);
- private void reloadGossiperState()
- {
- if (Gossiper.instance != null)
- {
- ReconnectableSnitchHelper pendingHelper = new ReconnectableSnitchHelper(this, myDC, preferLocal);
- Gossiper.instance.register(pendingHelper);
-
- pendingHelper = snitchHelperReference.getAndSet(pendingHelper);
- if (pendingHelper != null)
- Gossiper.instance.unregister(pendingHelper);
- }
- // else this will eventually rerun at gossiperStarting()
+ pendingHelper = snitchHelperReference.getAndSet(pendingHelper);
+ if (pendingHelper != null)
+ Gossiper.instance.unregister(pendingHelper);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
index f293081..6115572 100644
--- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
@@ -23,12 +23,16 @@ import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
+import java.util.Set;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -51,6 +55,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
private static final Logger logger = LoggerFactory.getLogger(PropertyFileSnitch.class);
public static final String SNITCH_PROPERTIES_FILENAME = "cassandra-topology.properties";
+ private static final int DEFAULT_REFRESH_PERIOD_IN_SECONDS = 5;
private static volatile Map<InetAddress, String[]> endpointMap;
private static volatile String[] defaultDCRack;
@@ -59,6 +64,11 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
public PropertyFileSnitch() throws ConfigurationException
{
+ this(DEFAULT_REFRESH_PERIOD_IN_SECONDS);
+ }
+
+ public PropertyFileSnitch(int refreshPeriodInSeconds) throws ConfigurationException
+ {
reloadConfiguration(false);
try
@@ -71,7 +81,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
reloadConfiguration(true);
}
};
- ResourceWatcher.watch(SNITCH_PROPERTIES_FILENAME, runnable, 60 * 1000);
+ ResourceWatcher.watch(SNITCH_PROPERTIES_FILENAME, runnable, refreshPeriodInSeconds * 1000);
}
catch (ConfigurationException ex)
{
@@ -85,7 +95,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
* @param endpoint endpoint to process
* @return an array of strings with the first element being the data center and the second being the rack
*/
- public String[] getEndpointInfo(InetAddress endpoint)
+ public static String[] getEndpointInfo(InetAddress endpoint)
{
String[] rawEndpointInfo = getRawEndpointInfo(endpoint);
if (rawEndpointInfo == null)
@@ -93,7 +103,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
return rawEndpointInfo;
}
- private String[] getRawEndpointInfo(InetAddress endpoint)
+ private static String[] getRawEndpointInfo(InetAddress endpoint)
{
String[] value = endpointMap.get(endpoint);
if (value == null)
@@ -132,7 +142,8 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
public void reloadConfiguration(boolean isUpdate) throws ConfigurationException
{
- HashMap<InetAddress, String[]> reloadedMap = new HashMap<InetAddress, String[]>();
+ HashMap<InetAddress, String[]> reloadedMap = new HashMap<>();
+ String[] reloadedDefaultDCRack = null;
Properties properties = new Properties();
InputStream stream = null;
@@ -155,18 +166,18 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
String key = (String) entry.getKey();
String value = (String) entry.getValue();
- if (key.equals("default"))
+ if ("default".equals(key))
{
String[] newDefault = value.split(":");
if (newDefault.length < 2)
- defaultDCRack = new String[] { "default", "default" };
+ reloadedDefaultDCRack = new String[] { "default", "default" };
else
- defaultDCRack = new String[] { newDefault[0].trim(), newDefault[1].trim() };
+ reloadedDefaultDCRack = new String[] { newDefault[0].trim(), newDefault[1].trim() };
}
else
{
InetAddress host;
- String hostString = key.replace("/", "");
+ String hostString = StringUtils.remove(key, '/');
try
{
host = InetAddress.getByName(hostString);
@@ -183,18 +194,24 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
reloadedMap.put(host, token);
}
}
- if (defaultDCRack == null && !reloadedMap.containsKey(FBUtilities.getBroadcastAddress()))
- throw new ConfigurationException(String.format("Snitch definitions at %s do not define a location for this node's broadcast address %s, nor does it provides a default",
+ if (reloadedDefaultDCRack == null && !reloadedMap.containsKey(FBUtilities.getBroadcastAddress()))
+ throw new ConfigurationException(String.format("Snitch definitions at %s do not define a location for " +
+ "this node's broadcast address %s, nor does it provides a default",
SNITCH_PROPERTIES_FILENAME, FBUtilities.getBroadcastAddress()));
+ if (isUpdate && !livenessCheck(reloadedMap, reloadedDefaultDCRack))
+ return;
+
if (logger.isDebugEnabled())
{
StringBuilder sb = new StringBuilder();
for (Map.Entry<InetAddress, String[]> entry : reloadedMap.entrySet())
- sb.append(entry.getKey()).append(":").append(Arrays.toString(entry.getValue())).append(", ");
+ sb.append(entry.getKey()).append(':').append(Arrays.toString(entry.getValue())).append(", ");
logger.debug("Loaded network topology from property file: {}", StringUtils.removeEnd(sb.toString(), ", "));
}
+
+ defaultDCRack = reloadedDefaultDCRack;
endpointMap = reloadedMap;
if (StorageService.instance != null) // null check tolerates circular dependency; see CASSANDRA-4145
{
@@ -208,6 +225,41 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
StorageService.instance.gossipSnitchInfo();
}
+ /**
+ * We cannot update rack or data-center for a live node, see CASSANDRA-10243.
+ *
+ * @param reloadedMap - the new map of hosts to dc:rack properties
+ * @param reloadedDefaultDCRack - the default dc:rack or null if no default
+ * @return true if we can continue updating (no live host had dc or rack updated)
+ */
+ private static boolean livenessCheck(HashMap<InetAddress, String[]> reloadedMap, String[] reloadedDefaultDCRack)
+ {
+ // If the default has changed we must check all live hosts, but hopefully we will quickly find a
+ // live host whose dc or rack changed and exit the loop early. Otherwise we only check the live
+ // hosts that were either in the old set or in the new set.
+ Set<InetAddress> hosts = Arrays.equals(defaultDCRack, reloadedDefaultDCRack)
+ ? Sets.intersection(StorageService.instance.getLiveMembers(), // same default
+ Sets.union(endpointMap.keySet(), reloadedMap.keySet()))
+ : StorageService.instance.getLiveMembers(); // default updated
+
+ for (InetAddress host : hosts)
+ {
+ String[] origValue = endpointMap.containsKey(host) ? endpointMap.get(host) : defaultDCRack;
+ String[] updateValue = reloadedMap.containsKey(host) ? reloadedMap.get(host) : reloadedDefaultDCRack;
+
+ if (!Arrays.equals(origValue, updateValue))
+ {
+ logger.error("Cannot update data center or rack from {} to {} for live host {}, property file NOT RELOADED",
+ origValue,
+ updateValue,
+ host);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
@Override
public void gossiperStarting()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/locator/SnitchProperties.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SnitchProperties.java b/src/java/org/apache/cassandra/locator/SnitchProperties.java
index be89fcf..8fdda7a 100644
--- a/src/java/org/apache/cassandra/locator/SnitchProperties.java
+++ b/src/java/org/apache/cassandra/locator/SnitchProperties.java
@@ -66,4 +66,9 @@ public class SnitchProperties
{
return properties.getProperty(propertyName, defaultValue);
}
+
+ public boolean contains(String propertyName)
+ {
+ return properties.containsKey(propertyName);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java b/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
index 4139662..870eea8 100644
--- a/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
+++ b/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.ApplicationState;
@@ -40,7 +41,9 @@ import org.yaml.snakeyaml.TypeDescription;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
import com.google.common.net.InetAddresses;
/**
@@ -60,10 +63,10 @@ public class YamlFileNetworkTopologySnitch
/**
* How often to check the topology configuration file, in milliseconds; defaults to five seconds.
*/
- private static final int CHECK_PERIOD_IN_MS = 60 * 1000;
+ private static final int CHECK_PERIOD_IN_MS = 5 * 1000;
/** Default name for the topology configuration file. */
- private static final String DEFAULT_TOPOLOGY_CONFIG_FILENAME = "cassandra-topology.yaml";
+ static final String DEFAULT_TOPOLOGY_CONFIG_FILENAME = "cassandra-topology.yaml";
/** Node data map, keyed by broadcast address. */
private volatile Map<InetAddress, NodeData> nodeDataMap;
@@ -196,44 +199,29 @@ public class YamlFileNetworkTopologySnitch
public String dc_local_address;
}
- /**
- * Loads the topology configuration file.
- *
- * @throws ConfigurationException
- * on failure
- */
- private synchronized void loadTopologyConfiguration(boolean isUpdate)
- throws ConfigurationException
+ public TopologyConfig readConfig() throws ConfigurationException
{
- logger.debug("Loading topology configuration from {}",
- topologyConfigFilename);
+ final TypeDescription topologyConfigTypeDescription = new TypeDescription(TopologyConfig.class);
+ topologyConfigTypeDescription.putListPropertyType("topology", Datacenter.class);
- final TypeDescription topologyConfigTypeDescription = new TypeDescription(
- TopologyConfig.class);
- topologyConfigTypeDescription.putListPropertyType("topology",
- Datacenter.class);
-
- final TypeDescription topologyTypeDescription = new TypeDescription(
- Datacenter.class);
+ final TypeDescription topologyTypeDescription = new TypeDescription(Datacenter.class);
topologyTypeDescription.putListPropertyType("racks", Rack.class);
- final TypeDescription rackTypeDescription = new TypeDescription(
- Rack.class);
+ final TypeDescription rackTypeDescription = new TypeDescription(Rack.class);
rackTypeDescription.putListPropertyType("nodes", Node.class);
- final Constructor configConstructor = new Constructor(
- TopologyConfig.class);
+ final Constructor configConstructor = new Constructor(TopologyConfig.class);
configConstructor.addTypeDescription(topologyConfigTypeDescription);
configConstructor.addTypeDescription(topologyTypeDescription);
configConstructor.addTypeDescription(rackTypeDescription);
final InputStream configFileInputStream = getClass().getClassLoader()
- .getResourceAsStream(topologyConfigFilename);
+ .getResourceAsStream(topologyConfigFilename);
if (configFileInputStream == null)
{
throw new ConfigurationException(
- "Could not read topology config file "
- + topologyConfigFilename);
+ "Could not read topology config file "
+ + topologyConfigFilename);
}
Yaml yaml;
TopologyConfig topologyConfig;
@@ -246,7 +234,30 @@ public class YamlFileNetworkTopologySnitch
{
FileUtils.closeQuietly(configFileInputStream);
}
- final Map<InetAddress, NodeData> nodeDataMap = new HashMap<InetAddress, NodeData>();
+
+ return topologyConfig;
+ }
+
+ /**
+ * Loads the topology configuration file.
+ *
+ * @throws ConfigurationException
+ * on failure
+ */
+ private synchronized void loadTopologyConfiguration(boolean isUpdate)
+ throws ConfigurationException
+ {
+ logger.debug("Loading topology configuration from {}",
+ topologyConfigFilename);
+
+ loadTopologyConfiguration(isUpdate, readConfig());
+ }
+
+ @VisibleForTesting
+ synchronized void loadTopologyConfiguration(boolean isUpdate, TopologyConfig topologyConfig)
+ throws ConfigurationException
+ {
+ final Map<InetAddress, NodeData> nodeDataMap = new HashMap<>();
if (topologyConfig.topology == null)
{
@@ -336,6 +347,9 @@ public class YamlFileNetworkTopologySnitch
defaultNodeData.datacenter = topologyConfig.default_dc_name;
defaultNodeData.rack = topologyConfig.default_rack_name;
+ if (isUpdate && !livenessCheck(nodeDataMap, defaultNodeData))
+ return;
+
// YAML configuration looks good; now make the changes
this.nodeDataMap = nodeDataMap;
@@ -360,6 +374,41 @@ public class YamlFileNetworkTopologySnitch
}
/**
+ * We cannot update rack or data-center for a live node, see CASSANDRA-10243.
+ *
+ * @param reloadedMap - the new map of hosts to NodeData
+ * @param reloadedDefaultData - the default NodeData
+ * @return true if we can continue updating (no live host had dc or rack updated)
+ */
+ private boolean livenessCheck(Map<InetAddress, NodeData> reloadedMap, NodeData reloadedDefaultData)
+ {
+ // If the default has changed we must check all live hosts, but hopefully we will quickly find a
+ // live host whose dc or rack changed and exit the loop early. Otherwise we only check the live
+ // hosts that were either in the old set or in the new set.
+ Set<InetAddress> hosts = NodeData.isSameDcRack(defaultNodeData, reloadedDefaultData)
+ ? Sets.intersection(StorageService.instance.getLiveMembers(), // same default
+ Sets.union(nodeDataMap.keySet(), reloadedMap.keySet()))
+ : StorageService.instance.getLiveMembers(); // default updated
+
+ for (InetAddress host : hosts)
+ {
+ NodeData origValue = nodeDataMap.containsKey(host) ? nodeDataMap.get(host) : defaultNodeData;
+ NodeData updateValue = reloadedMap.containsKey(host) ? reloadedMap.get(host) : reloadedDefaultData;
+
+ if (!NodeData.isSameDcRack(origValue, updateValue))
+ {
+ logger.error("Cannot update data center or rack from {} to {} for live host {}, property file NOT RELOADED",
+ new String[] { origValue.datacenter, origValue.rack }, // same format as error in PropertyFileSnitch,
+ new String[] { updateValue.datacenter, updateValue.rack },
+ host);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
* be careful about just blindly updating ApplicationState.INTERNAL_IP every time we read the yaml file,
* as that can cause connections to get unnecessarily reset (via IESCS.onChange()).
*/
@@ -382,7 +431,7 @@ public class YamlFileNetworkTopologySnitch
/**
* Topology data for a node.
*/
- private class NodeData
+ private static final class NodeData
{
/** Data center name. */
public String datacenter;
@@ -402,6 +451,12 @@ public class YamlFileNetworkTopologySnitch
.add("rack", rack).add("dcLocalAddress", dcLocalAddress)
.toString();
}
+
+ public static boolean isSameDcRack(NodeData a, NodeData b)
+ {
+ return a == b ||
+ (a != null && Objects.equal(a.datacenter, b.datacenter) && Objects.equal(a.rack, b.rack));
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index bebfa43..3539602 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -395,7 +395,7 @@ public class MigrationManager
}
});
- for (InetAddress endpoint : Gossiper.instance.getLiveMembers())
+ for (InetAddress endpoint : Gossiper.instance.getLiveEndpoints())
{
// only push schema to nodes with known and equal versions
if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
@@ -439,7 +439,7 @@ public class MigrationManager
Schema.instance.clear();
- Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
+ Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveEndpoints();
liveEndpoints.remove(FBUtilities.getBroadcastAddress());
// force migration if there are nodes around
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index b701015..0e90ea6 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1863,7 +1863,7 @@ public class StorageProxy implements StorageProxyMBean
{
final String myVersion = Schema.instance.getVersion().toString();
final Map<InetAddress, UUID> versions = new ConcurrentHashMap<InetAddress, UUID>();
- final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers();
+ final Set<InetAddress> liveHosts = Gossiper.instance.getLiveEndpoints();
final CountDownLatch latch = new CountDownLatch(liveHosts.size());
IAsyncCallback<UUID> cb = new IAsyncCallback<UUID>()
@@ -1897,7 +1897,7 @@ public class StorageProxy implements StorageProxyMBean
// maps versions to hosts that are on that version.
Map<String, List<String>> results = new HashMap<String, List<String>>();
- Iterable<InetAddress> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
+ Iterable<InetAddress> allHosts = Iterables.concat(Gossiper.instance.getLiveEndpoints(), Gossiper.instance.getUnreachableMembers());
for (InetAddress host : allHosts)
{
UUID version = versions.get(host);
@@ -2125,12 +2125,12 @@ public class StorageProxy implements StorageProxyMBean
// Since the truncate operation is so aggressive and is typically only
// invoked by an admin, for simplicity we require that all nodes are up
// to perform the operation.
- int liveMembers = Gossiper.instance.getLiveMembers().size();
+ int liveMembers = Gossiper.instance.getLiveEndpoints().size();
throw new UnavailableException(ConsistencyLevel.ALL, liveMembers + Gossiper.instance.getUnreachableMembers().size(), liveMembers);
}
- Set<InetAddress> allEndpoints = Gossiper.instance.getLiveTokenOwners();
-
+ Set<InetAddress> allEndpoints = StorageService.instance.getLiveMembers(true);
+
int blockFor = allEndpoints.size();
final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 03c1960..5503123 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.service;
-import static java.nio.charset.StandardCharsets.ISO_8859_1;
-
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
@@ -574,7 +572,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
while (true)
{
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- for (InetAddress address : Gossiper.instance.getLiveMembers())
+ for (InetAddress address : Gossiper.instance.getLiveEndpoints())
{
if (!Gossiper.instance.isFatClient(address))
break outer;
@@ -2303,9 +2301,33 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public List<String> getLiveNodes()
{
- return stringify(Gossiper.instance.getLiveMembers());
+ return stringify(Gossiper.instance.getLiveEndpoints());
+ }
+
+ public Set<InetAddress> getLiveMembers()
+ {
+ return getLiveMembers(false);
+ }
+
+ public Set<InetAddress> getLiveMembers(boolean excludeDeadStates)
+ {
+ Set<InetAddress> ret = new HashSet<>();
+ for (InetAddress ep : Gossiper.instance.getLiveEndpoints())
+ {
+ if (excludeDeadStates)
+ {
+ EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(ep);
+ if (epState == null || Gossiper.instance.isDeadState(epState))
+ continue;
+ }
+
+ if (tokenMetadata.isMember(ep))
+ ret.add(ep);
+ }
+ return ret;
}
+
public List<String> getUnreachableNodes()
{
return stringify(Gossiper.instance.getUnreachableMembers());
@@ -3746,7 +3768,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (endpoint.equals(myAddress))
throw new UnsupportedOperationException("Cannot remove self");
- if (Gossiper.instance.getLiveMembers().contains(endpoint))
+ if (Gossiper.instance.getLiveEndpoints().contains(endpoint))
throw new UnsupportedOperationException("Node " + endpoint + " is alive and owns this ID. Use decommission command to remove it from the ring");
// A leaving endpoint that is dead is already being removed.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/test/unit/org/apache/cassandra/locator/GossipingPropertyFileSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/GossipingPropertyFileSnitchTest.java b/test/unit/org/apache/cassandra/locator/GossipingPropertyFileSnitchTest.java
index 9026ebf..80d4559 100644
--- a/test/unit/org/apache/cassandra/locator/GossipingPropertyFileSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/GossipingPropertyFileSnitchTest.java
@@ -17,12 +17,9 @@
*/
package org.apache.cassandra.locator;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import org.junit.Test;
import org.apache.cassandra.utils.FBUtilities;
-import org.junit.Test;
/**
* Unit tests for {@link GossipingPropertyFileSnitch}.
@@ -30,30 +27,12 @@ import org.junit.Test;
public class GossipingPropertyFileSnitchTest
{
@Test
- public void testAutoReloadConfig() throws Exception
+ public void testLoadConfig() throws Exception
{
- String confFile = FBUtilities.resourceToFile(SnitchProperties.RACKDC_PROPERTY_FILENAME);
-
- final GossipingPropertyFileSnitch snitch = new GossipingPropertyFileSnitch(/*refreshPeriodInSeconds*/1);
- YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, FBUtilities.getBroadcastAddress().getHostAddress(), "DC1", "RAC1");
-
- final Path effectiveFile = Paths.get(confFile);
- final Path backupFile = Paths.get(confFile + ".bak");
- final Path modifiedFile = Paths.get(confFile + ".mod");
-
- try
- {
- Files.copy(effectiveFile, backupFile);
- Files.copy(modifiedFile, effectiveFile, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
-
- Thread.sleep(1500);
-
- YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, FBUtilities.getBroadcastAddress().getHostAddress(), "DC2", "RAC2");
- }
- finally
- {
- Files.copy(backupFile, effectiveFile, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
- Files.delete(backupFile);
- }
+ final GossipingPropertyFileSnitch snitch = new GossipingPropertyFileSnitch();
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch,
+ FBUtilities.getBroadcastAddress().getHostAddress(),
+ "DC1",
+ "RAC1");
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java b/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java
new file mode 100644
index 0000000..24b8c77
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/PropertyFileSnitchTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.locator;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link PropertyFileSnitch}.
+ */
+public class PropertyFileSnitchTest
+{
+ private Path effectiveFile;
+ private Path backupFile;
+
+ private VersionedValue.VersionedValueFactory valueFactory;
+ private Map<InetAddress, Set<Token>> tokenMap;
+
+ @Before
+ public void setup() throws ConfigurationException, IOException
+ {
+ String confFile = FBUtilities.resourceToFile(PropertyFileSnitch.SNITCH_PROPERTIES_FILENAME);
+ effectiveFile = Paths.get(confFile);
+ backupFile = Paths.get(confFile + ".bak");
+
+ restoreOrigConfigFile();
+
+ InetAddress[] hosts = {
+ InetAddress.getByName("127.0.0.1"), // this exists in the config file
+ InetAddress.getByName("127.0.0.2"), // this exists in the config file
+ InetAddress.getByName("127.0.0.9"), // this does not exist in the config file
+ };
+
+ IPartitioner partitioner = new RandomPartitioner();
+ valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+ tokenMap = new HashMap<>();
+
+ for (InetAddress host : hosts)
+ {
+ Set<Token> tokens = Collections.singleton(partitioner.getRandomToken());
+ Gossiper.instance.initializeNodeUnsafe(host, UUID.randomUUID(), 1);
+ Gossiper.instance.injectApplicationState(host, ApplicationState.TOKENS, valueFactory.tokens(tokens));
+
+ setNodeShutdown(host);
+ tokenMap.put(host, tokens);
+ }
+ }
+
+ private void restoreOrigConfigFile() throws IOException
+ {
+ if (Files.exists(backupFile))
+ {
+ Files.copy(backupFile, effectiveFile, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+ Files.delete(backupFile);
+ }
+ }
+
+ private void replaceConfigFile(Map<String, String> replacements) throws IOException
+ {
+ List<String> lines = Files.readAllLines(effectiveFile, StandardCharsets.UTF_8);
+ List<String> newLines = new ArrayList<>(lines.size());
+ Set<String> replaced = new HashSet<>();
+
+ for (String line : lines)
+ {
+ String[] info = line.split("=");
+ if (info.length == 2 && replacements.containsKey(info[0]))
+ {
+ String replacement = replacements.get(info[0]);
+ if (!replacement.isEmpty()) // empty means remove this line
+ newLines.add(info[0] + '=' + replacement);
+
+ replaced.add(info[0]);
+ }
+ else
+ {
+ newLines.add(line);
+ }
+ }
+
+ // add any new lines that were not replaced
+ for (Map.Entry<String, String> replacement : replacements.entrySet())
+ {
+ if (replaced.contains(replacement.getKey()))
+ continue;
+
+ if (!replacement.getValue().isEmpty()) // empty means remove this line so do nothing here
+ newLines.add(replacement.getKey() + '=' + replacement.getValue());
+ }
+
+ Files.write(effectiveFile, newLines, StandardCharsets.UTF_8, StandardOpenOption.TRUNCATE_EXISTING);
+ }
+
+ private void setNodeShutdown(InetAddress host)
+ {
+ StorageService.instance.getTokenMetadata().removeEndpoint(host);
+ Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.shutdown(true));
+ Gossiper.instance.markDead(host, Gossiper.instance.getEndpointStateForEndpoint(host));
+ }
+
+ private void setNodeLive(InetAddress host)
+ {
+ Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.normal(tokenMap.get(host)));
+ Gossiper.instance.realMarkAlive(host, Gossiper.instance.getEndpointStateForEndpoint(host));
+ StorageService.instance.getTokenMetadata().updateNormalTokens(tokenMap.get(host), host);
+ }
+
+ /**
+ * Test that changing rack for a host in the configuration file is only effective if the host is not live.
+ * The original configuration file contains: 127.0.0.1=DC1:RAC1
+ */
+ @Test
+ public void testChangeHostRack() throws Exception
+ {
+ final InetAddress host = InetAddress.getByName("127.0.0.1");
+ final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+
+ try
+ {
+ setNodeLive(host);
+
+ Files.copy(effectiveFile, backupFile);
+ replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC1:RAC2"));
+
+ Thread.sleep(1500);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+
+ setNodeShutdown(host);
+ replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC1:RAC2"));
+
+ Thread.sleep(1500);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2");
+ }
+ finally
+ {
+ restoreOrigConfigFile();
+ setNodeShutdown(host);
+ }
+ }
+
+ /**
+ * Test that changing dc for a host in the configuration file is only effective if the host is not live.
+ * The original configuration file contains: 127.0.0.1=DC1:RAC1
+ */
+ @Test
+ public void testChangeHostDc() throws Exception
+ {
+ final InetAddress host = InetAddress.getByName("127.0.0.1");
+ final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+
+ try
+ {
+ setNodeLive(host);
+
+ Files.copy(effectiveFile, backupFile);
+ replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC1"));
+
+ Thread.sleep(1500);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+
+ setNodeShutdown(host);
+ replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC1"));
+
+ Thread.sleep(1500);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC1");
+ }
+ finally
+ {
+ restoreOrigConfigFile();
+ setNodeShutdown(host);
+ }
+ }
+
+ /**
+ * Test that adding a host to the configuration file changes the host dc and rack only if the host
+ * is not live. The original configuration file does not contain 127.0.0.9 and so it should use
+ * the default default=DC1:r1.
+ */
+ @Test
+ public void testAddHost() throws Exception
+ {
+ final InetAddress host = InetAddress.getByName("127.0.0.9");
+ final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default
+
+ try
+ {
+ setNodeLive(host);
+
+ Files.copy(effectiveFile, backupFile);
+ replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC2")); // add this line if not yet there
+
+ Thread.sleep(1500);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged
+
+ setNodeShutdown(host);
+ replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "DC2:RAC2")); // add this line if not yet there
+
+ Thread.sleep(1500);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC2"); // changed
+ }
+ finally
+ {
+ restoreOrigConfigFile();
+ setNodeShutdown(host);
+ }
+ }
+
+ /**
+ * Test that removing a host from the configuration file changes the host rack only if the host
+ * is not live. The original configuration file contains 127.0.0.2=DC1:RAC2 and default=DC1:r1 so removing
+ * this host should result in a different rack if the host is not live.
+ */
+ @Test
+ public void testRemoveHost() throws Exception
+ {
+ final InetAddress host = InetAddress.getByName("127.0.0.2");
+ final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2");
+
+ try
+ {
+ setNodeLive(host);
+
+ Files.copy(effectiveFile, backupFile);
+ replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "")); // removes line if found
+
+ Thread.sleep(1500);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); // unchanged
+
+ setNodeShutdown(host);
+ replaceConfigFile(Collections.singletonMap(host.getHostAddress(), "")); // removes line if found
+
+ Thread.sleep(1500);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default
+ }
+ finally
+ {
+ restoreOrigConfigFile();
+ setNodeShutdown(host);
+ }
+ }
+
+ /**
+ * Test that we can change the default only if this does not result in any live node changing dc or rack.
+ * The configuration file contains default=DC1:r1 and we change it to default=DC2:r2. Let's use host 127.0.0.9
+ * since it is not in the configuration file.
+ */
+ @Test
+ public void testChangeDefault() throws Exception
+ {
+ final InetAddress host = InetAddress.getByName("127.0.0.9");
+ final PropertyFileSnitch snitch = new PropertyFileSnitch(/*refreshPeriodInSeconds*/1);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default
+
+ try
+ {
+ setNodeLive(host);
+
+ Files.copy(effectiveFile, backupFile);
+ replaceConfigFile(Collections.singletonMap("default", "DC2:r2")); // change default
+
+ Thread.sleep(1500);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged
+
+ setNodeShutdown(host);
+ replaceConfigFile(Collections.singletonMap("default", "DC2:r2")); // change default again (refresh file update)
+
+ Thread.sleep(1500);
+ YamlFileNetworkTopologySnitchTest.checkEndpoint(snitch, host.getHostAddress(), "DC2", "r2"); // default updated
+ }
+ finally
+ {
+ restoreOrigConfigFile();
+ setNodeShutdown(host);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7650fc19/test/unit/org/apache/cassandra/locator/YamlFileNetworkTopologySnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/YamlFileNetworkTopologySnitchTest.java b/test/unit/org/apache/cassandra/locator/YamlFileNetworkTopologySnitchTest.java
index af1a7e9..9507f25 100644
--- a/test/unit/org/apache/cassandra/locator/YamlFileNetworkTopologySnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/YamlFileNetworkTopologySnitchTest.java
@@ -17,11 +17,28 @@
*/
package org.apache.cassandra.locator;
+import java.io.IOException;
import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.locator.YamlFileNetworkTopologySnitch.*;
+
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import com.google.common.net.InetAddresses;
@@ -31,54 +48,58 @@ import com.google.common.net.InetAddresses;
*/
public class YamlFileNetworkTopologySnitchTest
{
+ private String confFile;
- /**
- * Testing variant of {@link YamlFileNetworkTopologySnitch}.
- *
- */
- private class TestYamlFileNetworkTopologySnitch
- extends YamlFileNetworkTopologySnitch
+ private VersionedValue.VersionedValueFactory valueFactory;
+ private Map<InetAddress, Set<Token>> tokenMap;
+
+ @Before
+ public void setup() throws ConfigurationException, IOException
{
+ confFile = YamlFileNetworkTopologySnitch.DEFAULT_TOPOLOGY_CONFIG_FILENAME;
+
+ InetAddress[] hosts = {
+ InetAddress.getByName("127.0.0.1"), // this exists in the config file
+ InetAddress.getByName("127.0.0.2"), // this exists in the config file
+ InetAddress.getByName("127.0.0.9"), // this does not exist in the config file
+ };
- /**
- * Constructor.
- *
- * @throws ConfigurationException
- * on configuration error
- */
- public TestYamlFileNetworkTopologySnitch(
- final String topologyConfigFilename)
- throws ConfigurationException
+ IPartitioner partitioner = new RandomPartitioner();
+ valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+ tokenMap = new HashMap<>();
+
+ for (InetAddress host : hosts)
{
- super(topologyConfigFilename);
+ Set<Token> tokens = Collections.singleton(partitioner.getRandomToken());
+ Gossiper.instance.initializeNodeUnsafe(host, UUID.randomUUID(), 1);
+ Gossiper.instance.injectApplicationState(host, ApplicationState.TOKENS, valueFactory.tokens(tokens));
+
+ setNodeShutdown(host);
+ tokenMap.put(host, tokens);
}
}
/**
* A basic test case.
- *
- * @throws Exception
- * on failure
+ *
+ * @throws Exception on failure
*/
@Test
public void testBasic() throws Exception
{
- final TestYamlFileNetworkTopologySnitch snitch = new TestYamlFileNetworkTopologySnitch(
- "cassandra-topology.yaml");
- checkEndpoint(snitch, FBUtilities.getBroadcastAddress()
- .getHostAddress(), "DC1", "RAC1");
+ final YamlFileNetworkTopologySnitch snitch = new YamlFileNetworkTopologySnitch(confFile);
+ checkEndpoint(snitch, FBUtilities.getBroadcastAddress().getHostAddress(), "DC1", "RAC1");
checkEndpoint(snitch, "192.168.1.100", "DC1", "RAC1");
checkEndpoint(snitch, "10.0.0.12", "DC1", "RAC2");
checkEndpoint(snitch, "127.0.0.3", "DC1", "RAC3");
checkEndpoint(snitch, "10.20.114.10", "DC2", "RAC1");
checkEndpoint(snitch, "127.0.0.8", "DC3", "RAC8");
checkEndpoint(snitch, "6.6.6.6", "DC1", "r1");
-
}
/**
* Asserts that a snitch's determination of data center and rack for an endpoint match what we expect.
- *
+ *
* @param snitch
* snitch
* @param endpointString
@@ -89,12 +110,230 @@ public class YamlFileNetworkTopologySnitchTest
* expected rack
*/
public static void checkEndpoint(final AbstractNetworkTopologySnitch snitch,
- final String endpointString, final String expectedDatacenter,
- final String expectedRack)
+ final String endpointString, final String expectedDatacenter,
+ final String expectedRack)
{
final InetAddress endpoint = InetAddresses.forString(endpointString);
Assert.assertEquals(expectedDatacenter, snitch.getDatacenter(endpoint));
Assert.assertEquals(expectedRack, snitch.getRack(endpoint));
}
+
+ private static TopologyConfig moveNode(TopologyConfig topologyConfig,
+ String broadcastAddress, String dcLocalAddress,
+ String oldDC, String newDC,
+ String oldRack, String newRack)
+ {
+
+ for (Datacenter dc : topologyConfig.topology)
+ {
+ if (oldDC != null && oldRack != null)
+ {
+ if (dc.dc_name.equals(oldDC))
+ {
+ for (Rack rack : dc.racks)
+ {
+ if (rack.rack_name.equals(oldRack))
+ {
+ for (Node node : rack.nodes)
+ {
+ if (node.broadcast_address.equals(broadcastAddress))
+ {
+ rack.nodes.remove(node);
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ if (newDC != null && newRack != null)
+ {
+ if (dc.dc_name.equals(newDC))
+ {
+ for (Rack rack : dc.racks)
+ {
+ if (rack.rack_name.equals(newRack))
+ {
+ Node node = new Node();
+ node.broadcast_address = broadcastAddress;
+ node.dc_local_address = dcLocalAddress;
+ rack.nodes.add(node);
+ }
+ }
+ }
+ }
+ }
+
+ return topologyConfig;
+ }
+
+ private void setNodeShutdown(InetAddress host)
+ {
+ StorageService.instance.getTokenMetadata().removeEndpoint(host);
+ Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.shutdown(true));
+ Gossiper.instance.markDead(host, Gossiper.instance.getEndpointStateForEndpoint(host));
+ }
+
+ private void setNodeLive(InetAddress host)
+ {
+ Gossiper.instance.injectApplicationState(host, ApplicationState.STATUS, valueFactory.normal(tokenMap.get(host)));
+ Gossiper.instance.realMarkAlive(host, Gossiper.instance.getEndpointStateForEndpoint(host));
+ StorageService.instance.getTokenMetadata().updateNormalTokens(tokenMap.get(host), host);
+ }
+
+ /**
+ * Test that changing rack for a host in the configuration file is only effective if the host is not live.
+ * The original configuration file contains DC1, RAC1 for broadcast address 127.0.0.1 and dc_local_address 9.0.0.1.
+ */
+ @Test
+ public void testChangeHostRack() throws Exception
+ {
+ final InetAddress host = InetAddress.getByName("127.0.0.1");
+ final YamlFileNetworkTopologySnitch snitch = new YamlFileNetworkTopologySnitch(confFile);
+ checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+
+ try
+ {
+ final TopologyConfig topologyConfig = snitch.readConfig();
+ moveNode(topologyConfig, host.getHostAddress(), "9.0.0.1", "DC1", "DC1", "RAC1", "RAC2");
+
+ setNodeLive(host);
+ snitch.loadTopologyConfiguration(true, topologyConfig);
+ checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+
+ setNodeShutdown(host);
+ snitch.loadTopologyConfiguration(true, topologyConfig);
+ checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2");
+ }
+ finally
+ {
+ setNodeShutdown(host);
+ }
+ }
+
+ /**
+ * Test that changing dc for a host in the configuration file is only effective if the host is not live.
+ * The original configuration file contains DC1, RAC1 for broadcast address 127.0.0.1 and dc_local_address 9.0.0.1.
+ */
+ @Test
+ public void testChangeHostDc() throws Exception
+ {
+ final InetAddress host = InetAddress.getByName("127.0.0.1");
+ final YamlFileNetworkTopologySnitch snitch = new YamlFileNetworkTopologySnitch(confFile);
+ checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+
+ try
+ {
+ final TopologyConfig topologyConfig = snitch.readConfig();
+ moveNode(topologyConfig, host.getHostAddress(), "9.0.0.1", "DC1", "DC2", "RAC1", "RAC1");
+
+ setNodeLive(host);
+ snitch.loadTopologyConfiguration(true, topologyConfig);
+ checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC1");
+
+ setNodeShutdown(host);
+ snitch.loadTopologyConfiguration(true, topologyConfig);
+ checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC1");
+ }
+ finally
+ {
+ setNodeShutdown(host);
+ }
+ }
+
+ /**
+ * Test that adding a host to the configuration file changes the host dc and rack only if the host
+ * is not live. The original configuration file does not contain 127.0.0.9 and so it should use
+ * the default data center DC1 and rack r1.
+ */
+ @Test
+ public void testAddHost() throws Exception
+ {
+ final InetAddress host = InetAddress.getByName("127.0.0.9");
+ final YamlFileNetworkTopologySnitch snitch = new YamlFileNetworkTopologySnitch(confFile);
+ checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default
+
+ try
+ {
+ final TopologyConfig topologyConfig = snitch.readConfig();
+ moveNode(topologyConfig, host.getHostAddress(), "9.0.0.9", null, "DC2", null, "RAC2");
+
+ setNodeLive(host);
+ snitch.loadTopologyConfiguration(true, topologyConfig);
+ checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged
+
+ setNodeShutdown(host);
+ snitch.loadTopologyConfiguration(true, topologyConfig);
+ checkEndpoint(snitch, host.getHostAddress(), "DC2", "RAC2"); // changed
+ }
+ finally
+ {
+ setNodeShutdown(host);
+ }
+ }
+
+ /**
+ * Test that removing a host from the configuration file changes the host rack only if the host
+ * is not live. The original configuration file contains 127.0.0.2 in DC1, RAC2 and default DC1, r1 so removing
+ * this host should result in a different rack if the host is not live.
+ */
+ @Test
+ public void testRemoveHost() throws Exception
+ {
+ final InetAddress host = InetAddress.getByName("127.0.0.2");
+ final YamlFileNetworkTopologySnitch snitch = new YamlFileNetworkTopologySnitch(confFile);
+ checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2");
+
+ try
+ {
+ final TopologyConfig topologyConfig = snitch.readConfig();
+ moveNode(topologyConfig, host.getHostAddress(), "9.0.0.2", "DC1", null, "RAC2", null);
+
+ setNodeLive(host);
+ snitch.loadTopologyConfiguration(true, topologyConfig);
+ checkEndpoint(snitch, host.getHostAddress(), "DC1", "RAC2"); // unchanged
+
+ setNodeShutdown(host);
+ snitch.loadTopologyConfiguration(true, topologyConfig);
+ checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default
+ }
+ finally
+ {
+ setNodeShutdown(host);
+ }
+ }
+
+ /**
+ * Test that we can change the default only if this does not result in any live node changing dc or rack.
+ * The configuration file contains default DC1 and r1 and we change it to DC2 and r2. Let's use host 127.0.0.9
+ * since it is not in the configuration file.
+ */
+ @Test
+ public void testChangeDefault() throws Exception
+ {
+ final InetAddress host = InetAddress.getByName("127.0.0.9");
+ final YamlFileNetworkTopologySnitch snitch = new YamlFileNetworkTopologySnitch(confFile);
+ checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // default
+
+ try
+ {
+ final TopologyConfig topologyConfig = snitch.readConfig();
+ topologyConfig.default_dc_name = "DC2";
+ topologyConfig.default_rack_name = "r2";
+
+ setNodeLive(host);
+ snitch.loadTopologyConfiguration(true, topologyConfig);
+ checkEndpoint(snitch, host.getHostAddress(), "DC1", "r1"); // unchanged
+
+ setNodeShutdown(host);
+ snitch.loadTopologyConfiguration(true, topologyConfig);
+ checkEndpoint(snitch, host.getHostAddress(), "DC2", "r2"); // default updated
+ }
+ finally
+ {
+ setNodeShutdown(host);
+ }
+ }
}