You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2020/02/05 10:57:16 UTC

[cassandra] branch cassandra-2.2 updated: Add an ability to run bootstrap / streaming tests with in-JVM dtest framework.

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
     new 9705d82  Add an ability to run bootstrap / streaming tests with in-JVM dtest framework.
9705d82 is described below

commit 9705d823cddfe24356ba4f3f083b9371cdbdeb4d
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Tue Jan 14 15:56:59 2020 +0100

    Add an ability to run bootstrap / streaming tests with in-JVM dtest framework.
    
    Patch by Alex Petrov; reviewed by Marcus Eriksson for CASSANDRA-15497.
---
 .../org/apache/cassandra/service/GCInspector.java  |   3 +-
 .../cassandra/distributed/api/IInstanceConfig.java |   4 +
 .../distributed/impl/AbstractCluster.java          | 170 +++++++++++++++------
 .../distributed/impl/DistributedTestSnitch.java    |  60 +++++++-
 .../cassandra/distributed/impl/InstanceConfig.java |  10 +-
 .../distributed/impl/NetworkTopology.java          |  60 +++++++-
 .../cassandra/distributed/test/BootstrapTest.java  | 104 +++++++++++++
 .../distributed/test/DistributedTestBase.java      |   1 +
 .../distributed/test/NetworkTopologyTest.java      |   5 +-
 9 files changed, 357 insertions(+), 60 deletions(-)

diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java
index 31de151..4f93097 100644
--- a/src/java/org/apache/cassandra/service/GCInspector.java
+++ b/src/java/org/apache/cassandra/service/GCInspector.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
 import org.apache.cassandra.io.sstable.SSTableDeletingTask;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.StatusLogger;
 
 public class GCInspector implements NotificationListener, GCInspectorMXBean
