You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/11/13 20:39:17 UTC

[cassandra] branch trunk updated (94663c3 -> e4fac35)

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 94663c3  Relax < check to <= for NodeToolGossipInfoTest
     new 17ebee3  CASSANDRA-15158 fixed SCHEMA_DELAY to use getSchemaDelay and no longer convert it from secones to millis (since its already millis)
     new 50d8245  Merge branch 'cassandra-3.0' into cassandra-3.11
     new e4fac35  Merge branch 'cassandra-3.11' into trunk

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/cassandra/config/CassandraRelevantProperties.java | 11 +++++++++++
 src/java/org/apache/cassandra/service/StorageService.java    | 12 +++++++-----
 .../apache/cassandra/distributed/action/GossipHelper.java    |  7 ++++++-
 .../cassandra/distributed/test/ring/BootstrapTest.java       |  6 ++++--
 4 files changed, 28 insertions(+), 8 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by dc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e4fac3582e0a9dda182313a3aa784be35d965f4e
Merge: 94663c3 50d8245
Author: David Capwell <dc...@apache.org>
AuthorDate: Fri Nov 13 12:37:46 2020 -0800

    Merge branch 'cassandra-3.11' into trunk

 .../apache/cassandra/config/CassandraRelevantProperties.java | 11 +++++++++++
 src/java/org/apache/cassandra/service/StorageService.java    | 12 +++++++-----
 .../apache/cassandra/distributed/action/GossipHelper.java    |  7 ++++++-
 .../cassandra/distributed/test/ring/BootstrapTest.java       |  6 ++++--
 4 files changed, 28 insertions(+), 8 deletions(-)

