You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2021/10/03 21:43:42 UTC

[cassandra] branch trunk updated (72722e5 -> c3c2c7e)

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    omit 72722e5  Merge branch 'cassandra-4.0' into trunk
    omit 0108061  Merge branch 'cassandra-3.11' into cassandra-4.0
     new 3660a58  Merge branch 'cassandra-3.11' into cassandra-4.0
     new c3c2c7e  Merge branch 'cassandra-4.0' into trunk

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (72722e5)
            \
             N -- N -- N   refs/heads/trunk (c3c2c7e)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/java/org/apache/cassandra/cql3/QueryProcessor.java | 8 --------
 1 file changed, 8 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 02/02: Merge branch 'cassandra-4.0' into trunk

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit c3c2c7efecb23d7627aa8ffda72603dc88f1ed37
Merge: 0945002 3660a58
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Sun Oct 3 23:24:22 2021 +0200

    Merge branch 'cassandra-4.0' into trunk

 .../org/apache/cassandra/cql3/CQLStatement.java    |   5 +
 .../org/apache/cassandra/cql3/QueryProcessor.java  |  50 +++-
 .../cql3/statements/ModificationStatement.java     |   3 +-
 .../cassandra/cql3/statements/SelectStatement.java |   3 +-
 .../cassandra/cql3/statements/UseStatement.java    |   5 +
 .../statements/schema/AlterSchemaStatement.java    |   8 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    | 105 +++++++-
 .../gms/IEndpointStateChangeSubscriber.java        |  14 +-
 .../net/StartupClusterConnectivityChecker.java     |  24 --
 .../org/apache/cassandra/repair/RepairSession.java |   6 -
 .../apache/cassandra/service/LoadBroadcaster.java  |   8 -
 .../apache/cassandra/streaming/StreamSession.java  |   6 -
 .../cassandra/utils/RecomputingSupplier.java       | 122 +++++++++
 .../distributed/test/GossipShutdownTest.java       |   7 -
 .../distributed/test/NativeProtocolTest.java       |  14 +-
 .../test/ReprepareNewBehaviourTest.java            |  38 +++
 .../distributed/test/ReprepareTestBase.java        | 281 +++++++++++++++++++++
 .../test/ReprepareTestOldBehaviour.java            | 129 ++++++++++
 .../cassandra/cql3/PreparedStatementsTest.java     |   4 +-
 .../cassandra/cql3/PstmtPersistenceTest.java       |  36 +--
 .../apache/cassandra/metrics/CQLMetricsTest.java   |   5 +-
 .../cassandra/utils/RecomputingSupplierTest.java   | 150 +++++++++++
 22 files changed, 933 insertions(+), 90 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/02: Merge branch 'cassandra-3.11' into cassandra-4.0

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 3660a58ab458b90fdf9de5cda83099b87aad4c86
Merge: b22749b 32a15f0
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Sun Oct 3 23:23:31 2021 +0200

    Merge branch 'cassandra-3.11' into cassandra-4.0

 .../org/apache/cassandra/cql3/CQLStatement.java    |   5 +
 .../org/apache/cassandra/cql3/QueryProcessor.java  |  50 +++-
 .../cql3/statements/ModificationStatement.java     |   3 +-
 .../cassandra/cql3/statements/SelectStatement.java |   3 +-
 .../cassandra/cql3/statements/UseStatement.java    |   5 +
 .../statements/schema/AlterSchemaStatement.java    |   8 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    | 105 +++++++-
 .../gms/IEndpointStateChangeSubscriber.java        |  14 +-
 .../net/StartupClusterConnectivityChecker.java     |  24 --
 .../org/apache/cassandra/repair/RepairSession.java |   6 -
 .../apache/cassandra/service/LoadBroadcaster.java  |   8 -
 .../apache/cassandra/streaming/StreamSession.java  |   6 -
 .../cassandra/utils/RecomputingSupplier.java       | 122 +++++++++
 .../distributed/test/GossipShutdownTest.java       |   7 -
 .../distributed/test/NativeProtocolTest.java       |  14 +-
 .../test/ReprepareNewBehaviourTest.java            |  38 +++
 .../distributed/test/ReprepareTestBase.java        | 281 +++++++++++++++++++++
 .../test/ReprepareTestOldBehaviour.java            | 129 ++++++++++
 .../cassandra/cql3/PreparedStatementsTest.java     |   4 +-
 .../cassandra/cql3/PstmtPersistenceTest.java       |  36 +--
 .../apache/cassandra/metrics/CQLMetricsTest.java   |   5 +-
 .../cassandra/utils/RecomputingSupplierTest.java   | 150 +++++++++++
 22 files changed, 933 insertions(+), 90 deletions(-)

diff --cc src/java/org/apache/cassandra/cql3/CQLStatement.java
index c34e27f,901ecd4..e78f733
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@@ -84,30 -59,12 +84,35 @@@ public interface CQLStatemen
       *
       * @param state the current query state
       */
 -    public ResultMessage executeInternal(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException;
 +    public ResultMessage executeLocally(QueryState state, QueryOptions options);
  
      /**
 -     * Return an Iterable over all of the functions (both native and user-defined) used by any component
 -     * of the statement
 -     * @return functions all functions found (may contain duplicates)
 +     * Provides the context needed for audit logging statements.
 +     */
 +    AuditLogContext getAuditLogContext();
 +
 +    /**
 +     * Whether or not this CQL Statement has LWT conditions
       */
 -    public Iterable<Function> getFunctions();
 +    default public boolean hasConditions()
 +    {
 +        return false;
 +    }
 +
 +    public static abstract class Raw
 +    {
 +        protected VariableSpecifications bindVariables;
 +
 +        public void setBindVariables(List<ColumnIdentifier> variables)
 +        {
 +            bindVariables = new VariableSpecifications(variables);
 +        }
 +
 +        public abstract CQLStatement prepare(ClientState state);
 +    }
++
++    public static interface SingleKeyspaceCqlStatement extends CQLStatement
++    {
++        public String keyspace();
++    }
  }
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 87829ab,589a25a..3c04c2d
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -61,8 -61,19 +62,18 @@@ import static org.apache.cassandra.cql3
  
  public class QueryProcessor implements QueryHandler
  {
 -    public static final CassandraVersion CQL_VERSION = new CassandraVersion("3.4.4");
 +    public static final CassandraVersion CQL_VERSION = new CassandraVersion("3.4.5");
  
+     /**
+      * If a query is prepared with a fully qualified name, but the user also uses USE (specifically when USE keyspace
+      * is different) then the IDs generated could change over time; invalidating the assumption that IDs won't ever
+      * change.  In the version defined below, the USE keyspace is ignored when a fully-qualified name is used as an
+      * attempt to make IDs stable.
+      */
+     private static final CassandraVersion PREPARE_ID_BEHAVIOR_CHANGE_30 = new CassandraVersion("3.0.26");
+     private static final CassandraVersion PREPARE_ID_BEHAVIOR_CHANGE_3X = new CassandraVersion("3.11.12");
+     private static final CassandraVersion PREPARE_ID_BEHAVIOR_CHANGE_40 = new CassandraVersion("4.0.1");
+ 
 -
      public static final QueryProcessor instance = new QueryProcessor();
  
      private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class);