@@ -147,7 +148,7 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean
                 gcStates.put(gc.getName(), new GCState(gc, assumeGCIsPartiallyConcurrent(gc), assumeGCIsOldGen(gc)));
             }
 
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+            MBeanWrapper.instance.registerMBean(this, new ObjectName(MBEAN_NAME));
         }
         catch (Exception e)
         {
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
index dd21b96..d2804c2 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IInstanceConfig.java
@@ -47,6 +47,10 @@ public interface IInstanceConfig
      */
     void propagate(Object writeToConfig);
 
+    /**
+     * Validates whether the config properties are within range of accepted values.
+     */
+    void validate();
     Object get(String fieldName);
     String getString(String fieldName);
     int getInt(String fieldName);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 1ee0c14..474ade8 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -59,7 +59,6 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 /**
@@ -101,6 +100,8 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
     private final List<I> instances;
     private final Map<InetAddressAndPort, I> instanceMap;
 
+    private final Versions.Version initialVersion;
+
     // mutated by user-facing API
     private final MessageFilters filters;
 
@@ -131,7 +132,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         private IInvokableInstance newInstance(int generation)
         {
             ClassLoader classLoader = new InstanceClassLoader(generation, config.num, version.classpath, sharedClassLoader);
-            return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader)
+            return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>) Instance::new, classLoader)
                            .apply(config, classLoader);
         }
 
@@ -210,18 +211,19 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         }
     }
 
-    protected AbstractCluster(File root, Versions.Version version, List<InstanceConfig> configs,
+    protected AbstractCluster(File root, Versions.Version initialVersion, List<InstanceConfig> configs,
                               ClassLoader sharedClassLoader)
     {
         this.root = root;
         this.sharedClassLoader = sharedClassLoader;
         this.instances = new ArrayList<>();
         this.instanceMap = new HashMap<>();
+        this.initialVersion = initialVersion;
         int generation = AbstractCluster.generation.incrementAndGet();
 
         for (InstanceConfig config : configs)
         {
-            I instance = newInstanceWrapper(generation, version, config);
+            I instance = newInstanceWrapperInternal(generation, initialVersion, config);
             instances.add(instance);
             // we use the config().broadcastAddressAndPort() here because we have not initialised the Instance
             I prev = instanceMap.put(instance.broadcastAddressAndPort(), instance);
@@ -233,6 +235,32 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
 
     protected abstract I newInstanceWrapper(int generation, Versions.Version version, InstanceConfig config);
 
+    protected I newInstanceWrapperInternal(int generation, Versions.Version version, InstanceConfig config)
+    {
+        config.validate();
+        return newInstanceWrapper(generation, version, config);
+    }
+
+    public I bootstrap(InstanceConfig config)
+    {
+        if (!config.has(Feature.GOSSIP) || !config.has(Feature.NETWORK))
+            throw new IllegalStateException("New nodes can only be bootstrapped when gossip and networking is enabled.");
+
+        // Reject a duplicate address before mutating any cluster state, so a failed call leaves the cluster unchanged.
+        if (instanceMap.containsKey(config.broadcastAddressAndPort()))
+        {
+            throw new IllegalStateException(String.format("This cluster already contains a node (%d) with the same address and port: %s",
+                                                          config.num,
+                                                          config.broadcastAddressAndPort()));
+        }
+
+        I instance = newInstanceWrapperInternal(0, initialVersion, config);
+        instances.add(instance);
+        instanceMap.put(config.broadcastAddressAndPort(), instance);
+
+        return instance;
+    }
+
     /**
      * WARNING: we index from 1 here, for consistency with inet address!
      */
@@ -240,18 +268,29 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
     {
         return instances.get(node - 1).coordinator();
     }
+
     /**
      * WARNING: we index from 1 here, for consistency with inet address!
      */
-    public I get(int node) { return instances.get(node - 1); }
-    public I get(InetAddressAndPort addr) { return instanceMap.get(addr); }
+    public I get(int node)
+    {
+        return instances.get(node - 1);
+    }
+
+    public I get(InetAddressAndPort addr)
+    {
+        return instanceMap.get(addr);
+    }
 
     public int size()
     {
         return instances.size();
     }
 
-    public Stream<I> stream() { return instances.stream(); }
+    public Stream<I> stream()
+    {
+        return instances.stream();
+    }
 
     public Stream<I> stream(String dcName)
     {
@@ -264,13 +303,24 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
                                               i.config().localRack().equals(rackName));
     }
 