diff --cc src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 881b7d9,0000000..7402aa1
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@@ -1,240 -1,0 +1,251 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.config;
 +
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +
 +/** A class that extracts system properties for the cassandra node it runs within. */
 +public enum CassandraRelevantProperties
 +{
 +    //base JVM properties
 +    JAVA_HOME("java.home"),
 +    CASSANDRA_PID_FILE ("cassandra-pidfile"),
 +
 +    /**
 +     * Indicates the temporary directory used by the Java Virtual Machine (JVM)
 +     * to create and store temporary files.
 +     */
 +    JAVA_IO_TMPDIR ("java.io.tmpdir"),
 +
 +    /**
 +     * Path from which to load native libraries.
 +     * Default is absolute path to lib directory.
 +     */
 +    JAVA_LIBRARY_PATH ("java.library.path"),
 +
 +    JAVA_SECURITY_EGD ("java.security.egd"),
 +
 +    /** Java Runtime Environment version */
 +    JAVA_VERSION ("java.version"),
 +
 +    /** Java Virtual Machine implementation name */
 +    JAVA_VM_NAME ("java.vm.name"),
 +
 +    /** Line separator ("\n" on UNIX). */
 +    LINE_SEPARATOR ("line.separator"),
 +
 +    /** Java class path. */
 +    JAVA_CLASS_PATH ("java.class.path"),
 +
 +    /** Operating system architecture. */
 +    OS_ARCH ("os.arch"),
 +
 +    /** Operating system name. */
 +    OS_NAME ("os.name"),
 +
 +    /** User's home directory. */
 +    USER_HOME ("user.home"),
 +
 +    /** Platform word size sun.arch.data.model. Examples: "32", "64", "unknown" */
 +    SUN_ARCH_DATA_MODEL ("sun.arch.data.model"),
 +
 +    //JMX properties
 +    /**
 +     * The value of this property represents the host name string
 +     * that should be associated with remote stubs for locally created remote objects,
 +     * in order to allow clients to invoke methods on the remote object.
 +     */
 +    JAVA_RMI_SERVER_HOSTNAME ("java.rmi.server.hostname"),
 +
 +    /**
 +     * If this value is true, object identifiers for remote objects exported by this VM will be generated by using
 +     * a cryptographically secure random number generator. The default value is false.
 +     */
 +    JAVA_RMI_SERVER_RANDOM_ID ("java.rmi.server.randomIDs"),
 +
 +    /**
 +     * This property indicates whether password authentication for remote monitoring is
 +     * enabled. By default it is disabled - com.sun.management.jmxremote.authenticate
 +     */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_AUTHENTICATE ("com.sun.management.jmxremote.authenticate"),
 +
 +    /**
 +     * The port number to which the RMI connector will be bound - com.sun.management.jmxremote.rmi.port.
 +     * An Integer object that represents the value of the second argument is returned
 +     * if there is no port specified, if the port does not have the correct numeric format,
 +     * or if the specified name is empty or null.
 +     */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_RMI_PORT ("com.sun.management.jmxremote.rmi.port", "0"),
 +
 +    /** Cassandra jmx remote port */
 +    CASSANDRA_JMX_REMOTE_PORT("cassandra.jmx.remote.port"),
 +
 +    /** This property indicates whether SSL is enabled for monitoring remotely. Default is set to false. */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_SSL ("com.sun.management.jmxremote.ssl"),
 +
 +    /**
 +     * This property indicates whether SSL client authentication is enabled - com.sun.management.jmxremote.ssl.need.client.auth.
 +     * Default is set to false.
 +     */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_SSL_NEED_CLIENT_AUTH ("com.sun.management.jmxremote.ssl.need.client.auth"),
 +
 +    /**
 +     * This property indicates the location for the access file. If com.sun.management.jmxremote.authenticate is false,
 +     * then this property and the password and access files, are ignored. Otherwise, the access file must exist and
 +     * be in the valid format. If the access file is empty or nonexistent, then no access is allowed.
 +     */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_ACCESS_FILE ("com.sun.management.jmxremote.access.file"),
 +
 +    /** This property indicates the path to the password file - com.sun.management.jmxremote.password.file */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_PASSWORD_FILE ("com.sun.management.jmxremote.password.file"),
 +
 +    /** Port number to enable JMX RMI connections - com.sun.management.jmxremote.port */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_PORT ("com.sun.management.jmxremote.port"),
 +
 +    /**
 +     * A comma-delimited list of SSL/TLS protocol versions to enable.
 +     * Used in conjunction with com.sun.management.jmxremote.ssl - com.sun.management.jmxremote.ssl.enabled.protocols
 +     */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_SSL_ENABLED_PROTOCOLS ("com.sun.management.jmxremote.ssl.enabled.protocols"),
 +
 +    /**
 +     * A comma-delimited list of SSL/TLS cipher suites to enable.
 +     * Used in conjunction with com.sun.management.jmxremote.ssl - com.sun.management.jmxremote.ssl.enabled.cipher.suites
 +     */
 +    COM_SUN_MANAGEMENT_JMXREMOTE_SSL_ENABLED_CIPHER_SUITES ("com.sun.management.jmxremote.ssl.enabled.cipher.suites"),
 +
 +    /** mx4jaddress */
 +    MX4JADDRESS ("mx4jaddress"),
 +
 +    /** mx4jport */
 +    MX4JPORT ("mx4jport"),
 +
++    /**
++     * When bootstrapping we wait for all schema versions found in gossip to be seen, and if not seen in time we fail
++     * the bootstrap; this property will avoid failing and allow bootstrap to continue if set to true.
++     */
++    BOOTSTRAP_SKIP_SCHEMA_CHECK("cassandra.skip_schema_check"),
++
++    /**
++     * When bootstrapping, how long to wait (in milliseconds) for schema versions to be seen.
++     */
++    BOOTSTRAP_SCHEMA_DELAY_MS("cassandra.schema_delay_ms"),
++
 +    //cassandra properties (without the "cassandra." prefix)
 +
 +    /**
 +     * The cassandra-foreground option will tell CassandraDaemon whether
 +     * to close stdout/stderr, but it's up to us not to background.
 +     * yes/null
 +     */
 +    CASSANDRA_FOREGROUND ("cassandra-foreground"),
 +
 +    DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES ("default.provide.overlapping.tombstones"),
 +    ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION ("org.apache.cassandra.disable_mbean_registration"),
 +    //only for testing
 +    ORG_APACHE_CASSANDRA_CONF_CASSANDRA_RELEVANT_PROPERTIES_TEST("org.apache.cassandra.conf.CassandraRelevantPropertiesTest"),
 +    ORG_APACHE_CASSANDRA_DB_VIRTUAL_SYSTEM_PROPERTIES_TABLE_TEST("org.apache.cassandra.db.virtual.SystemPropertiesTableTest"),
 +
 +    /** This property indicates whether disable_mbean_registration is true */
 +    IS_DISABLED_MBEAN_REGISTRATION("org.apache.cassandra.disable_mbean_registration");
 +
 +    CassandraRelevantProperties(String key, String defaultVal)
 +    {
 +        this.key = key;
 +        this.defaultVal = defaultVal;
 +    }
 +
 +    CassandraRelevantProperties(String key)
 +    {
 +        this.key = key;
 +        this.defaultVal = null;
 +    }
 +
 +    private final String key;
 +    private final String defaultVal;
 +
 +    public String getKey()
 +    {
 +        return key;
 +    }
 +
 +    /**
 +     * Gets the value of the indicated system property.
 +     * @return system property value if it exists, defaultValue otherwise.
 +     */
 +    public String getString()
 +    {
 +        String value = System.getProperty(key);
 +
 +        return value == null ? defaultVal : STRING_CONVERTER.convert(value);
 +    }
 +
 +    /**
 +     * Gets the value of a system property as a boolean.
 +     * @return system property boolean value if it exists; otherwise the default value, or false if no default is defined.
 +     */
 +    public boolean getBoolean()
 +    {
 +        String value = System.getProperty(key);
 +
 +        return BOOLEAN_CONVERTER.convert(value == null ? defaultVal : value);
 +    }
 +
 +    /**
 +     * Gets the value of a system property as an int.
 +     * @return system property int value if it exists, defaultValue otherwise.
 +     */
 +    public int getInt()
 +    {
 +        String value = System.getProperty(key);
 +
 +        return INTEGER_CONVERTER.convert(value == null ? defaultVal : value);
 +    }
 +
 +    private interface PropertyConverter<T>
 +    {
 +        T convert(String value);
 +    }
 +
 +    private static final PropertyConverter<String> STRING_CONVERTER = value -> value;
 +
 +    private static final PropertyConverter<Boolean> BOOLEAN_CONVERTER = Boolean::parseBoolean;
 +
 +    private static final PropertyConverter<Integer> INTEGER_CONVERTER = value ->
 +    {
 +        try
 +        {
 +            return Integer.decode(value);
 +        }
 +        catch (NumberFormatException e)
 +        {
 +            throw new ConfigurationException(String.format("Invalid value for system property: " +
 +                                                           "expected integer value but got '%s'", value));
 +        }
 +    };
 +
 +    /**
 +     * @return whether a system property is present or not.
 +     */
 +    public boolean isPresent()
 +    {
 +        return System.getProperties().containsKey(key);
 +    }
 +}
 +
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 3201d80,eb13df1..7d27163
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -108,24 -96,15 +108,26 @@@ import org.apache.cassandra.utils.*
  import org.apache.cassandra.utils.logging.LoggingSupportFactory;
  import org.apache.cassandra.utils.progress.ProgressEvent;
  import org.apache.cassandra.utils.progress.ProgressEventType;
 +import org.apache.cassandra.utils.progress.ProgressListener;
 +import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
  import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
 -import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport;
  
 +import static com.google.common.collect.Iterables.transform;
 +import static com.google.common.collect.Iterables.tryFind;
  import static java.util.Arrays.asList;
 +import static java.util.Arrays.stream;
  import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static java.util.concurrent.TimeUnit.NANOSECONDS;
  import static java.util.stream.Collectors.toList;
 +import static java.util.stream.Collectors.toMap;