@@@ -436,14 -430,36 +447,37 @@@
          if (existing != null)
              return existing;
  
 -        ParsedStatement.Prepared prepared = getStatement(queryString, clientState);
 -        prepared.rawCQLStatement = queryString;
 -        int boundTerms = prepared.statement.getBoundTerms();
 +        CQLStatement statement = getStatement(queryString, clientState);
 +        Prepared prepared = new Prepared(statement, queryString);
 +
 +        int boundTerms = statement.getBindVariables().size();
          if (boundTerms > FBUtilities.MAX_UNSIGNED_SHORT)
              throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", boundTerms, FBUtilities.MAX_UNSIGNED_SHORT));
 -        assert boundTerms == prepared.boundNames.size();
  
-         return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared);
 -        if (prepared.keyspace != null)
++        if (statement instanceof CQLStatement.SingleKeyspaceCqlStatement)
+         {
+             // Edge-case of CASSANDRA-15252 in mixed-mode cluster. We accept that 15252 itself can manifest in a
+             // cluster that has both old and new nodes, but we would like to avoid a situation when the fix adds
+             // a new behaviour that can break which, in addition, can get triggered more frequently.
+             // If statement ID was generated on the old node _with_ use, when attempting to execute on the new node,
+             // we may fall into infinite loop. To break out of this loop, we put a prepared statement that client
+             // expects into cache, so that it could get PREPARED response on the second try.
 -            ResultMessage.Prepared newBehavior = storePreparedStatement(queryString, null, prepared, forThrift);
 -            ResultMessage.Prepared oldBehavior = clientState.getRawKeyspace() != null ? storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift) : newBehavior;
++            ResultMessage.Prepared newBehavior = storePreparedStatement(queryString, null, prepared);
++            ResultMessage.Prepared oldBehavior = clientState.getRawKeyspace() != null ? storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared) : newBehavior;
+             CassandraVersion minVersion = Gossiper.instance.getMinVersion(20, TimeUnit.MILLISECONDS);
+ 
+             // Default to old behaviour in case we're not sure about the version. Even if we ever flip back to the old
+             // behaviour due to the gossip bug or incorrect version string, we'll end up with two re-prepare round-trips.
++
+             return minVersion != null &&
+                    ((minVersion.major == 3 && minVersion.minor == 0 && minVersion.compareTo(PREPARE_ID_BEHAVIOR_CHANGE_30) >= 0) ||
+                     (minVersion.major == 3 && minVersion.minor != 0 && minVersion.compareTo(PREPARE_ID_BEHAVIOR_CHANGE_3X) >= 0) ||
+                     (minVersion.major == 4 && minVersion.compareTo(PREPARE_ID_BEHAVIOR_CHANGE_40) >= 0)) ? newBehavior : oldBehavior;
+         }
+         else
+         {
 -            return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift);
++            return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared);
+         }
      }
  
      private static MD5Digest computeId(String queryString, String keyspace)
@@@ -452,23 -468,42 +486,25 @@@
          return MD5Digest.compute(toHash);
      }
  
-     private static ResultMessage.Prepared getStoredPreparedStatement(String queryString, String keyspace)
 -    private static Integer computeThriftId(String queryString, String keyspace)
 -    {
 -        String toHash = keyspace == null ? queryString : keyspace + queryString;
 -        return toHash.hashCode();
 -    }
 -
+     @VisibleForTesting
 -    public static ResultMessage.Prepared getStoredPreparedStatement(String queryString, String clientKeyspace, boolean forThrift)
++    public static ResultMessage.Prepared getStoredPreparedStatement(String queryString, String clientKeyspace)
      throws InvalidRequestException
      {
-         MD5Digest statementId = computeId(queryString, keyspace);
 -        if (forThrift)
 -        {
 -            Integer thriftStatementId = computeThriftId(queryString, clientKeyspace);
 -            ParsedStatement.Prepared existing = thriftPreparedStatements.get(thriftStatementId);
 -            if (existing == null)
 -                return null;
++        MD5Digest statementId = computeId(queryString, clientKeyspace);
 +        Prepared existing = preparedStatements.getIfPresent(statementId);
 +        if (existing == null)
 +            return null;
  
 -            checkTrue(queryString.equals(existing.rawCQLStatement),
 -                      String.format("MD5 hash collision: query with the same MD5 hash was already prepared. \n Existing: '%s'", existing.rawCQLStatement));
 -            return ResultMessage.Prepared.forThrift(thriftStatementId, existing.boundNames);
 -        }
 -        else
 -        {
 -            MD5Digest statementId = computeId(queryString, clientKeyspace);
 -            ParsedStatement.Prepared existing = preparedStatements.get(statementId);
 -            if (existing == null)
 -                return null;
 +        checkTrue(queryString.equals(existing.rawCQLStatement),
 +                String.format("MD5 hash collision: query with the same MD5 hash was already prepared. \n Existing: '%s'", existing.rawCQLStatement));
  
 -            checkTrue(queryString.equals(existing.rawCQLStatement),
 -                      String.format("MD5 hash collision: query with the same MD5 hash was already prepared. \n Existing: '%s'", existing.rawCQLStatement));
 -            return new ResultMessage.Prepared(statementId, existing);
 -        }
 +        ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(existing.statement);
 +        ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(existing.statement);
 +        return new ResultMessage.Prepared(statementId, resultMetadata.getResultMetadataId(), preparedMetadata, resultMetadata);
      }
  
-     private static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, Prepared prepared)
+     @VisibleForTesting
 -    public static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, ParsedStatement.Prepared prepared, boolean forThrift)
++    public static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, Prepared prepared)
      throws InvalidRequestException
      {
          // Concatenate the current keyspace so we don't mix prepared statements between keyspace (#5352).
@@@ -608,7 -656,14 +644,13 @@@
          internalStatements.clear();
      }
  