-    public void forEach(IIsolatedExecutor.SerializableRunnable runnable) { forEach(i -> i.sync(runnable)); }
-    public void forEach(Consumer<? super I> consumer) { forEach(instances, consumer); }
-    public void forEach(List<I> instancesForOp, Consumer<? super I> consumer) { instancesForOp.forEach(consumer); }
+    public void forEach(IIsolatedExecutor.SerializableRunnable runnable)
+    {
+        forEach(i -> i.sync(runnable));
+    }
+
+    public void forEach(Consumer<? super I> consumer)
+    {
+        forEach(instances, consumer);
+    }
+
+    public void forEach(List<I> instancesForOp, Consumer<? super I> consumer)
+    {
+        instancesForOp.forEach(consumer);
+    }
 
     public void parallelForEach(IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
     {
-            parallelForEach(instances, consumer, timeout, unit);
+        parallelForEach(instances, consumer, timeout, unit);
     }
 
     public void parallelForEach(List<I> instances, IIsolatedExecutor.SerializableConsumer<? super I> consumer, long timeout, TimeUnit unit)
@@ -316,12 +366,12 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
 
     private void updateMessagingVersions()
     {
-        for (IInstance reportTo: instances)
+        for (IInstance reportTo : instances)
         {
             if (reportTo.isShutdown())
                 continue;
 
-            for (IInstance reportFrom: instances)
+            for (IInstance reportFrom : instances)
             {
                 if (reportFrom == reportTo || reportFrom.isShutdown())
                     continue;
@@ -391,15 +441,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
     }
 
 
-
     /**
      * Will wait for a schema change AND agreement that occurs after it is created
      * (and precedes the invocation to waitForAgreement)
-     *
+     * <p>
      * Works by simply checking if all UUIDs agree after any schema version change event,
      * so long as the waitForAgreement method has been entered (indicating the change has
      * taken place on the coordinator)
-     *
+     * <p>
      * This could perhaps be made a little more robust, but this should more than suffice.
      */
     public class SchemaChangeMonitor extends ChangeMonitor
@@ -462,9 +511,11 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
             // and then start any instances with it disabled in parallel.
             List<I> startSequentially = new ArrayList<>();
             List<I> startParallel = new ArrayList<>();
-            for (I instance : instances)
+            for (int i = 0; i < instances.size(); i++)
             {
-                if ((boolean) instance.config().get("auto_bootstrap"))
+                I instance = instances.get(i);
+
+                if (i == 0 || (boolean) instance.config().get("auto_bootstrap"))
                     startSequentially.add(instance);
                 else
                     startParallel.add(instance);
@@ -486,9 +537,10 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         private final Factory<I, C> factory;
         private int nodeCount;
         private int subnet;
-        private Map<Integer, Pair<String,String>> nodeIdTopology;
+        private Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology;
+        private TokenSupplier tokenSupplier;
         private File root;
-        private Versions.Version version;
+        private Versions.Version version = Versions.CURRENT;
         private Consumer<InstanceConfig> configUpdater;
 
         public Builder(Factory<I, C> factory)
@@ -496,13 +548,20 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
             this.factory = factory;
         }
 
+        public Builder<I, C> withTokenSupplier(TokenSupplier tokenSupplier)
+        {
+            this.tokenSupplier = tokenSupplier;
+            return this;
+        }
+
         public Builder<I, C> withSubnet(int subnet)
         {
             this.subnet = subnet;
             return this;
         }
 
-        public Builder<I, C> withNodes(int nodeCount) {
+        public Builder<I, C> withNodes(int nodeCount)
+        {
             this.nodeCount = nodeCount;
             return this;
         }
@@ -534,7 +593,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
                 for (int rack = 1; rack <= racksPerDC; rack++)
                 {
                     for (int rackNodeIdx = 0; rackNodeIdx < nodesPerRack; rackNodeIdx++)
-                        nodeIdTopology.put(nodeId++, Pair.create(dcName(dc), rackName(rack)));
+                        nodeIdTopology.put(nodeId++, NetworkTopology.dcAndRack(dcName(dc), rackName(rack)));
                 }
             }
             // adjust the node count to match the allocatation
@@ -564,14 +623,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
                 nodeIdTopology = new HashMap<>();
             }
             for (int nodeId = nodeCount + 1; nodeId <= nodeCount + nodesInRack; nodeId++)
-                nodeIdTopology.put(nodeId, Pair.create(dcName, rackName));
+                nodeIdTopology.put(nodeId, NetworkTopology.dcAndRack(dcName, rackName));
 
             nodeCount += nodesInRack;
             return this;
         }
 
         // Map of node ids to dc and rack - must be contiguous with an entry nodeId 1 to nodeCount
-        public Builder<I, C> withNodeIdTopology(Map<Integer,Pair<String,String>> nodeIdTopology)
+        public Builder<I, C> withNodeIdTopology(Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology)
         {
             if (nodeIdTopology.isEmpty())
                 throw new IllegalStateException("Topology is empty. It must have an entry for every nodeId.");
@@ -585,7 +644,6 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
             {
                 nodeCount = nodeIdTopology.size();
                 logger.info("Adjusting node count to {} for supplied network topology", nodeCount);
-
             }
 
             this.nodeIdTopology = new HashMap<>(nodeIdTopology);
@@ -613,22 +671,18 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
 
         public C createWithoutStarting() throws IOException
         {
-            File root = this.root;
-            Versions.Version version = this.version;
-
             if (root == null)
                 root = Files.createTempDirectory("dtests").toFile();
 
-            if (version == null)
-                version = Versions.CURRENT;
-
             if (nodeCount <= 0)
                 throw new IllegalStateException("Cluster must have at least one node");
 
             if (nodeIdTopology == null)
+            {
                 nodeIdTopology = IntStream.rangeClosed(1, nodeCount).boxed()
                                           .collect(Collectors.toMap(nodeId -> nodeId,
-                                                                    nodeId -> Pair.create(dcName(0), rackName(0))));
+                                                                    nodeId -> NetworkTopology.dcAndRack(dcName(0), rackName(0))));
+            }
 
             root.mkdirs();
             setupLogging(root);
@@ -636,27 +690,40 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
             ClassLoader sharedClassLoader = Thread.currentThread().getContextClassLoader();
 
             List<InstanceConfig> configs = new ArrayList<>();
-            long token = Long.MIN_VALUE + 1, increment = 2 * (Long.MAX_VALUE / nodeCount);
-
-            String ipPrefix = "127.0." + subnet + ".";
-            String seedIp = ipPrefix + "1";
 
-            NetworkTopology networkTopology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology);
+            if (tokenSupplier == null)
+                tokenSupplier = evenlyDistributedTokens(nodeCount);
 
