You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/02/05 10:57:16 UTC
[cassandra] branch cassandra-2.2 updated: Add an ability to run
bootstrap / streaming tests with in-JVM dtest framework.
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
new 9705d82 Add an ability to run bootstrap / streaming tests with in-JVM dtest framework.
9705d82 is described below
commit 9705d823cddfe24356ba4f3f083b9371cdbdeb4d
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Tue Jan 14 15:56:59 2020 +0100
Add an ability to run bootstrap / streaming tests with in-JVM dtest framework.
Patch by Alex Petrov; reviewed by Marcus Eriksson for CASSANDRA-15497.
---
.../org/apache/cassandra/service/GCInspector.java | 3 +-
.../cassandra/distributed/api/IInstanceConfig.java | 4 +
.../distributed/impl/AbstractCluster.java | 170 +++++++++++++++------
.../distributed/impl/DistributedTestSnitch.java | 60 +++++++-
.../cassandra/distributed/impl/InstanceConfig.java | 10 +-
.../distributed/impl/NetworkTopology.java | 60 +++++++-
.../cassandra/distributed/test/BootstrapTest.java | 104 +++++++++++++
.../distributed/test/DistributedTestBase.java | 1 +
.../distributed/test/NetworkTopologyTest.java | 5 +-
9 files changed, 357 insertions(+), 60 deletions(-)
diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java
index 31de151..4f93097 100644
--- a/src/java/org/apache/cassandra/service/GCInspector.java
+++ b/src/java/org/apache/cassandra/service/GCInspector.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.SSTableDeletingTask;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.StatusLogger;
public class GCInspector implements NotificationListener, GCInspectorMXBean
@@ -147,7 +148,7 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean
gcStates.put(gc.getName(), new GCState(gc, assumeGCIsPartiallyConcurrent(gc), assumeGCIsOldGen(gc)));
}
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+ MBeanWrapper.instance.registerMBean(this, new ObjectName(MBEAN_NAME));
}
catch (Exception e)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
index dd21b96..d2804c2 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
@@ -47,6 +47,10 @@ public interface IInstanceConfig
*/
void propagate(Object writeToConfig);
+ /**
+ * Validates whether the config properties are within range of accepted values.
+ */
+ void validate();
Object get(String fieldName);
String getString(String fieldName);
int getInt(String fieldName);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 1ee0c14..474ade8 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -59,7 +59,6 @@ import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
/**
@@ -101,6 +100,8 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
private final List<I> instances;
private final Map<InetAddressAndPort, I> instanceMap;
+ private final Versions.Version initialVersion;
+
// mutated by user-facing API
private final MessageFilters filters;
@@ -131,7 +132,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
private IInvokableInstance newInstance(int generation)
{
ClassLoader classLoader = new InstanceClassLoader(generation, config.num, version.classpath, sharedClassLoader);
- return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader)
+ return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>) Instance::new, classLoader)
.apply(config, classLoader);
}
@@ -210,18 +211,19 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
}
}
- protected AbstractCluster(File root, Versions.Version version, List<InstanceConfig> configs,
+ protected AbstractCluster(File root, Versions.Version initialVersion, List<InstanceConfig> configs,
ClassLoader sharedClassLoader)
{
this.root = root;
this.sharedClassLoader = sharedClassLoader;
this.instances = new ArrayList<>();
this.instanceMap = new HashMap<>();
+ this.initialVersion = initialVersion;
int generation = AbstractCluster.generation.incrementAndGet();
for (InstanceConfig config : configs)
{
- I instance = newInstanceWrapper(generation, version, config);
+ I instance = newInstanceWrapperInternal(generation, initialVersion, config);
instances.add(instance);
// we use the config().broadcastAddressAndPort() here because we have not initialised the Instance
I prev = instanceMap.put(instance.broadcastAddressAndPort(), instance);
@@ -233,6 +235,32 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
protected abstract I newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config);
+ protected I newInstanceWrapperInternal(int generation, Versions.Version version, InstanceConfig config)
+ {
+ config.validate();
+ return newInstanceWrapper(generation, version, config);
+ }
+
+ public I bootstrap(InstanceConfig config)
+ {
+ if (!config.has(Feature.GOSSIP) || !config.has(Feature.NETWORK))
+ throw new IllegalStateException("New nodes can only be bootstrapped when gossip and networking is enabled.");
+
+ I instance = newInstanceWrapperInternal(0, initialVersion, config);
+
+ instances.add(instance);
+ I prev = instanceMap.put(config.broadcastAddressAndPort(), instance);
+
+ if (null != prev)
+ {
+ throw new IllegalStateException(String.format("This cluster already contains a node (%d) with with same address and port: %s",
+ config.num,
+ instance));
+ }
+
+ return instance;
+ }
+
/**
* WARNING: we index from 1 here, for consistency with inet address!
*/
@@ -240,18 +268,29 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
{
return instances.get(node - 1).coordinator();
}
+
/**
* WARNING: we index from 1 here, for consistency with inet address!
*/
- public I get(int node) { return instances.get(node - 1); }
- public I get(InetAddressAndPort addr) { return instanceMap.get(addr); }
+ public I get(int node)
+ {
+ return instances.get(node - 1);
+ }
+
+ public I get(InetAddressAndPort addr)
+ {
+ return instanceMap.get(addr);
+ }
public int size()
{
return instances.size();
}
- public Stream<I> stream() { return instances.stream(); }
+ public Stream<I> stream()
+ {
+ return instances.stream();
+ }
public Stream<I> stream(String dcName)
{
@@ -264,13 +303,24 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
i.config().localRack().equals(rackName));
}
- public void forEach(IIsolatedExecutor.SerializableRunnable runnable) { forEach(i -> i.sync(runnable)); }
- public void forEach(Consumer<? super I> consumer) { forEach(instances, consumer); }
- public void forEach(List<I> instancesForOp, Consumer<? super I> consumer) { instancesForOp.forEach(consumer); }
+ public void forEach(IIsolatedExecutor.SerializableRunnable runnable)
+ {
+ forEach(i -> i.sync(runnable));
+ }
+
+ public void forEach(Consumer<? super I> consumer)
+ {
+ forEach(instances, consumer);
+ }
+
+ public void forEach(List<I> instancesForOp, Consumer<? super I> consumer)
+ {
+ instancesForOp.forEach(consumer);
+ }
public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
{
- parallelForEach(instances, consumer, timeout, unit);
+ parallelForEach(instances, consumer, timeout, unit);
}
public void parallelForEach(List<I> instances, IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
@@ -316,12 +366,12 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
private void updateMessagingVersions()
{
- for (IInstance reportTo: instances)
+ for (IInstance reportTo : instances)
{
if (reportTo.isShutdown())
continue;
- for (IInstance reportFrom: instances)
+ for (IInstance reportFrom : instances)
{
if (reportFrom == reportTo || reportFrom.isShutdown())
continue;
@@ -391,15 +441,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
}
-
/**
* Will wait for a schema change AND agreement that occurs after it is created
* (and precedes the invocation to waitForAgreement)
- *
+ * <p>
* Works by simply checking if all UUIDs agree after any schema version change event,
* so long as the waitForAgreement method has been entered (indicating the change has
* taken place on the coordinator)
- *
+ * <p>
* This could perhaps be made a little more robust, but this should more than suffice.
*/
public class SchemaChangeMonitor extends ChangeMonitor
@@ -462,9 +511,11 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
// and then start any instances with it disabled in parallel.
List<I> startSequentially = new ArrayList<>();
List<I> startParallel = new ArrayList<>();
- for (I instance : instances)
+ for (int i = 0; i < instances.size(); i++)
{
- if ((boolean) instance.config().get("auto_bootstrap"))
+ I instance = instances.get(i);
+
+ if (i == 0 || (boolean) instance.config().get("auto_bootstrap"))
startSequentially.add(instance);
else
startParallel.add(instance);
@@ -486,9 +537,10 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
private final Factory<I, C> factory;
private int nodeCount;
private int subnet;
- private Map<Integer, Pair<String,String>> nodeIdTopology;
+ private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
+ private TokenSupplier tokenSupplier;
private File root;
- private Versions.Version version;
+ private Versions.Version version = Versions.CURRENT;
private Consumer<InstanceConfig> configUpdater;
public Builder(Factory<I, C> factory)
@@ -496,13 +548,20 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
this.factory = factory;
}
+ public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier)
+ {
+ this.tokenSupplier = tokenSupplier;
+ return this;
+ }
+
public Builder<I, C> withSubnet(int subnet)
{
this.subnet = subnet;
return this;
}
- public Builder<I, C> withNodes(int nodeCount) {
+ public Builder<I, C> withNodes(int nodeCount)
+ {
this.nodeCount = nodeCount;
return this;
}
@@ -534,7 +593,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
for (int rack = 1; rack <= racksPerDC; rack++)
{
for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
- nodeIdTopology.put(nodeId++, Pair.create(dcName(dc), rackName(rack)));
+ nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
}
}
// adjust the node count to match the allocatation
@@ -564,14 +623,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
nodeIdTopology = new HashMap<>();
}
for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
- nodeIdTopology.put(nodeId, Pair.create(dcName, rackName));
+ nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));
nodeCount += nodesInRack;
return this;
}
// Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
- public Builder<I, C> withNodeIdTopology(Map<Integer,Pair<String,String>> nodeIdTopology)
+ public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
{
if (nodeIdTopology.isEmpty())
throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
@@ -585,7 +644,6 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
{
nodeCount = nodeIdTopology.size();
logger.info("Adjusting node count to {} for supplied network topology", nodeCount);
-
}
this.nodeIdTopology = new HashMap<>(nodeIdTopology);
@@ -613,22 +671,18 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
public C createWithoutStarting() throws IOException
{
- File root = this.root;
- Versions.Version version = this.version;
-
if (root == null)
root = Files.createTempDirectory("dtests").toFile();
- if (version == null)
- version = Versions.CURRENT;
-
if (nodeCount <= 0)
throw new IllegalStateException("Cluster must have at least one node");
if (nodeIdTopology == null)
+ {
nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
.collect(Collectors.toMap(nodeId -> nodeId,
- nodeId -> Pair.create(dcName(0), rackName(0))));
+ nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
+ }
root.mkdirs();
setupLogging(root);
@@ -636,27 +690,40 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();
List<InstanceConfig> configs = new ArrayList<>();
- long token = Long.MIN_VALUE + 1, increment = 2 * (Long.MAX_VALUE / nodeCount);
-
- String ipPrefix = "127.0." + subnet + ".";
- String seedIp = ipPrefix + "1";
- NetworkTopology networkTopology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology);
+ if (tokenSupplier == null)
+ tokenSupplier = evenlyDistributedTokens(nodeCount);
- for (int i = 0 ; i < nodeCount ; ++i)
+ for (int i = 0; i < nodeCount; ++i)
{
int nodeNum = i + 1;
- String ipAddress = ipPrefix + nodeNum;
- InstanceConfig config = InstanceConfig.generate(i + 1, ipAddress, networkTopology, root, String.valueOf(token), seedIp);
- if (configUpdater != null)
- configUpdater.accept(config);
- configs.add(config);
- token += increment;
+ configs.add(createInstanceConfig(nodeNum));
}
return factory.newCluster(root, version, configs, sharedClassLoader);
}
+ public InstanceConfig newInstanceConfig(C cluster)
+ {
+ return createInstanceConfig(cluster.size() + 1);
+ }
+
+ private InstanceConfig createInstanceConfig(int nodeNum)
+ {
+ String ipPrefix = "127.0." + subnet + ".";
+ String seedIp = ipPrefix + "1";
+ String ipAddress = ipPrefix + nodeNum;
+ long token = tokenSupplier.token(nodeNum);
+
+ NetworkTopology topology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology);
+
+ InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
+ if (configUpdater != null)
+ configUpdater.accept(config);
+
+ return config;
+ }
+
public C start() throws IOException
{
C cluster = createWithoutStarting();
@@ -665,6 +732,21 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
}
}
+ public static TokenSupplier evenlyDistributedTokens(int numNodes)
+ {
+ long increment = (Long.MAX_VALUE / numNodes) * 2;
+ return (int nodeId) -> {
+ assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
+ nodeId, numNodes);
+ return Long.MIN_VALUE + 1 + nodeId * increment;
+ };
+ }
+
+ public static interface TokenSupplier
+ {
+ public long token(int nodeId);
+ }
+
static String dcName(int index)
{
return "datacenter" + index;
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
index 35e2903..f8f157a 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
@@ -19,43 +19,91 @@
package org.apache.cassandra.distributed.impl;
import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
public class DistributedTestSnitch extends AbstractNetworkTopologySnitch
{
private static NetworkTopology mapping = null;
+ private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
+ private static final String DEFAULT_DC = "UNKNOWN_DC";
+ private static final String DEFAULT_RACK = "UNKNOWN_RACK";
+
public String getRack(InetAddress endpoint)
{
- assert mapping != null : "network topology must be assigned before using snitch";
int storage_port = Config.getOverrideLoadConfig().get().storage_port;
- return mapping.localRack(InetAddressAndPort.getByAddressOverrideDefaults(endpoint, storage_port));
+ return getRack(InetAddressAndPort.getByAddressOverrideDefaults(endpoint, storage_port));
}
public String getRack(InetAddressAndPort endpoint)
{
assert mapping != null : "network topology must be assigned before using snitch";
- return mapping.localRack(endpoint);
+ return maybeGetFromEndpointState(mapping.localRack(endpoint), endpoint, ApplicationState.RACK, DEFAULT_RACK);
}
public String getDatacenter(InetAddress endpoint)
{
- assert mapping != null : "network topology must be assigned before using snitch";
int storage_port = Config.getOverrideLoadConfig().get().storage_port;
- return mapping.localDC(InetAddressAndPort.getByAddressOverrideDefaults(endpoint, storage_port));
+ return getDatacenter(InetAddressAndPort.getByAddressOverrideDefaults(endpoint, storage_port));
}
public String getDatacenter(InetAddressAndPort endpoint)
{
assert mapping != null : "network topology must be assigned before using snitch";
- return mapping.localDC(endpoint);
+ return maybeGetFromEndpointState(mapping.localDC(endpoint), endpoint, ApplicationState.DC, DEFAULT_DC);
+ }
+
+ // Here, the logic is slightly different from what we have in GossipingPropertyFileSnitch since we have a different
+ // goal. Passed argument (topology that was set on the node) overrides anything that is passed elsewhere.
+ private String maybeGetFromEndpointState(String current, InetAddressAndPort endpoint, ApplicationState state, String defaultValue)
+ {
+ if (current != null)
+ return current;
+
+ EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint.address);
+ if (epState == null || epState.getApplicationState(state) == null)
+ {
+ if (savedEndpoints == null)
+ {
+ savedEndpoints = new HashMap<>();
+ int storage_port = Config.getOverrideLoadConfig().get().storage_port;
+ for (Map.Entry<InetAddress, Map<String, String>> entry : SystemKeyspace.loadDcRackInfo().entrySet())
+ {
+ savedEndpoints.put(InetAddressAndPort.getByAddressOverrideDefaults(endpoint.address, storage_port),
+ entry.getValue());
+ }
+ }
+
+ if (savedEndpoints.containsKey(endpoint))
+ return savedEndpoints.get(endpoint).get("data_center");
+
+ return defaultValue;
+ }
+
+ return epState.getApplicationState(state).value;
}
static void assign(NetworkTopology newMapping)
{
mapping = new NetworkTopology(newMapping);
}
+
+ public void gossiperStarting()
+ {
+ super.gossiperStarting();
+
+ Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP,
+ StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress()));
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index c53012b..6d668e6 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -22,6 +22,7 @@ import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.SimpleSeedProvider;
@@ -92,7 +93,8 @@ public class InstanceConfig implements IInstanceConfig
this.num = num;
this.networkTopology = networkTopology;
this.hostId = java.util.UUID.randomUUID();
- this .set("broadcast_address", broadcast_address)
+ this .set("num_tokens", 1)
+ .set("broadcast_address", broadcast_address)
.set("listen_address", listen_address)
.set("broadcast_rpc_address", broadcast_rpc_address)
.set("rpc_address", rpc_address)
@@ -187,6 +189,12 @@ public class InstanceConfig implements IInstanceConfig
propagate(writeToConfig, e.getKey(), e.getValue(), true);
}
+ public void validate()
+ {
+ if (((int) get("num_tokens")) > 1)
+ throw new IllegalArgumentException("In-JVM dtests do not support vnodes as of now.");
+ }
+
private void propagate(Object writeToConfig, String fieldName, Object value, boolean ignoreMissing)
{
if (value == NULL)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java b/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java
index 1176b32..f7c31ff 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java
@@ -22,6 +22,9 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -29,7 +32,24 @@ import org.apache.cassandra.utils.Pair;
public class NetworkTopology
{
- private final Map<InetAddressAndPort, Pair<String, String>> map;
+ private final Map<InetAddressAndPort, DcAndRack> map;
+
+ public static class DcAndRack
+ {
+ private final String dc;
+ private final String rack;
+
+ private DcAndRack(String dc, String rack)
+ {
+ this.dc = dc;
+ this.rack = rack;
+ }
+ }
+
+ public static DcAndRack dcAndRack(String dc, String rack)
+ {
+ return new DcAndRack(dc, rack);
+ }
private NetworkTopology() {
map = new HashMap<>();
@@ -41,7 +61,7 @@ public class NetworkTopology
map = new HashMap<>(networkTopology.map);
}
- public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, Pair<String, String>> nodeIdTopology)
+ public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, DcAndRack> nodeIdTopology)
{
final NetworkTopology topology = new NetworkTopology();
@@ -51,7 +71,7 @@ public class NetworkTopology
try
{
- Pair<String,String> dcAndRack = nodeIdTopology.get(nodeId);
+ DcAndRack dcAndRack = nodeIdTopology.get(nodeId);
if (dcAndRack == null)
throw new IllegalStateException("nodeId " + nodeId + "not found in instanceMap");
@@ -67,23 +87,51 @@ public class NetworkTopology
return topology;
}
- public Pair<String, String> put(InetAddressAndPort key, Pair<String, String> value)
+ public DcAndRack put(InetAddressAndPort key, DcAndRack value)
{
return map.put(key, value);
}
public String localRack(InetAddressAndPort key)
{
- return map.get(key).right;
+ DcAndRack p = map.get(key);
+ if (p == null)
+ return null;
+ return p.rack;
}
public String localDC(InetAddressAndPort key)
{
- return map.get(key).left;
+ DcAndRack p = map.get(key);
+ if (p == null)
+ return null;
+ return p.dc;
}
public boolean contains(InetAddressAndPort key)
{
return map.containsKey(key);
}
+
+ public String toString()
+ {
+ return "NetworkTopology{" + map + '}';
+ }
+
+
+ public static Map<Integer, NetworkTopology.DcAndRack> singleDcNetworkTopology(int nodeCount,
+ String dc,
+ String rack)
+ {
+ return networkTopology(nodeCount, (nodeid) -> NetworkTopology.dcAndRack(dc, rack));
+ }
+
+ public static Map<Integer, NetworkTopology.DcAndRack> networkTopology(int nodeCount,
+ IntFunction<DcAndRack> dcAndRackSupplier)
+ {
+
+ return IntStream.rangeClosed(1, nodeCount).boxed()
+ .collect(Collectors.toMap(nodeId -> nodeId,
+ dcAndRackSupplier::apply));
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
new file mode 100644
index 0000000..0af26ea
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.impl.IInvokableInstance;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.distributed.impl.NetworkTopology;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class BootstrapTest extends DistributedTestBase
+{
+
+ @Test
+ public void bootstrapTest() throws Throwable
+ {
+ int originalNodeCount = 2;
+ int expandedNodeCount = originalNodeCount + 1;
+ Cluster.Builder<IInvokableInstance, Cluster> builder = Cluster.build(originalNodeCount)
+ .withTokenSupplier(Cluster.evenlyDistributedTokens(expandedNodeCount))
+ .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(originalNodeCount, "dc0", "rack0"))
+ .withConfig(config -> config.with(NETWORK, GOSSIP));
+
+ Map<Integer, Long> withBootstrap = null;
+ Map<Integer, Long> naturally = null;
+
+ try (Cluster cluster = builder.start())
+ {
+ populate(cluster);
+
+ InstanceConfig config = builder.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
+ .newInstanceConfig(cluster);
+ config.set("auto_bootstrap", true);
+
+ IInstance newInstance = cluster.bootstrap(config);
+ newInstance.startup();
+
+ cluster.stream().forEach(instance -> {
+ instance.nodetool("cleanup", KEYSPACE, "tbl");
+ });
+
+ withBootstrap = count(cluster);
+ }
+
+ builder = Cluster.build(expandedNodeCount)
+ .withTokenSupplier(Cluster.evenlyDistributedTokens(expandedNodeCount))
+ .withConfig(config -> config.with(NETWORK, GOSSIP));
+
+ try (Cluster cluster = builder.start())
+ {
+ populate(cluster);
+ naturally = count(cluster);
+ }
+
+ Assert.assertEquals(withBootstrap, naturally);
+ }
+
+ public void populate(Cluster cluster)
+ {
+ cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + 3 + "};");
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+ for (int i = 0; i < 1000; i++)
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)",
+ ConsistencyLevel.QUORUM,
+ i, i, i);
+ }
+
+ public Map<Integer, Long> count(Cluster cluster)
+ {
+ return IntStream.rangeClosed(1, cluster.size())
+ .boxed()
+ .collect(Collectors.toMap(nodeId -> nodeId,
+ nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0]));
+ }
+
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 745e1ab..7a3d52d 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@ -67,6 +67,7 @@ public class DistributedTestBase
@BeforeClass
public static void setup()
{
+ System.setProperty("cassandra.ring_delay_ms", Integer.toString(10 * 1000));
System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
nativeLibraryWorkaround();
processReaperWorkaround();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
index 2c4f9c8..a9c2cee 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.impl.NetworkTopology;
import org.apache.cassandra.utils.Pair;
public class NetworkTopologyTest extends DistributedTestBase
@@ -36,7 +37,7 @@ public class NetworkTopologyTest extends DistributedTestBase
public void namedDcTest() throws Throwable
{
try (Cluster cluster = Cluster.build()
- .withNodeIdTopology(Collections.singletonMap(1, Pair.create("somewhere", "rack0")))
+ .withNodeIdTopology(Collections.singletonMap(1, NetworkTopology.dcAndRack("somewhere", "rack0")))
.withRack("elsewhere", "firstrack", 1)
.withRack("elsewhere", "secondrack", 2)
.withDC("nearthere", 4)
@@ -94,6 +95,6 @@ public class NetworkTopologyTest extends DistributedTestBase
@Test(expected = IllegalStateException.class)
public void noHolesInNodeIdTopologyTest()
{
- Cluster.build().withNodeIdTopology(Collections.singletonMap(2, Pair.create("doomed", "rack")));
+ Cluster.build().withNodeIdTopology(Collections.singletonMap(2, NetworkTopology.dcAndRack("doomed", "rack")));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org