+     @VisibleForTesting
+     public static void clearPreparedStatementsCache()
+     {
 -        preparedStatements.clear();
 -        thriftPreparedStatements.clear();
++        preparedStatements.asMap().clear();
+     }
+ 
 -    private static class MigrationSubscriber extends MigrationListener
 +    private static class StatementInvalidatingListener extends SchemaChangeListener
      {
          private static void removeInvalidPreparedStatements(String ksName, String cfName)
          {
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 087f3b0,8376a0a..a072da5
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -66,7 -59,7 +66,7 @@@ import static org.apache.cassandra.cql3
  /*
   * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
   */
--public abstract class ModificationStatement implements CQLStatement
++public abstract class ModificationStatement implements CQLStatement.SingleKeyspaceCqlStatement
  {
      protected static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class);
  
@@@ -192,13 -163,18 +192,14 @@@
          return restrictions;
      }
  
 -    public abstract void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params);
 +    public abstract void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Clustering<?> clustering, UpdateParameters params);
  
 -    public abstract void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params);
 -
 -    public int getBoundTerms()
 -    {
 -        return boundTerms;
 -    }
 +    public abstract void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Slice slice, UpdateParameters params);
  
++    @Override
      public String keyspace()
      {
 -        return cfm.ksName;
 +        return metadata.keyspace;
      }
  
      public String columnFamily()
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 774bd68,632bf94..494d9af
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -85,7 -114,7 +85,7 @@@ import static org.apache.cassandra.util
   * QueryHandler implementations, so before reducing their accessibility
   * due consideration should be given.
   */
--public class SelectStatement implements CQLStatement
++public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement
  {
      private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class);
  
@@@ -474,14 -481,12 +474,15 @@@
  
      public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException
      {
 -        return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT));
 +        QueryOptions options = QueryOptions.DEFAULT;
 +        Selectors selectors = selection.newSelectors(options);
 +        return process(partitions, options, selectors, nowInSec, getLimit(options));
      }
  
++    @Override
      public String keyspace()
      {
 -        return cfm.ksName;
 +        return table.keyspace;
      }
  
      public String columnFamily()
diff --cc src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index 3013d9f,0242f09..0504f43
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@@ -64,16 -65,4 +64,21 @@@ public class UseStatement extends CQLSt
          // but for some unit tests we need to set the keyspace (e.g. for tests with DROP INDEX)
          return execute(state, options, System.nanoTime());
      }
 +    
 +    @Override
 +    public String toString()
 +    {
 +        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
 +    }
 +
 +    @Override
 +    public AuditLogContext getAuditLogContext()
 +    {
 +        return new AuditLogContext(AuditLogEntryType.USE_KEYSPACE, keyspace);
 +    }
++
++    public String keyspace()
++    {
++        return keyspace;
++    }
  }
diff --cc src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
index 161c9c4,0000000..124d04c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
@@@ -1,153 -1,0 +1,159 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.statements.schema;
 +
 +import java.util.Set;
 +
 +import com.google.common.collect.ImmutableSet;
 +
 +import org.apache.cassandra.auth.AuthenticatedUser;
 +import org.apache.cassandra.auth.IResource;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.CQLStatement;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.schema.*;
 +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 +import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.service.ClientWarn;
 +import org.apache.cassandra.service.QueryState;
 +import org.apache.cassandra.transport.Event.SchemaChange;
 +import org.apache.cassandra.transport.messages.ResultMessage;
 +
- abstract class AlterSchemaStatement implements CQLStatement, SchemaTransformation
++abstract public class AlterSchemaStatement implements CQLStatement.SingleKeyspaceCqlStatement, SchemaTransformation
 +{
 +    protected final String keyspaceName; // name of the keyspace affected by the statement
 +
 +    protected AlterSchemaStatement(String keyspaceName)
 +    {
 +        this.keyspaceName = keyspaceName;
 +    }
 +
 +    public final void validate(ClientState state)
 +    {
 +        // no-op; validation is performed while executing the statement, in apply()
 +    }
 +
 +    public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime)
 +    {
 +        return execute(state, false);
 +    }
 +
++    @Override
++    public String keyspace()
++    {
++        return keyspaceName;
++    }
++
 +    public ResultMessage executeLocally(QueryState state, QueryOptions options)
 +    {
 +        return execute(state, true);
 +    }
 +
 +    /**
 +     * TODO: document
 +     */
 +    abstract SchemaChange schemaChangeEvent(KeyspacesDiff diff);
 +
 +    /**
 +     * Schema alteration may result in a new database object (keyspace, table, role, function) being created capable of
 +     * having permissions GRANTed on it. The creator of the object (the primary role assigned to the AuthenticatedUser
 +     * performing the operation) is automatically granted ALL applicable permissions on the object. This is a hook for
 +     * subclasses to override in order indicate which resources to to perform that grant on when the statement is executed.
 +     *
 +     * Only called if the transformation resulted in a non-empty diff.
 +     */
 +    Set<IResource> createdResources(KeyspacesDiff diff)
 +    {
 +        return ImmutableSet.of();
 +    }
 +
 +    /**
 +     * Schema alteration might produce a client warning (e.g. a warning to run full repair when increading RF of a keyspace).
 +     * This method should be used to generate them instead of calling warn() in transformation code.
 +     *
 +     * Only called if the transformation resulted in a non-empty diff.
 +     */
 +    Set<String> clientWarnings(KeyspacesDiff diff)
 +    {
 +        return ImmutableSet.of();
 +    }
 +
 +    public ResultMessage execute(QueryState state, boolean locally)
 +    {
 +        if (SchemaConstants.isLocalSystemKeyspace(keyspaceName))
 +            throw ire("System keyspace '%s' is not user-modifiable", keyspaceName);
 +
 +        KeyspaceMetadata keyspace = Schema.instance.getKeyspaceMetadata(keyspaceName);
 +        if (null != keyspace && keyspace.isVirtual())
 +            throw ire("Virtual keyspace '%s' is not user-modifiable", keyspaceName);
 +
 +        validateKeyspaceName();
 +
 +        KeyspacesDiff diff = MigrationManager.announce(this, locally);
 +
 +        clientWarnings(diff).forEach(ClientWarn.instance::warn);
 +
 +        if (diff.isEmpty())
 +            return new ResultMessage.Void();
 +
 +        /*
 +         * When a schema alteration results in a new db object being created, we grant permissions on the new
 +         * object to the user performing the request if:
 +         * - the user is not anonymous
 +         * - the configured IAuthorizer supports granting of permissions (not all do, AllowAllAuthorizer doesn't and
 +         *   custom external implementations may not)
 +         */
 +        AuthenticatedUser user = state.getClientState().getUser();
 +        if (null != user && !user.isAnonymous())
 +            createdResources(diff).forEach(r -> grantPermissionsOnResource(r, user));
 +
 +        return new ResultMessage.SchemaChange(schemaChangeEvent(diff));
 +    }
 +
 +    private void validateKeyspaceName()
 +    {
 +        if (!SchemaConstants.isValidName(keyspaceName))
 +        {
 +            throw ire("Keyspace name must not be empty, more than %d characters long, " +
 +                      "or contain non-alphanumeric-underscore characters (got '%s')",
 +                      SchemaConstants.NAME_LENGTH, keyspaceName);
 +        }
 +    }
 +
 +    private void grantPermissionsOnResource(IResource resource, AuthenticatedUser user)
 +    {
 +        try
 +        {
 +            DatabaseDescriptor.getAuthorizer()
 +                              .grant(AuthenticatedUser.SYSTEM_USER,
 +                                     resource.applicablePermissions(),
 +                                     resource,
 +                                     user.getPrimaryRole());
 +        }
 +        catch (UnsupportedOperationException e)
 +        {
 +            // not a problem - grant is an optional method on IAuthorizer
 +        }
 +    }
 +
 +    static InvalidRequestException ire(String format, Object... args)
 +    {
 +        return new InvalidRequestException(String.format(format, args));
 +    }
 +}
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 6eb674f,363db5f..c147434
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -33,7 -34,6 +33,8 @@@ import com.google.common.base.Throwable
  import com.google.common.collect.ImmutableList;
  import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.Iterables;
 +import com.google.common.collect.ImmutableSet;