++import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
++import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
  import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
  import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
 -import static org.apache.cassandra.service.MigrationManager.evolveSystemKeyspace;
 +import static org.apache.cassandra.net.NoPayload.noPayload;
 +import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ;
 +import static org.apache.cassandra.schema.MigrationManager.evolveSystemKeyspace;
  
  /**
   * This abstraction contains the token/identifier of this node
@@@ -137,11 -116,10 +139,11 @@@ public class StorageService extends Not
  {
      private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
  
 +    public static final int INDEFINITE = -1;
      public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized
-     public static final int SCHEMA_DELAY = getRingDelay(); // delay after which we assume ring has stablized
+     public static final int SCHEMA_DELAY_MILLIS = getSchemaDelay();
  
--    private static final boolean REQUIRE_SCHEMAS = !Boolean.getBoolean("cassandra.skip_schema_check");
++    private static final boolean REQUIRE_SCHEMAS = !BOOTSTRAP_SKIP_SCHEMA_CHECK.getBoolean();
  
      private final JMXProgressSupport progressSupport = new JMXProgressSupport(this);
  
@@@ -161,10 -146,10 +163,10 @@@
  
      private static int getSchemaDelay()
      {
--        String newdelay = System.getProperty("cassandra.schema_delay_ms");
++        String newdelay = BOOTSTRAP_SCHEMA_DELAY_MS.getString();
          if (newdelay != null)
          {
-             logger.info("Overriding SCHEMA_DELAY to {}ms", newdelay);
+             logger.info("Overriding SCHEMA_DELAY_MILLIS to {}ms", newdelay);
              return Integer.parseInt(newdelay);
          }
          else
diff --cc test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
index 6dda98e,0000000..229cb39
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@@ -1,461 -1,0 +1,466 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.action;
 +
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.net.InetSocketAddress;
 +import java.time.Duration;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +import java.util.function.Supplier;
 +
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.api.IInstance;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 +import org.apache.cassandra.distributed.shared.VersionedApplicationState;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.MigrationCoordinator;
 +import org.apache.cassandra.schema.MigrationManager;
 +import org.apache.cassandra.service.PendingRangeCalculatorService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
 +
 +public class GossipHelper
 +{
 +    public static InstanceAction statusToBootstrap(IInvokableInstance newNode)
 +    {
 +        return (instance) ->
 +        {
 +            changeGossipState(instance,
 +                              newNode,
 +                              Arrays.asList(tokens(newNode),
 +                                            statusBootstrapping(newNode),
 +                                            statusWithPortBootstrapping(newNode)));
 +        };
 +    }
 +
 +    public static InstanceAction statusToNormal(IInvokableInstance peer)
 +    {
 +        return (target) ->
 +        {
 +            changeGossipState(target,
 +                              peer,
 +                              Arrays.asList(tokens(peer),
 +                                            statusNormal(peer),
 +                                            releaseVersion(peer),
 +                                            netVersion(peer),
 +                                            statusWithPortNormal(peer)));
 +        };
 +    }
 +
 +    /**
 +     * This method is unsafe and should be used _only_ when gossip is not used or available: it creates versioned values on the
 +     * target instance, which means Gossip versioning gets out of sync. Use a safe counterpart at all times when performing _any_
 +     * ring movement operations _or_ if Gossip is used.
 +     */
 +    public static void unsafeStatusToNormal(IInvokableInstance target, IInstance peer)
 +    {
 +        int messagingVersion = peer.getMessagingVersion();
 +        changeGossipState(target,
 +                          peer,
 +                          Arrays.asList(unsafeVersionedValue(target,
 +                                                             ApplicationState.TOKENS,
 +                                                             (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens),
 +                                                             peer.config().getString("partitioner"),
 +                                                             peer.config().getString("initial_token")),
 +                                        unsafeVersionedValue(target,
 +                                                             ApplicationState.STATUS,
 +                                                             (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens),
 +                                                             peer.config().getString("partitioner"),
 +                                                             peer.config().getString("initial_token")),
 +                                        unsafeVersionedValue(target,
 +                                                             ApplicationState.STATUS_WITH_PORT,
 +                                                             (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens),
 +                                                             peer.config().getString("partitioner"),
 +                                                             peer.config().getString("initial_token")),
 +                                        unsafeVersionedValue(target,
 +                                                             ApplicationState.NET_VERSION,
 +                                                             (partitioner) -> new VersionedValue.VersionedValueFactory(partitioner).networkVersion(messagingVersion),
 +                                                             peer.config().getString("partitioner")),
 +                                        unsafeReleaseVersion(target,
 +                                                             peer.config().getString("partitioner"),
 +                                                             peer.getReleaseVersionString())));
 +    }
 +
 +    public static InstanceAction statusToLeaving(IInvokableInstance newNode)
 +    {
 +        return (instance) -> {
 +            changeGossipState(instance,
 +                              newNode,
 +                              Arrays.asList(tokens(newNode),
 +                                            statusLeaving(newNode),
 +                                            statusWithPortLeaving(newNode)));
 +        };
 +    }
 +
 +    public static InstanceAction bootstrap()
 +    {
 +        return new BootstrapAction();
 +    }
 +
 +    public static InstanceAction bootstrap(boolean joinRing, Duration waitForBootstrap, Duration waitForSchema)
 +    {
 +        return new BootstrapAction(joinRing, waitForBootstrap, waitForSchema);
 +    }
 +
 +    public static InstanceAction disseminateGossipState(IInvokableInstance newNode)
 +    {
 +        return new DisseminateGossipState(newNode);
 +    }
 +
 +    public static InstanceAction pullSchemaFrom(IInvokableInstance pullFrom)
 +    {
 +        return new PullSchemaFrom(pullFrom);
 +    }
 +
 +    private static InstanceAction disableBinary()
 +    {
 +        return (instance) -> instance.nodetoolResult("disablebinary").asserts().success();
 +    }
 +
 +    private static class DisseminateGossipState implements InstanceAction
 +    {
 +        final Map<InetSocketAddress, byte[]> gossipState;
 +
 +        public DisseminateGossipState(IInvokableInstance... from)
 +        {
 +            gossipState = new HashMap<>();
 +            for (IInvokableInstance node : from)
 +            {
 +                byte[] epBytes = node.callsOnInstance(() -> {
 +                    EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort());
 +                    return toBytes(epState);
 +                }).call();
 +                gossipState.put(node.broadcastAddress(), epBytes);
 +            }
 +        }
 +
 +        public void accept(IInvokableInstance instance)
 +        {
 +            instance.appliesOnInstance((IIsolatedExecutor.SerializableFunction<Map<InetSocketAddress, byte[]>, Void>)
 +                                       (map) -> {
 +                                           Map<InetAddressAndPort, EndpointState> newState = new HashMap<>();
 +                                           for (Map.Entry<InetSocketAddress, byte[]> e : map.entrySet())
 +                                               newState.put(toCassandraInetAddressAndPort(e.getKey()), fromBytes(e.getValue()));
 +
 +                                           Gossiper.runInGossipStageBlocking(() -> {
 +                                               Gossiper.instance.applyStateLocally(newState);
 +                                           });
 +                                           return null;
 +                                       }).apply(gossipState);
 +        }
 +    }
 +
 +    private static byte[] toBytes(EndpointState epState)
 +    {
 +        try (DataOutputBuffer out = new DataOutputBuffer(1024))
 +        {
 +            EndpointState.serializer.serialize(epState, out, MessagingService.current_version);
 +            return out.toByteArray();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    private static EndpointState fromBytes(byte[] bytes)
 +    {
 +        try (DataInputBuffer in = new DataInputBuffer(bytes))
 +        {
 +            return EndpointState.serializer.deserialize(in, MessagingService.current_version);
 +        }
 +        catch (Throwable t)
 +        {
 +            throw new RuntimeException(t);
 +        }
 +    }
 +
 +    private static class PullSchemaFrom implements InstanceAction
 +    {
 +        final InetSocketAddress pullFrom;
 +
 +        public PullSchemaFrom(IInvokableInstance pullFrom)
 +        {
 +            this.pullFrom = pullFrom.broadcastAddress();;
 +        }
 +
 +        public void accept(IInvokableInstance pullTo)
 +        {
 +            pullTo.acceptsOnInstance((InetSocketAddress pullFrom) -> {
 +                InetAddressAndPort endpoint = toCassandraInetAddressAndPort(pullFrom);
 +                EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +                MigrationCoordinator.instance.reportEndpointVersion(endpoint, state);
 +                MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(10));
 +            }).accept(pullFrom);
 +        }
 +    }
 +
 +    private static class BootstrapAction implements InstanceAction, Serializable
 +    {
 +        private final boolean joinRing;
 +        private final Duration waitForBootstrap;
 +        private final Duration waitForSchema;
 +
 +        public BootstrapAction()
 +        {
 +            this(true, Duration.ofMinutes(10), Duration.ofSeconds(10));
 +        }
 +
 +        public BootstrapAction(boolean joinRing, Duration waitForBootstrap, Duration waitForSchema)
 +        {
 +            this.joinRing = joinRing;
 +            this.waitForBootstrap = waitForBootstrap;
 +            this.waitForSchema = waitForSchema;
 +        }
 +
 +        public void accept(IInvokableInstance instance)
 +        {
 +            instance.appliesOnInstance((String partitionerString, String tokenString) -> {
 +                IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString);
 +                List<Token> tokens = Collections.singletonList(partitioner.getTokenFactory().fromString(tokenString));
 +                try
 +                {
 +                    Collection<InetAddressAndPort> collisions = StorageService.instance.prepareForBootstrap(waitForSchema.toMillis());
 +                    assert collisions.size() == 0 : String.format("Didn't expect any replacements but got %s", collisions);
 +                    boolean isBootstrapSuccessful = StorageService.instance.bootstrap(tokens, waitForBootstrap.toMillis());
 +                    assert isBootstrapSuccessful : "Bootstrap did not complete successfully";
 +                    StorageService.instance.setUpDistributedSystemKeyspaces();
 +                    if (joinRing)
 +                        StorageService.instance.finishJoiningRing(true, tokens);
 +                }
 +                catch (Throwable t)
 +                {
 +                    throw new RuntimeException(t);
 +                }
 +
 +                return null;
 +            }).apply(instance.config().getString("partitioner"), instance.config().getString("initial_token"));
 +        }
 +    }
 +
 +    public static InstanceAction decomission()
 +    {
 +        return (target) -> target.nodetoolResult("decommission").asserts().success();
 +    }
 +
 +
 +    public static VersionedApplicationState tokens(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.TOKENS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens));
 +    }
 +
 +    public static VersionedApplicationState netVersion(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.NET_VERSION, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).networkVersion());
 +    }
 +
 +    private static VersionedApplicationState unsafeReleaseVersion(IInvokableInstance instance, String partitionerStr, String releaseVersionStr)
 +    {
 +        return unsafeVersionedValue(instance, ApplicationState.RELEASE_VERSION, (partitioner) -> new VersionedValue.VersionedValueFactory(partitioner).releaseVersion(releaseVersionStr), partitionerStr);
 +    }
 +
 +    public static VersionedApplicationState releaseVersion(IInvokableInstance instance)
 +    {
 +        return unsafeReleaseVersion(instance, instance.config().getString("partitioner"), instance.getReleaseVersionString());
 +    }
 +
 +    public static VersionedApplicationState statusNormal(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens));
 +    }
 +
 +    public static VersionedApplicationState statusWithPortNormal(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens));
 +    }
 +
 +    public static VersionedApplicationState statusBootstrapping(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
 +    }
 +
 +    public static VersionedApplicationState statusWithPortBootstrapping(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
 +    }
 +
 +    public static VersionedApplicationState statusLeaving(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
 +    }
 +
 +    public static VersionedApplicationState statusLeft(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> {
 +            return new VersionedValue.VersionedValueFactory(partitioner).left(tokens, System.currentTimeMillis() + Gossiper.aVeryLongTime);
 +        });
 +    }
 +
 +    public static VersionedApplicationState statusWithPortLeft(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> {
 +            return new VersionedValue.VersionedValueFactory(partitioner).left(tokens, System.currentTimeMillis() + Gossiper.aVeryLongTime);
 +
 +        });
 +    }
 +
 +    public static VersionedApplicationState statusWithPortLeaving(IInvokableInstance instance)
 +    {
 +        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
 +    }
 +
 +    public static VersionedValue toVersionedValue(VersionedApplicationState vv)
 +    {
 +        return VersionedValue.unsafeMakeVersionedValue(vv.value, vv.version);
 +    }
 +
 +    public static ApplicationState toApplicationState(VersionedApplicationState vv)
 +    {
 +        return ApplicationState.values()[vv.applicationState];
 +    }
 +
 +    private static VersionedApplicationState unsafeVersionedValue(IInvokableInstance instance,
 +                                                                 ApplicationState applicationState,
 +                                                                 IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, VersionedValue> supplier,
 +                                                                 String partitionerStr, String initialTokenStr)
 +    {
 +        return instance.appliesOnInstance((String partitionerString, String tokenString) -> {
 +            IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString);
 +            Token token = partitioner.getTokenFactory().fromString(tokenString);
 +
 +            VersionedValue versionedValue = supplier.apply(partitioner, Collections.singleton(token));
 +            return new VersionedApplicationState(applicationState.ordinal(), versionedValue.value, versionedValue.version);
 +        }).apply(partitionerStr, initialTokenStr);
 +    }
 +
 +    private static VersionedApplicationState unsafeVersionedValue(IInvokableInstance instance,
 +                                                                 ApplicationState applicationState,
 +                                                                 IIsolatedExecutor.SerializableFunction<IPartitioner, VersionedValue> supplier,
 +                                                                 String partitionerStr)
 +    {
 +        return instance.appliesOnInstance((String partitionerString) -> {
 +            IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString);
 +            VersionedValue versionedValue = supplier.apply(partitioner);
 +            return new VersionedApplicationState(applicationState.ordinal(), versionedValue.value, versionedValue.version);
 +        }).apply(partitionerStr);
 +    }
 +
 +    public static VersionedApplicationState versionedToken(IInvokableInstance instance, ApplicationState applicationState, IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, VersionedValue> supplier)
 +    {
 +        return unsafeVersionedValue(instance, applicationState, supplier, instance.config().getString("partitioner"), instance.config().getString("initial_token"));
 +    }
 +
 +    public static InstanceAction removeFromRing(IInvokableInstance peer)
 +    {
 +        return (target) -> {
 +            InetAddressAndPort endpoint = toCassandraInetAddressAndPort(peer.broadcastAddress());
 +            VersionedApplicationState newState = statusLeft(peer);
 +
 +            target.runOnInstance(() -> {
 +                // state to 'left'
 +                EndpointState currentState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +                ApplicationState as = toApplicationState(newState);
 +                VersionedValue vv = toVersionedValue(newState);
 +                currentState.addApplicationState(as, vv);
 +                StorageService.instance.onChange(endpoint, as, vv);
 +
 +                // remove from gossip
 +                Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.unsafeAnulEndpoint(endpoint));
 +                SystemKeyspace.removeEndpoint(endpoint);
 +                PendingRangeCalculatorService.instance.update();
 +                PendingRangeCalculatorService.instance.blockUntilFinished();
 +            });
 +        };
 +    }
 +
 +    /**
 +     * Changes gossip state of the `peer` on `target`
 +     */
 +    public static void changeGossipState(IInvokableInstance target, IInstance peer, List<VersionedApplicationState> newState)
 +    {
 +        InetSocketAddress addr = peer.broadcastAddress();
 +        UUID hostId = peer.config().hostId();
 +        int netVersion = peer.getMessagingVersion();
 +        target.runOnInstance(() -> {
 +            InetAddressAndPort endpoint = toCassandraInetAddressAndPort(addr);
 +            StorageService storageService = StorageService.instance;
 +
 +            Gossiper.runInGossipStageBlocking(() -> {
 +                EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +                if (state == null)
 +                {
 +                    Gossiper.instance.initializeNodeUnsafe(endpoint, hostId, netVersion, 1);
 +                    state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +                    if (state.isAlive() && !Gossiper.instance.isDeadState(state))
 +                        Gossiper.instance.realMarkAlive(endpoint, state);
 +                }
 +
 +                for (VersionedApplicationState value : newState)
 +                {
 +                    ApplicationState as = toApplicationState(value);
 +                    VersionedValue vv = toVersionedValue(value);
 +                    state.addApplicationState(as, vv);
 +                    storageService.onChange(endpoint, as, vv);
 +                }
 +            });
 +        });
 +    }
 +
 +    public static void withProperty(String prop, boolean value, Runnable r)
 +    {
++        withProperty(prop, Boolean.toString(value), r);
++    }
++
++    public static void withProperty(String prop, String value, Runnable r)
++    {
 +        String before = System.getProperty(prop);
 +        try
 +        {
-             System.setProperty(prop, Boolean.toString(value));
++            System.setProperty(prop, value);
 +            r.run();
 +        }
 +        finally
 +        {
 +            if (before == null)
 +                System.clearProperty(prop);
 +            else
 +                System.setProperty(prop, before);
 +        }
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
index e0c5a78,0000000..15a8d00
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
@@@ -1,130 -1,0 +1,132 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test.ring;
 +
 +import java.util.Map;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.ICluster;
 +import org.apache.cassandra.distributed.api.IInstanceConfig;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.TokenSupplier;
 +import org.apache.cassandra.distributed.shared.NetworkTopology;
 +import org.apache.cassandra.distributed.test.TestBaseImpl;
 +
 +import static java.util.Arrays.asList;
++import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
 +import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
 +import static org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom;
 +import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
 +import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +
 +public class BootstrapTest extends TestBaseImpl
 +{
 +    @Test
 +    public void bootstrapTest() throws Throwable
 +    {
 +        int originalNodeCount = 2;
 +        int expandedNodeCount = originalNodeCount + 1;
 +
 +        try (Cluster cluster = builder().withNodes(originalNodeCount)
 +                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
 +                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
 +                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
 +                                        .start())
 +        {
 +            populate(cluster,0, 100);
 +
 +            IInstanceConfig config = cluster.newInstanceConfig();
 +            IInvokableInstance newInstance = cluster.bootstrap(config);
 +            withProperty("cassandra.join_ring", false,
 +                         () -> newInstance.startup(cluster));
 +
 +            cluster.forEach(statusToBootstrap(newInstance));
 +
 +            cluster.run(asList(pullSchemaFrom(cluster.get(1)),
 +                               bootstrap()),
 +                        newInstance.config().num());
 +
 +            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
 +                Assert.assertEquals("Node " + e.getKey() + " has incorrect row state",
 +                                    100L,
 +                                    e.getValue().longValue());
 +        }
 +    }
 +
 +    @Test
 +    public void autoBootstrapTest() throws Throwable
 +    {
 +        int originalNodeCount = 2;
 +        int expandedNodeCount = originalNodeCount + 1;
 +
 +        try (Cluster cluster = builder().withNodes(originalNodeCount)
 +                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
 +                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
 +                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
 +                                        .start())
 +        {
 +            populate(cluster,0, 100);
 +
 +            IInstanceConfig config = cluster.newInstanceConfig();
 +            config.set("auto_bootstrap", true);
 +            IInvokableInstance newInstance = cluster.bootstrap(config);
-             withProperty("cassandra.join_ring", false,
-                          () -> newInstance.startup(cluster));
++            withProperty(BOOTSTRAP_SCHEMA_DELAY_MS.getKey(), Integer.toString(90 * 1000),
++                         () -> withProperty("cassandra.join_ring", false,
++                                            () -> newInstance.startup(cluster)));
 +
 +            newInstance.nodetoolResult("join").asserts().success();
 +
 +            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
 +                Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", e.getValue().longValue(), 100L);
 +        }
 +    }
 +
 +    public static void populate(ICluster cluster, int from, int to)
 +    {
 +        populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
 +    }
 +
 +    public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl)
 +    {
 +        cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};");
 +        cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 +        for (int i = from; i < to; i++)
 +        {
 +            cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)",
 +                                               cl,
 +                                               i, i, i);
 +        }
 +    }
 +
 +    public static Map<Integer, Long> count(ICluster cluster)
 +    {
 +        return IntStream.rangeClosed(1, cluster.size())
 +                        .boxed()
 +                        .collect(Collectors.toMap(nodeId -> nodeId,
 +                                                  nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0]));
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org