-            for (int i = 0 ; i < nodeCount ; ++i)
+            for (int i = 0; i < nodeCount; ++i)
             {
                 int nodeNum = i + 1;
-                String ipAddress = ipPrefix + nodeNum;
-                InstanceConfig config = InstanceConfig.generate(i + 1, ipAddress, networkTopology, root, String.valueOf(token), seedIp);
-                if (configUpdater != null)
-                    configUpdater.accept(config);
-                configs.add(config);
-                token += increment;
+                configs.add(createInstanceConfig(nodeNum));
             }
 
             return factory.newCluster(root, version, configs, sharedClassLoader);
         }
 
+        public InstanceConfig newInstanceConfig(C cluster)
+        {
+            return createInstanceConfig(cluster.size() + 1);
+        }
+
+        private InstanceConfig createInstanceConfig(int nodeNum)
+        {
+            String ipPrefix = "127.0." + subnet + ".";
+            String seedIp = ipPrefix + "1";
+            String ipAddress = ipPrefix + nodeNum;
+            long token = tokenSupplier.token(nodeNum);
+
+            NetworkTopology topology = NetworkTopology.build(ipPrefix, 7012, nodeIdTopology);
+
+            InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
+            if (configUpdater != null)
+                configUpdater.accept(config);
+
+            return config;
+        }
+
         public C start() throws IOException
         {
             C cluster = createWithoutStarting();
@@ -665,6 +732,21 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         }
     }
 