++import com.google.common.collect.Iterables;
  import com.google.common.collect.Sets;
  import com.google.common.util.concurrent.ListenableFutureTask;
  import com.google.common.util.concurrent.Uninterruptibles;
@@@ -51,23 -46,26 +52,30 @@@ import org.apache.cassandra.utils.Pair
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
++import io.netty.util.concurrent.FastThreadLocal;
  import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
  import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
  import org.apache.cassandra.concurrent.Stage;
 -import org.apache.cassandra.concurrent.StageManager;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.net.IAsyncCallback;
 -import org.apache.cassandra.net.MessageIn;
 -import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.RequestCallback;
 +import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.CassandraVersion;
++import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.JVMStabilityInspector;
++import org.apache.cassandra.utils.MBeanWrapper;
++import org.apache.cassandra.utils.NoSpamLogger;
++import org.apache.cassandra.utils.Pair;
+ import org.apache.cassandra.utils.RecomputingSupplier;
  
 -import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
 -import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
 +import static org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_QUARANTINE_DELAY;
 +import static org.apache.cassandra.net.NoPayload.noPayload;
 +import static org.apache.cassandra.net.Verb.ECHO_REQ;
 +import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN;
  
  /**
   * This module is responsible for Gossiping information for the local endpoint. This abstraction
@@@ -333,7 -259,9 +341,10 @@@ public class Gossiper implements IFailu
          }
      }
  
-     Gossiper(boolean registerJmx)
+     private final RecomputingSupplier<CassandraVersion> minVersionSupplier = new RecomputingSupplier<>(this::computeMinVersion, executor);
+ 
 -    private Gossiper()
++    @VisibleForTesting
++    public Gossiper(boolean registerJmx)
      {
          // half of QUARATINE_DELAY, to ensure justRemovedEndpoints has enough leeway to prevent re-gossip
          fatClientTimeout = (QUARANTINE_DELAY / 2);
@@@ -341,10 -269,33 +352,35 @@@
          FailureDetector.instance.registerFailureDetectionEventListener(this);
  
          // Register this instance with JMX
 -        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 -
 +        if (registerJmx)
 +        {
 +            MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 +        }
+ 
+         subscribers.add(new IEndpointStateChangeSubscriber()
+         {
 -            public void onJoin(InetAddress endpoint, EndpointState state)
++            public void onJoin(InetAddressAndPort endpoint, EndpointState state)
+ 	    {
+                 maybeRecompute(state);
+             }
+ 
 -            public void onAlive(InetAddress endpoint, EndpointState state)
++            public void onAlive(InetAddressAndPort endpoint, EndpointState state)
+ 	    {
+                 maybeRecompute(state);
+             }
+ 
+             private void maybeRecompute(EndpointState state)
+ 	    {
+                 if (state.getApplicationState(ApplicationState.RELEASE_VERSION) != null)
+                     minVersionSupplier.recompute();
+             }
+ 
 -            public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
++            public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
+             {
+                 if (state == ApplicationState.RELEASE_VERSION)
+                     minVersionSupplier.recompute();
+             }
+         });
      }
  
      public void setLastProcessedMessageAt(long timeInMillis)
@@@ -1712,8 -1506,9 +1748,9 @@@
          buildSeedsList();
          /* initialize the heartbeat state for this localEndpoint */
          maybeInitializeLocalState(generationNbr);
 -        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
 +        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
          localState.addApplicationStates(preloadLocalStates);
+         minVersionSupplier.recompute();
  
          //notify snitches that Gossiper is about to start
          DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
@@@ -2285,4 -1906,76 +2322,70 @@@
          stop();
          ExecutorUtils.shutdownAndWait(timeout, unit, executor);
      }
+ 
+     @Nullable
+     public CassandraVersion getMinVersion(int delay, TimeUnit timeUnit)
+     {
+         try
+         {
+             return minVersionSupplier.get(delay, timeUnit);
+         }
+         catch (TimeoutException e)
+         {
+             // Timeouts here are harmless: they won't cause reprepares and may only
+             // cause the old version of the hash to be kept for longer
+             return null;
+         }
+         catch (Throwable e)
+         {
+             logger.error("Caught an exception while waiting for min version", e);
+             return null;
+         }
+     }
+ 
+     @Nullable
 -    private String getReleaseVersionString(InetAddress ep)
++    private String getReleaseVersionString(InetAddressAndPort ep)
+     {
+         EndpointState state = getEndpointStateForEndpoint(ep);
+         if (state == null)
+             return null;
+ 
+         VersionedValue value = state.getApplicationState(ApplicationState.RELEASE_VERSION);
+         return value == null ? null : value.value;
+     }
+ 
+     private CassandraVersion computeMinVersion()
+     {
+         CassandraVersion minVersion = null;
+ 
 -        for (InetAddress addr : Iterables.concat(Gossiper.instance.getLiveMembers(),
++        for (InetAddressAndPort addr : Iterables.concat(Gossiper.instance.getLiveMembers(),
+                                                  Gossiper.instance.getUnreachableMembers()))
+         {
+             String versionString = getReleaseVersionString(addr);
+             // Raced with changes to gossip state, wait until next iteration
+             if (versionString == null)
+                 return null;
+ 
+             CassandraVersion version;
+ 
+             try
+             {
+                 version = new CassandraVersion(versionString);
+             }
+             catch (Throwable t)
+             {
+                 JVMStabilityInspector.inspectThrowable(t);
+                 String message = String.format("Can't parse version string %s", versionString);
+                 logger.warn(message);
+                 if (logger.isDebugEnabled())
+                     logger.debug(message, t);
+                 return null;
+             }
+ 
+             if (minVersion == null || version.compareTo(minVersion) < 0)
+                 minVersion = version;
+         }
+ 
+         return minVersion;
+     }
 -
 -    @VisibleForTesting
 -    public void setAnyNodeOn30(boolean anyNodeOn30)
 -    {
 -        this.anyNodeOn30 = anyNodeOn30;
 -    }
  }
diff --cc src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
index dc81650,861d4ac..df04697
--- a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
+++ b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
@@@ -36,17 -36,17 +36,17 @@@ public interface IEndpointStateChangeSu
       * @param endpoint endpoint for which the state change occurred.
       * @param epState  state that actually changed for the above endpoint.
       */
-     public void onJoin(InetAddressAndPort endpoint, EndpointState epState);
 -    default void onJoin(InetAddress endpoint, EndpointState epState) {}
++    default void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
      
-     public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue);
 -    default void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
++    default void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
  
-     public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value);
 -    default void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
++    default void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
  
-     public void onAlive(InetAddressAndPort endpoint, EndpointState state);
 -    default void onAlive(InetAddress endpoint, EndpointState state) {}
++    default void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
  
-     public void onDead(InetAddressAndPort endpoint, EndpointState state);
 -    default void onDead(InetAddress endpoint, EndpointState state) {}
++    default void onDead(InetAddressAndPort endpoint, EndpointState state) {}
  
-     public void onRemove(InetAddressAndPort endpoint);
 -    default void onRemove(InetAddress endpoint) {}
++    default void onRemove(InetAddressAndPort endpoint) {}
  
      /**
       * Called whenever a node is restarted.
@@@ -54,5 -54,5 +54,5 @@@
       * previously marked down. It will have only if {@code state.isAlive() == false}
       * as {@code state} is from before the restarted node is marked up.
       */
-     public void onRestart(InetAddressAndPort endpoint, EndpointState state);
 -    default void onRestart(InetAddress endpoint, EndpointState state) {}
++    default void onRestart(InetAddressAndPort endpoint, EndpointState state) {}
  }