+    public static TokenSupplier evenlyDistributedTokens(int numNodes)
+    {
+        long increment = (Long.MAX_VALUE / numNodes) * 2;
+        return (int nodeId) -> {
+            assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
+                                                      nodeId, numNodes);
+            return Long.MIN_VALUE + 1 + nodeId * increment;
+        };
+    }
+
+    public static interface TokenSupplier
+    {
+        public long token(int nodeId);
+    }
+
     static String dcName(int index)
     {
         return "datacenter" + index;
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
index 35e2903..f8f157a 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
@@ -19,43 +19,91 @@
 package org.apache.cassandra.distributed.impl;
 
 import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class DistributedTestSnitch extends AbstractNetworkTopologySnitch
 {
     private static NetworkTopology mapping = null;
 
+    private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
+    private static final String DEFAULT_DC = "UNKNOWN_DC";
+    private static final String DEFAULT_RACK = "UNKNOWN_RACK";
+
     public String getRack(InetAddress endpoint)
     {
-        assert mapping != null : "network topology must be assigned before using snitch";
         int storage_port = Config.getOverrideLoadConfig().get().storage_port;
-        return mapping.localRack(InetAddressAndPort.getByAddressOverrideDefaults(endpoint, storage_port));
+        return getRack(InetAddressAndPort.getByAddressOverrideDefaults(endpoint, storage_port));
     }
 
     public String getRack(InetAddressAndPort endpoint)
     {
         assert mapping != null : "network topology must be assigned before using snitch";
-        return mapping.localRack(endpoint);
+        return maybeGetFromEndpointState(mapping.localRack(endpoint), endpoint, ApplicationState.RACK, DEFAULT_RACK);
     }
 
     public String getDatacenter(InetAddress endpoint)
     {
-        assert mapping != null : "network topology must be assigned before using snitch";
         int storage_port = Config.getOverrideLoadConfig().get().storage_port;
-        return mapping.localDC(InetAddressAndPort.getByAddressOverrideDefaults(endpoint, storage_port));
+        return getDatacenter(InetAddressAndPort.getByAddressOverrideDefaults(endpoint, storage_port));
     }
 
     public String getDatacenter(InetAddressAndPort endpoint)
     {
         assert mapping != null : "network topology must be assigned before using snitch";
-        return mapping.localDC(endpoint);
+        return maybeGetFromEndpointState(mapping.localDC(endpoint), endpoint, ApplicationState.DC, DEFAULT_DC);
+    }
+
+    // Resolves a DC or rack name for an endpoint. Unlike GossipingPropertyFileSnitch, the statically assigned topology
+    // (passed in as current) wins; gossip state, then SystemKeyspace.loadDcRackInfo(), then defaultValue are fallbacks.
+    private String maybeGetFromEndpointState(String current, InetAddressAndPort endpoint, ApplicationState state, String defaultValue)
+    {
+        if (current != null)
+            return current;
+
+        EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint.address);
+        if (epState == null || epState.getApplicationState(state) == null)
+        {
+            if (savedEndpoints == null)
+            {
+                savedEndpoints = new HashMap<>();
+                int storage_port = Config.getOverrideLoadConfig().get().storage_port;
+                for (Map.Entry<InetAddress, Map<String, String>> entry : SystemKeyspace.loadDcRackInfo().entrySet())
+                {
+                    // key each saved record by its own address, not by the endpoint currently being looked up
+                    savedEndpoints.put(InetAddressAndPort.getByAddressOverrideDefaults(entry.getKey(), storage_port), entry.getValue());
+                }
+            }
+
+            if (savedEndpoints.containsKey(endpoint))
+                return savedEndpoints.get(endpoint).get(state == ApplicationState.RACK ? "rack" : "data_center");
+
+            return defaultValue;
+        }
+
+        return epState.getApplicationState(state).value;
+    }
 
     static void assign(NetworkTopology newMapping)
     {
         mapping = new NetworkTopology(newMapping);
     }
+
+    public void gossiperStarting()
+    {
+        super.gossiperStarting();
+
+        Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP,
+                                                   StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress()));
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index c53012b..6d668e6 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -22,6 +22,7 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.SimpleSeedProvider;
 
@@ -92,7 +93,8 @@ public class InstanceConfig implements IInstanceConfig
         this.num = num;
         this.networkTopology = networkTopology;
         this.hostId = java.util.UUID.randomUUID();
-        this    .set("broadcast_address", broadcast_address)
+        this    .set("num_tokens", 1)
+                .set("broadcast_address", broadcast_address)
                 .set("listen_address", listen_address)
                 .set("broadcast_rpc_address", broadcast_rpc_address)
                 .set("rpc_address", rpc_address)
@@ -187,6 +189,12 @@ public class InstanceConfig implements IInstanceConfig
             propagate(writeToConfig, e.getKey(), e.getValue(), true);
     }
 