diff --cc src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
index b901338,0000000..8bc1e5d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
+++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
@@@ -1,275 -1,0 +1,251 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.HashMultimap;
 +import com.google.common.collect.SetMultimap;
 +import com.google.common.util.concurrent.Uninterruptibles;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
 +import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.apache.cassandra.net.Verb.PING_REQ;
 +import static org.apache.cassandra.net.ConnectionType.LARGE_MESSAGES;
 +import static org.apache.cassandra.net.ConnectionType.SMALL_MESSAGES;
 +
 +public class StartupClusterConnectivityChecker
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
 +
 +    private final boolean blockForRemoteDcs;
 +    private final long timeoutNanos;
 +
 +    public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs)
 +    {
 +        if (timeoutSecs > 100)
 +            logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
 +        long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs);
 +
 +        return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs);
 +    }
 +
 +    @VisibleForTesting
 +    StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs)
 +    {
 +        this.blockForRemoteDcs = blockForRemoteDcs;
 +        this.timeoutNanos = timeoutNanos;
 +    }
 +
 +    /**
 +     * @param peers The currently known peers in the cluster; argument is not modified.
 +     * @param getDatacenterSource A function for mapping peers to their datacenter.
 +     * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened;
 +     * else false.
 +     */
 +    public boolean execute(Set<InetAddressAndPort> peers, Function<InetAddressAndPort, String> getDatacenterSource)
 +    {
 +        if (peers == null || this.timeoutNanos < 0)
 +            return true;
 +
 +        // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection)
 +        peers = new HashSet<>(peers);
 +        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
 +        String localDc = getDatacenterSource.apply(localAddress);
 +
 +        peers.remove(localAddress);
 +        if (peers.isEmpty())
 +            return true;
 +
 +        // make a copy of the datacenter mapping (in case gossip updates happen during this method or some such)
 +        Map<InetAddressAndPort, String> peerToDatacenter = new HashMap<>();
 +        SetMultimap<String, InetAddressAndPort> datacenterToPeers = HashMultimap.create();
 +
 +        for (InetAddressAndPort peer : peers)
 +        {
 +            String datacenter = getDatacenterSource.apply(peer);
 +            peerToDatacenter.put(peer, datacenter);
 +            datacenterToPeers.put(datacenter, peer);
 +        }
 +
 +        // In the case where we do not want to block startup on remote datacenters (e.g. because clients only use
 +        // LOCAL_X consistency levels), we remove all other datacenter hosts from the mapping and we only wait
 +        // on the remaining local datacenter.
 +        if (!blockForRemoteDcs)
 +        {
 +            datacenterToPeers.keySet().retainAll(Collections.singleton(localDc));
 +            logger.info("Blocking coordination until only a single peer is DOWN in the local datacenter, timeout={}s",
 +                        TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
 +        }
 +        else
 +        {
 +            logger.info("Blocking coordination until only a single peer is DOWN in each datacenter, timeout={}s",
 +                        TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
 +        }
 +
 +        AckMap acks = new AckMap(3);
 +        Map<String, CountDownLatch> dcToRemainingPeers = new HashMap<>(datacenterToPeers.size());
 +        for (String datacenter: datacenterToPeers.keys())
 +        {
 +            dcToRemainingPeers.put(datacenter,
 +                                   new CountDownLatch(Math.max(datacenterToPeers.get(datacenter).size() - 1, 0)));
 +        }
 +
 +        long startNanos = System.nanoTime();
 +
 +        // set up a listener to react to new nodes becoming alive (in gossip), and account for all the nodes that are already alive
 +        Set<InetAddressAndPort> alivePeers = Collections.newSetFromMap(new ConcurrentHashMap<>());
 +        AliveListener listener = new AliveListener(alivePeers, dcToRemainingPeers, acks, peerToDatacenter::get);
 +        Gossiper.instance.register(listener);
 +
 +        // send out a ping message to open up the non-gossip connections to all peers. Note that this sends the
 +        // ping messages to _all_ peers, not just the ones we block for in dcToRemainingPeers.
 +        sendPingMessages(peers, dcToRemainingPeers, acks, peerToDatacenter::get);
 +
 +        for (InetAddressAndPort peer : peers)
 +        {
 +            if (Gossiper.instance.isAlive(peer) && alivePeers.add(peer) && acks.incrementAndCheck(peer))
 +            {
 +                String datacenter = peerToDatacenter.get(peer);
 +                // We have to check because we might only have the local DC in the map
 +                if (dcToRemainingPeers.containsKey(datacenter))
 +                    dcToRemainingPeers.get(datacenter).countDown();
 +            }
 +        }
 +
 +        boolean succeeded = true;
 +        for (CountDownLatch countDownLatch : dcToRemainingPeers.values())
 +        {
 +            long remainingNanos = Math.max(1, timeoutNanos - (System.nanoTime() - startNanos));
 +            //noinspection UnstableApiUsage
 +            succeeded &= Uninterruptibles.awaitUninterruptibly(countDownLatch, remainingNanos, TimeUnit.NANOSECONDS);
 +        }
 +
 +        Gossiper.instance.unregister(listener);
 +
 +        Map<String, Long> numDown = dcToRemainingPeers.entrySet().stream()
 +                                                      .collect(Collectors.toMap(Map.Entry::getKey,
 +                                                                                e -> e.getValue().getCount()));
 +
 +        if (succeeded)
 +        {
 +            logger.info("Ensured sufficient healthy connections with {} after {} milliseconds",
 +                        numDown.keySet(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
 +        }
 +        else
 +        {
 +            logger.warn("Timed out after {} milliseconds, was waiting for remaining peers to connect: {}",
 +                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos), numDown);
 +        }
 +
 +        return succeeded;
 +    }
 +
 +    /**
 +     * Sends a "connection warmup" message to each peer in the collection, on every {@link ConnectionType}
 +     * used for internode messaging (that is not gossip).
 +     */
 +    private void sendPingMessages(Set<InetAddressAndPort> peers, Map<String, CountDownLatch> dcToRemainingPeers,
 +                                  AckMap acks, Function<InetAddressAndPort, String> getDatacenter)
 +    {
 +        RequestCallback responseHandler = msg -> {
 +            if (acks.incrementAndCheck(msg.from()))
 +            {
 +                String datacenter = getDatacenter.apply(msg.from());
 +                // We have to check because we might only have the local DC in the map
 +                if (dcToRemainingPeers.containsKey(datacenter))
 +                    dcToRemainingPeers.get(datacenter).countDown();
 +            }
 +        };
 +
 +        Message<PingRequest> small = Message.out(PING_REQ, PingRequest.forSmall);
 +        Message<PingRequest> large = Message.out(PING_REQ, PingRequest.forLarge);
 +        for (InetAddressAndPort peer : peers)
 +        {
 +            MessagingService.instance().sendWithCallback(small, peer, responseHandler, SMALL_MESSAGES);
 +            MessagingService.instance().sendWithCallback(large, peer, responseHandler, LARGE_MESSAGES);
 +        }
 +    }
 +
 +    /**
 +     * A trivial implementation of {@link IEndpointStateChangeSubscriber} that really only cares about
 +     * {@link #onAlive(InetAddressAndPort, EndpointState)} invocations.
 +     */
 +    private static final class AliveListener implements IEndpointStateChangeSubscriber
 +    {
 +        private final Map<String, CountDownLatch> dcToRemainingPeers;
 +        private final Set<InetAddressAndPort> livePeers;
 +        private final Function<InetAddressAndPort, String> getDatacenter;
 +        private final AckMap acks;
 +
 +        AliveListener(Set<InetAddressAndPort> livePeers, Map<String, CountDownLatch> dcToRemainingPeers,
 +                      AckMap acks, Function<InetAddressAndPort, String> getDatacenter)
 +        {
 +            this.livePeers = livePeers;
 +            this.dcToRemainingPeers = dcToRemainingPeers;
 +            this.acks = acks;
 +            this.getDatacenter = getDatacenter;
 +        }
 +
-         public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
-         {
-         }
- 
-         public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
-         {
-         }
- 
-         public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
-         {
-         }
- 
 +        public void onAlive(InetAddressAndPort endpoint, EndpointState state)
 +        {
 +            if (livePeers.add(endpoint) && acks.incrementAndCheck(endpoint))
 +            {
 +                String datacenter = getDatacenter.apply(endpoint);
 +                if (dcToRemainingPeers.containsKey(datacenter))
 +                    dcToRemainingPeers.get(datacenter).countDown();
 +            }
 +        }
- 
-         public void onDead(InetAddressAndPort endpoint, EndpointState state)
-         {
-         }
- 
-         public void onRemove(InetAddressAndPort endpoint)
-         {
-         }
- 
-         public void onRestart(InetAddressAndPort endpoint, EndpointState state)
-         {
-         }
 +    }
 +
 +    private static final class AckMap
 +    {
 +        private final int threshold;
 +        private final Map<InetAddressAndPort, AtomicInteger> acks;
 +
 +        AckMap(int threshold)
 +        {
 +            this.threshold = threshold;
 +            acks = new ConcurrentHashMap<>();
 +        }
 +
 +        boolean incrementAndCheck(InetAddressAndPort address)
 +        {
 +            return acks.computeIfAbsent(address, addr -> new AtomicInteger(0)).incrementAndGet() == threshold;
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index e13c90c,1207d36..75ed1a3
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -342,13 -333,7 +342,7 @@@ public class RepairSession extends Abst
          terminate();
      }
  
-     public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
-     public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
-     public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
-     public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
-     public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
- 
 -    public void onRemove(InetAddress endpoint)
 +    public void onRemove(InetAddressAndPort endpoint)
      {
          convict(endpoint, Double.MAX_VALUE);
      }
diff --cc src/java/org/apache/cassandra/service/LoadBroadcaster.java
index 113aead,0a3e1a4..ebda3cd
--- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
@@@ -60,16 -60,8 +60,8 @@@ public class LoadBroadcaster implement
              onChange(endpoint, ApplicationState.LOAD, localValue);
          }
      }
-     
-     public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
- 
-     public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
- 
-     public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
- 
-     public void onRestart(InetAddressAndPort endpoint, EndpointState state) {}
  
 -    public void onRemove(InetAddress endpoint)
 +    public void onRemove(InetAddressAndPort endpoint)
      {
          loadInfo.remove(endpoint);
      }
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 3a32834,675304f..0b4c0cd
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -910,15 -749,9 +910,9 @@@ public class StreamSession implements I
          maybeCompleted();
      }
  
-     public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
-     public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
-     public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
-     public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
-     public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
- 
 -    public void onRemove(InetAddress endpoint)
 +    public void onRemove(InetAddressAndPort endpoint)
      {
 -        logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peer.getHostAddress());
 +        logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peer.toString());
          closeSession(State.FAILED);
      }
  