+    public void validate()
+    {
+        if (((int) get("num_tokens")) > 1)
+            throw new IllegalArgumentException("In-JVM dtests do not support vnodes as of now.");
+    }
+
     private void propagate(Object writeToConfig, String fieldName, Object value, boolean ignoreMissing)
     {
         if (value == NULL)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java b/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java
index 1176b32..f7c31ff 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/NetworkTopology.java
@@ -22,6 +22,9 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -29,7 +32,24 @@ import org.apache.cassandra.utils.Pair;
 
 public class NetworkTopology
 {
-    private final Map<InetAddressAndPort, Pair<String, String>> map;
+    private final Map<InetAddressAndPort, DcAndRack> map;
+
+    public static class DcAndRack
+    {
+        private final String dc;
+        private final String rack;
+
+        private DcAndRack(String dc, String rack)
+        {
+            this.dc = dc;
+            this.rack = rack;
+        }
+    }
+
+    public static DcAndRack dcAndRack(String dc, String rack)
+    {
+        return new DcAndRack(dc, rack);
+    }
 
     private NetworkTopology() {
         map = new HashMap<>();
@@ -41,7 +61,7 @@ public class NetworkTopology
         map = new HashMap<>(networkTopology.map);
     }
 
-    public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, Pair<String, String>> nodeIdTopology)
+    public static NetworkTopology build(String ipPrefix, int broadcastPort, Map<Integer, DcAndRack> nodeIdTopology)
     {
         final NetworkTopology topology = new NetworkTopology();
 
@@ -51,7 +71,7 @@ public class NetworkTopology
 
             try
             {
-                Pair<String,String> dcAndRack = nodeIdTopology.get(nodeId);
+                DcAndRack dcAndRack = nodeIdTopology.get(nodeId);
                 if (dcAndRack == null)
                     throw new IllegalStateException("nodeId " + nodeId + "not found in instanceMap");
 
@@ -67,23 +87,51 @@ public class NetworkTopology
         return topology;
     }
 
-    public Pair<String, String> put(InetAddressAndPort key, Pair<String, String> value)
+    public DcAndRack put(InetAddressAndPort key, DcAndRack value)
     {
         return map.put(key, value);
     }
 
     public String localRack(InetAddressAndPort key)
     {
-        return map.get(key).right;
+        DcAndRack p  = map.get(key);
+        if (p == null)
+            return null;
+        return p.rack;
     }
 
     public String localDC(InetAddressAndPort key)
     {
-        return map.get(key).left;
+        DcAndRack p = map.get(key);
+        if (p == null)
+            return null;
+        return p.dc;
     }
 
     public boolean contains(InetAddressAndPort key)
     {
         return map.containsKey(key);
     }
+
+    public String toString()
+    {
+        return "NetworkTopology{" + map + '}';
+    }
+
+
+    public static Map<Integer, NetworkTopology.DcAndRack> singleDcNetworkTopology(int nodeCount,
+                                                                                  String dc,
+                                                                                  String rack)
+    {
+        return networkTopology(nodeCount, (nodeid) -> NetworkTopology.dcAndRack(dc, rack));
+    }
+
+    public static Map<Integer, NetworkTopology.DcAndRack> networkTopology(int nodeCount,
+                                                                          IntFunction<DcAndRack> dcAndRackSupplier)
+    {
+
+        return IntStream.rangeClosed(1, nodeCount).boxed()
+                        .collect(Collectors.toMap(nodeId -> nodeId,
+                                                  dcAndRackSupplier::apply));
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
new file mode 100644
index 0000000..0af26ea
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.impl.IInvokableInstance;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.distributed.impl.NetworkTopology;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class BootstrapTest extends DistributedTestBase // compares per-node row counts of a cluster grown by bootstrap with one started at full size
+{
+
+    @Test
+    public void bootstrapTest() throws Throwable
+    {
+        int originalNodeCount = 2; // nodes in the first cluster before the extra instance is added
+        int expandedNodeCount = originalNodeCount + 1; // node count once the extra instance is included
+        Cluster.Builder<IInvokableInstance, Cluster> builder = Cluster.build(originalNodeCount)
+                                                                      .withTokenSupplier(Cluster.evenlyDistributedTokens(expandedNodeCount)) // token supplier is built for the expanded node count from the start
+                                                                      .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(originalNodeCount, "dc0", "rack0")) // topology entries for the original node ids only
+                                                                      .withConfig(config -> config.with(NETWORK, GOSSIP)); // each instance config gets the NETWORK and GOSSIP features
+
+        Map<Integer, Long> withBootstrap = null; // node id -> row count from the cluster that was grown via bootstrap
+        Map<Integer, Long> naturally = null; // node id -> row count from the cluster started with expandedNodeCount nodes
+
+        try (Cluster cluster = builder.start()) // scenario 1: start with the original nodes, load data, then add an instance
+        {
+            populate(cluster);
+
+            InstanceConfig config = builder.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) // re-declare the topology so it also covers the new node id
+                                           .newInstanceConfig(cluster);
+            config.set("auto_bootstrap", true); // the new instance's config has auto_bootstrap switched on
+
+            IInstance newInstance = cluster.bootstrap(config);
+            newInstance.startup();
+
+            cluster.stream().forEach(instance -> { // run nodetool "cleanup" for KEYSPACE.tbl on every instance of the cluster
+                instance.nodetool("cleanup", KEYSPACE, "tbl");
+            });
+
+            withBootstrap = count(cluster);
+        }
+
+        builder = Cluster.build(expandedNodeCount) // scenario 2: all expandedNodeCount nodes exist from the start, same token supplier
+                         .withTokenSupplier(Cluster.evenlyDistributedTokens(expandedNodeCount))
+                         .withConfig(config -> config.with(NETWORK, GOSSIP));
+
+        try (Cluster cluster = builder.start())
+        {
+            populate(cluster);
+            naturally = count(cluster);
+        }
+
+        Assert.assertEquals(withBootstrap, naturally); // both scenarios must yield the same node id -> count map
+    }
+
+    public void populate(Cluster cluster) // creates the test keyspace and table, then inserts 1000 rows
+    {
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + 3 + "};");
+        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+        for (int i = 0; i < 1000; i++) // row i is (pk, ck, v) = (i, i, i)
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)", // every insert goes through coordinator(1)
+                                           ConsistencyLevel.QUORUM,
+                                           i, i, i);
+    }
+
+    public Map<Integer, Long> count(Cluster cluster) // returns node id -> count(*) of KEYSPACE.tbl obtained via executeInternal on that node
+    {
+        return IntStream.rangeClosed(1, cluster.size()) // node ids 1..cluster.size(), inclusive
+                        .boxed()
+                        .collect(Collectors.toMap(nodeId -> nodeId,
+                                                  nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0])); // [0][0]: first column of the first result row, cast to Long
+    }
+
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 745e1ab..7a3d52d 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@ -67,6 +67,7 @@ public class DistributedTestBase
     @BeforeClass
     public static void setup()
     {
+        System.setProperty("cassandra.ring_delay_ms", Integer.toString(10 * 1000));
         System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
         nativeLibraryWorkaround();
         processReaperWorkaround();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
index 2c4f9c8..a9c2cee 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.impl.NetworkTopology;
 import org.apache.cassandra.utils.Pair;
 
 public class NetworkTopologyTest extends DistributedTestBase
@@ -36,7 +37,7 @@ public class NetworkTopologyTest extends DistributedTestBase
     public void namedDcTest() throws Throwable
     {
         try (Cluster cluster = Cluster.build()
-                                      .withNodeIdTopology(Collections.singletonMap(1, Pair.create("somewhere", "rack0")))
+                                      .withNodeIdTopology(Collections.singletonMap(1, NetworkTopology.dcAndRack("somewhere", "rack0")))
                                       .withRack("elsewhere", "firstrack", 1)
                                       .withRack("elsewhere", "secondrack", 2)
                                       .withDC("nearthere", 4)
@@ -94,6 +95,6 @@ public class NetworkTopologyTest extends DistributedTestBase
     @Test(expected = IllegalStateException.class)
     public void noHolesInNodeIdTopologyTest()
     {
-        Cluster.build().withNodeIdTopology(Collections.singletonMap(2, Pair.create("doomed", "rack")));
+        Cluster.build().withNodeIdTopology(Collections.singletonMap(2, NetworkTopology.dcAndRack("doomed", "rack"))); // the map has an entry for node id 2 only (none for 1); the test passes only if IllegalStateException is thrown
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org