diff --cc test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
index 0905b92,92751ef..60fe5b6
--- a/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
@@@ -27,7 -26,8 +26,16 @@@ import com.datastax.driver.core.ResultS
  import com.datastax.driver.core.Session;
  import com.datastax.driver.core.SimpleStatement;
  import com.datastax.driver.core.Statement;
++import com.datastax.driver.core.exceptions.DriverInternalError;
++import com.datastax.driver.core.policies.LoadBalancingPolicy;
++import net.bytebuddy.ByteBuddy;
++import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
++import net.bytebuddy.implementation.MethodDelegation;
++import org.apache.cassandra.cql3.CQLStatement;
++import org.apache.cassandra.cql3.QueryHandler;
++import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.impl.RowUtil;
  
  import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
  import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
diff --cc test/distributed/org/apache/cassandra/distributed/test/ReprepareTestBase.java
index 0000000,c270144..02af6d5
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReprepareTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReprepareTestBase.java
@@@ -1,0 -1,281 +1,281 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.concurrent.CopyOnWriteArrayList;
++import java.util.concurrent.TimeUnit;
+ import java.util.function.BiConsumer;
+ 
+ import com.google.common.collect.Iterators;
+ import org.junit.Assert;
+ 
+ import com.datastax.driver.core.Cluster;
+ import com.datastax.driver.core.Host;
+ import com.datastax.driver.core.HostDistance;
+ import com.datastax.driver.core.PreparedStatement;
+ import com.datastax.driver.core.Session;
+ import com.datastax.driver.core.Statement;
+ import com.datastax.driver.core.exceptions.DriverInternalError;
+ import com.datastax.driver.core.policies.LoadBalancingPolicy;
+ import net.bytebuddy.ByteBuddy;
+ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+ import net.bytebuddy.implementation.FixedValue;
+ import net.bytebuddy.implementation.MethodDelegation;
++import org.apache.cassandra.cql3.CQLStatement;
++import org.apache.cassandra.cql3.QueryHandler;
+ import org.apache.cassandra.cql3.QueryProcessor;
 -import org.apache.cassandra.cql3.statements.ParsedStatement;
+ import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
+ import org.apache.cassandra.service.ClientState;
 -import org.apache.cassandra.service.QueryState;
+ import org.apache.cassandra.transport.messages.ResultMessage;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static net.bytebuddy.matcher.ElementMatchers.named;
+ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+ import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+ import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+ import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.fail;
+ 
+ public class ReprepareTestBase extends TestBaseImpl
+ {
+     protected static ReprepareTestConfiguration cfg(boolean withUse, boolean skipBrokenBehaviours)
+     {
+         return new ReprepareTestConfiguration(withUse, skipBrokenBehaviours);
+     }
+ 
+     public void testReprepare(BiConsumer<ClassLoader, Integer> instanceInitializer, ReprepareTestConfiguration... configs) throws Throwable
+     {
+         try (ICluster<IInvokableInstance> c = init(builder().withNodes(2)
+                                                             .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+                                                             .withInstanceInitializer(instanceInitializer)
+                                                             .start()))
+         {
+             ForceHostLoadBalancingPolicy lbp = new ForceHostLoadBalancingPolicy();
+             c.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));"));
+ 
+             for (ReprepareTestConfiguration config : configs)
+             {
+                 // 1 has old behaviour
+                 for (int firstContact : new int[]{ 1, 2 })
+                 {
+                     try (com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder()
+                                                                                                     .addContactPoint("127.0.0.1")
+                                                                                                     .addContactPoint("127.0.0.2")
+                                                                                                     .withLoadBalancingPolicy(lbp)
+                                                                                                     .build();
+                          Session session = cluster.connect())
+                     {
+                         lbp.setPrimary(firstContact);
+                         final PreparedStatement select = session.prepare(withKeyspace("SELECT * FROM %s.tbl"));
+                         session.execute(select.bind());
+ 
+                         c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+ 
+                         lbp.setPrimary(firstContact == 1 ? 2 : 1);
+ 
+                         if (config.withUse)
+                             session.execute(withKeyspace("USE %s"));
+ 
+                         // Re-preparing on the node
+                         if (!config.skipBrokenBehaviours && firstContact == 1)
+                             session.execute(select.bind());
+ 
+                         c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+ 
+                         lbp.setPrimary(firstContact);
+ 
+                         // Re-preparing on the node with old behaviour will break no matter where the statement was initially prepared
+                         if (!config.skipBrokenBehaviours)
+                             session.execute(select.bind());
+ 
+                         c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+                     }
+                 }
+             }
+         }
+     }
+ 
+     public void testReprepareTwoKeyspaces(BiConsumer<ClassLoader, Integer> instanceInitializer) throws Throwable
+     {
+         try (ICluster<IInvokableInstance> c = init(builder().withNodes(2)
+                                                             .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+                                                             .withInstanceInitializer(instanceInitializer)
+                                                             .start()))
+         {
+             c.schemaChange(withKeyspace("CREATE KEYSPACE %s2 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"));
+             c.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));"));
+ 
+             ForceHostLoadBalancingPolicy lbp = new ForceHostLoadBalancingPolicy();
+ 
+             for (int firstContact : new int[]{ 1, 2 })
+                 try (com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder()
+                                                                                                 .addContactPoint("127.0.0.1")
+                                                                                                 .addContactPoint("127.0.0.2")
+                                                                                                 .withLoadBalancingPolicy(lbp)
+                                                                                                 .build();
+                      Session session = cluster.connect())
+                 {
+                     {
+                         session.execute(withKeyspace("USE %s"));
+                         c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+ 
+                         lbp.setPrimary(firstContact);
+                         final PreparedStatement select = session.prepare(withKeyspace("SELECT * FROM %s.tbl"));
+                         session.execute(select.bind());
+ 
+                         c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+ 
+                         lbp.setPrimary(firstContact == 1 ? 2 : 1);
+                         session.execute(withKeyspace("USE %s2"));
+                         try
+                         {
+                             session.execute(select.bind());
+                         }
+                         catch (DriverInternalError e)
+                         {
+                             Assert.assertTrue(e.getCause().getMessage().contains("can't execute it on"));
+                             continue;
+                         }
+                         fail("Should have thrown");
+                     }
+                 }
+         }
+     }
+ 
+     protected static class ReprepareTestConfiguration
+     {
+         protected final boolean withUse;
+         protected final boolean skipBrokenBehaviours;
+ 
+         protected ReprepareTestConfiguration(boolean withUse, boolean skipBrokenBehaviours)
+         {
+             this.withUse = withUse;
+             this.skipBrokenBehaviours = skipBrokenBehaviours;
+         }
+     }
+ 
+     public static class PrepareBehaviour
+     {
+         protected static void setReleaseVersion(ClassLoader cl, String value)
+         {
+             new ByteBuddy().rebase(FBUtilities.class)
+                            .method(named("getReleaseVersionString"))
+                            .intercept(FixedValue.value(value))
+                            .make()
+                            .load(cl, ClassLoadingStrategy.Default.INJECTION);
+         }
+ 
+         static void newBehaviour(ClassLoader cl, int nodeNumber)
+         {
+             setReleaseVersion(cl, "3.0.19.63");
+         }
+ 
+         static void oldBehaviour(ClassLoader cl, int nodeNumber)
+         {
+             if (nodeNumber == 1)
+             {
+                 new ByteBuddy().rebase(QueryProcessor.class) // note that we need to `rebase` when we use @SuperCall
+                                .method(named("prepare").and(takesArguments(2)))
+                                .intercept(MethodDelegation.to(PrepareBehaviour.class))
+                                .make()
+                                .load(cl, ClassLoadingStrategy.Default.INJECTION);
+                 setReleaseVersion(cl, "3.0.19.60");
+             }
+             else
+             {
+                 setReleaseVersion(cl, "3.0.19.63");
+             }
+         }
+ 
 -        public static ResultMessage.Prepared prepare(String queryString, QueryState queryState)
++        public static ResultMessage.Prepared prepare(String queryString, ClientState clientState)
+         {
 -            ClientState clientState = queryState.getClientState();
 -            ResultMessage.Prepared existing = QueryProcessor.getStoredPreparedStatement(queryString, clientState.getRawKeyspace(), false);
++            ResultMessage.Prepared existing = QueryProcessor.getStoredPreparedStatement(queryString, clientState.getRawKeyspace());
+             if (existing != null)
+                 return existing;
+ 
 -            ParsedStatement.Prepared prepared = QueryProcessor.getStatement(queryString, clientState);
 -            int boundTerms = prepared.statement.getBoundTerms();
++            CQLStatement statement = QueryProcessor.getStatement(queryString, clientState);
++            QueryHandler.Prepared prepared = new QueryHandler.Prepared(statement, queryString);
++
++            int boundTerms = statement.getBindVariables().size();
+             if (boundTerms > FBUtilities.MAX_UNSIGNED_SHORT)
+                 throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", boundTerms, FBUtilities.MAX_UNSIGNED_SHORT));
 -            assert boundTerms == prepared.boundNames.size();
+ 
 -            return QueryProcessor.storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, false);
++            return QueryProcessor.storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared);
+         }
 -
+     }
+ 
+     protected static class ForceHostLoadBalancingPolicy implements LoadBalancingPolicy {
+ 
+         protected final List<Host> hosts = new CopyOnWriteArrayList<Host>();
+         protected int currentPrimary = 0;
+ 
+         public void setPrimary(int idx) {
+             this.currentPrimary = idx - 1; // arrays are 0-based
+         }
+ 
+         @Override
+         public void init(Cluster cluster, Collection<Host> hosts) {
+             this.hosts.addAll(hosts);
+             this.hosts.sort(Comparator.comparingInt(h -> h.getAddress().getAddress()[3]));
+         }
+ 
+         @Override
+         public HostDistance distance(Host host) {
+             return HostDistance.LOCAL;
+         }
+ 
+         @Override
+         public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
+             if (hosts.isEmpty()) return Collections.emptyIterator();
+             return Iterators.singletonIterator(hosts.get(currentPrimary));
+         }
+ 
+         @Override
+         public void onAdd(Host host) {
+             onUp(host);
+         }
+ 
+         @Override
+         public void onUp(Host host) {
+             hosts.add(host);
+         }
+ 
+         @Override
+         public void onDown(Host host) {
+             // no-op
+         }
+ 
+         @Override
+         public void onRemove(Host host) {
+             // no-op
+         }
+ 
+         @Override
+         public void close() {
+             // no-op
+         }
+     }
+ }
diff --cc test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index ef705bd,e01b812..d885071
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@@ -218,24 -117,18 +218,24 @@@ public class PreparedStatementsTest ext
          session.execute(preparedInsert.bind(1, 1, "value"));
          assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
  
 -        cluster.close();
 -
 -        cluster = Cluster.builder().addContactPoint("127.0.0.1")
 -                                   .withPort(DatabaseDescriptor.getNativeTransportPort())
 -                                   .build();
 -        session = cluster.connect();
 -
 -        preparedInsert = session.prepare(insertCQL);
 -        preparedSelect = session.prepare(selectCQL);
 -        session.execute(preparedInsert.bind(1, 1, "value"));
 +        try (Cluster newCluster = Cluster.builder()
 +                                 .addContactPoints(nativeAddr)
 +                                 .withClusterName("Test Cluster")
 +                                 .withPort(nativePort)
 +                                 .withoutJMXReporting()
 +                                 .allowBetaProtocolVersion()
 +                                 .build())
 +        {
 +            try (Session newSession = newCluster.connect())
 +            {
 +                newSession.execute("USE " + keyspace());
 +                preparedInsert = newSession.prepare(insertCQL);
 +                preparedSelect = newSession.prepare(selectCQL);
-                 session.execute(preparedInsert.bind(1, 1, "value"));
++                newSession.execute(preparedInsert.bind(1, 1, "value"));
  
-                 assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
 -        assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
++                assertEquals(1, newSession.execute(preparedSelect.bind(1)).all().size());
 +            }
 +        }
      }
  
      @Test
diff --cc test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index eca6c20,753d6ff..c95728b
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@@ -185,6 -187,6 +186,7 @@@ public class PstmtPersistenceTest exten
  
      private MD5Digest prepareStatement(String stmt, String keyspace, String table, ClientState clientState)
      {
 -        return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState, false).statementId;
++        System.out.println(stmt + String.format(stmt, keyspace + "." + table));
 +        return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState).statementId;
      }
  }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org