You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2021/10/03 21:43:43 UTC

[cassandra] 01/02: Merge branch 'cassandra-3.11' into cassandra-4.0

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 3660a58ab458b90fdf9de5cda83099b87aad4c86
Merge: b22749b 32a15f0
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Sun Oct 3 23:23:31 2021 +0200

    Merge branch 'cassandra-3.11' into cassandra-4.0

 .../org/apache/cassandra/cql3/CQLStatement.java    |   5 +
 .../org/apache/cassandra/cql3/QueryProcessor.java  |  50 +++-
 .../cql3/statements/ModificationStatement.java     |   3 +-
 .../cassandra/cql3/statements/SelectStatement.java |   3 +-
 .../cassandra/cql3/statements/UseStatement.java    |   5 +
 .../statements/schema/AlterSchemaStatement.java    |   8 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    | 105 +++++++-
 .../gms/IEndpointStateChangeSubscriber.java        |  14 +-
 .../net/StartupClusterConnectivityChecker.java     |  24 --
 .../org/apache/cassandra/repair/RepairSession.java |   6 -
 .../apache/cassandra/service/LoadBroadcaster.java  |   8 -
 .../apache/cassandra/streaming/StreamSession.java  |   6 -
 .../cassandra/utils/RecomputingSupplier.java       | 122 +++++++++
 .../distributed/test/GossipShutdownTest.java       |   7 -
 .../distributed/test/NativeProtocolTest.java       |  14 +-
 .../test/ReprepareNewBehaviourTest.java            |  38 +++
 .../distributed/test/ReprepareTestBase.java        | 281 +++++++++++++++++++++
 .../test/ReprepareTestOldBehaviour.java            | 129 ++++++++++
 .../cassandra/cql3/PreparedStatementsTest.java     |   4 +-
 .../cassandra/cql3/PstmtPersistenceTest.java       |  36 +--
 .../apache/cassandra/metrics/CQLMetricsTest.java   |   5 +-
 .../cassandra/utils/RecomputingSupplierTest.java   | 150 +++++++++++
 22 files changed, 933 insertions(+), 90 deletions(-)

diff --cc src/java/org/apache/cassandra/cql3/CQLStatement.java
index c34e27f,901ecd4..e78f733
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@@ -84,30 -59,12 +84,35 @@@ public interface CQLStatemen
       *
       * @param state the current query state
       */
 -    public ResultMessage executeInternal(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException;
 +    public ResultMessage executeLocally(QueryState state, QueryOptions options);
  
      /**
 -     * Return an Iterable over all of the functions (both native and user-defined) used by any component
 -     * of the statement
 -     * @return functions all functions found (may contain duplicates)
 +     * Provides the context needed for audit logging statements.
 +     */
 +    AuditLogContext getAuditLogContext();
 +
 +    /**
 +     * Whether or not this CQL Statement has LWT conditions
       */
 -    public Iterable<Function> getFunctions();
 +    default public boolean hasConditions()
 +    {
 +        return false;
 +    }
 +
 +    public static abstract class Raw
 +    {
 +        protected VariableSpecifications bindVariables;
 +
 +        public void setBindVariables(List<ColumnIdentifier> variables)
 +        {
 +            bindVariables = new VariableSpecifications(variables);
 +        }
 +
 +        public abstract CQLStatement prepare(ClientState state);
 +    }
++
++    public static interface SingleKeyspaceCqlStatement extends CQLStatement
++    {
++        public String keyspace();
++    }
  }
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 87829ab,589a25a..3c04c2d
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -61,8 -61,19 +62,18 @@@ import static org.apache.cassandra.cql3
  
  public class QueryProcessor implements QueryHandler
  {
 -    public static final CassandraVersion CQL_VERSION = new CassandraVersion("3.4.4");
 +    public static final CassandraVersion CQL_VERSION = new CassandraVersion("3.4.5");
  
+     /**
+      * If a query is prepared with a fully qualified name, but the user also uses USE (specifically when USE keyspace
+      * is different) then the IDs generated could change over time; invalidating the assumption that IDs won't ever
+      * change.  In the version defined below, the USE keyspace is ignored when a fully-qualified name is used as an
+      * attempt to make IDs stable.
+      */
+     private static final CassandraVersion PREPARE_ID_BEHAVIOR_CHANGE_30 = new CassandraVersion("3.0.26");
+     private static final CassandraVersion PREPARE_ID_BEHAVIOR_CHANGE_3X = new CassandraVersion("3.11.12");
+     private static final CassandraVersion PREPARE_ID_BEHAVIOR_CHANGE_40 = new CassandraVersion("4.0.1");
+ 
 -
      public static final QueryProcessor instance = new QueryProcessor();
  
      private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class);
@@@ -436,14 -430,36 +447,37 @@@
          if (existing != null)
              return existing;
  
 -        ParsedStatement.Prepared prepared = getStatement(queryString, clientState);
 -        prepared.rawCQLStatement = queryString;
 -        int boundTerms = prepared.statement.getBoundTerms();
 +        CQLStatement statement = getStatement(queryString, clientState);
 +        Prepared prepared = new Prepared(statement, queryString);
 +
 +        int boundTerms = statement.getBindVariables().size();
          if (boundTerms > FBUtilities.MAX_UNSIGNED_SHORT)
              throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", boundTerms, FBUtilities.MAX_UNSIGNED_SHORT));
 -        assert boundTerms == prepared.boundNames.size();
  
-         return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared);
 -        if (prepared.keyspace != null)
++        if (statement instanceof CQLStatement.SingleKeyspaceCqlStatement)
+         {
+             // Edge-case of CASSANDRA-15252 in mixed-mode cluster. We accept that 15252 itself can manifest in a
+             // cluster that has both old and new nodes, but we would like to avoid a situation when the fix adds
+             // a new behaviour that can break which, in addition, can get triggered more frequently.
+             // If statement ID was generated on the old node _with_ use, when attempting to execute on the new node,
+             // we may fall into infinite loop. To break out of this loop, we put a prepared statement that client
+             // expects into cache, so that it could get PREPARED response on the second try.
 -            ResultMessage.Prepared newBehavior = storePreparedStatement(queryString, null, prepared, forThrift);
 -            ResultMessage.Prepared oldBehavior = clientState.getRawKeyspace() != null ? storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift) : newBehavior;
++            ResultMessage.Prepared newBehavior = storePreparedStatement(queryString, null, prepared);
++            ResultMessage.Prepared oldBehavior = clientState.getRawKeyspace() != null ? storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared) : newBehavior;
+             CassandraVersion minVersion = Gossiper.instance.getMinVersion(20, TimeUnit.MILLISECONDS);
+ 
+             // Default to old behaviour in case we're not sure about the version. Even if we ever flip back to the old
+             // behaviour due to the gossip bug or incorrect version string, we'll end up with two re-prepare round-trips.
++
+             return minVersion != null &&
+                    ((minVersion.major == 3 && minVersion.minor == 0 && minVersion.compareTo(PREPARE_ID_BEHAVIOR_CHANGE_30) >= 0) ||
+                     (minVersion.major == 3 && minVersion.minor != 0 && minVersion.compareTo(PREPARE_ID_BEHAVIOR_CHANGE_3X) >= 0) ||
+                     (minVersion.major == 4 && minVersion.compareTo(PREPARE_ID_BEHAVIOR_CHANGE_40) >= 0)) ? newBehavior : oldBehavior;
+         }
+         else
+         {
 -            return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift);
++            return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared);
+         }
      }
  
      private static MD5Digest computeId(String queryString, String keyspace)
@@@ -452,23 -468,42 +486,25 @@@
          return MD5Digest.compute(toHash);
      }
  
-     private static ResultMessage.Prepared getStoredPreparedStatement(String queryString, String keyspace)
 -    private static Integer computeThriftId(String queryString, String keyspace)
 -    {
 -        String toHash = keyspace == null ? queryString : keyspace + queryString;
 -        return toHash.hashCode();
 -    }
 -
+     @VisibleForTesting
 -    public static ResultMessage.Prepared getStoredPreparedStatement(String queryString, String clientKeyspace, boolean forThrift)
++    public static ResultMessage.Prepared getStoredPreparedStatement(String queryString, String clientKeyspace)
      throws InvalidRequestException
      {
-         MD5Digest statementId = computeId(queryString, keyspace);
 -        if (forThrift)
 -        {
 -            Integer thriftStatementId = computeThriftId(queryString, clientKeyspace);
 -            ParsedStatement.Prepared existing = thriftPreparedStatements.get(thriftStatementId);
 -            if (existing == null)
 -                return null;
++        MD5Digest statementId = computeId(queryString, clientKeyspace);
 +        Prepared existing = preparedStatements.getIfPresent(statementId);
 +        if (existing == null)
 +            return null;
  
 -            checkTrue(queryString.equals(existing.rawCQLStatement),
 -                      String.format("MD5 hash collision: query with the same MD5 hash was already prepared. \n Existing: '%s'", existing.rawCQLStatement));
 -            return ResultMessage.Prepared.forThrift(thriftStatementId, existing.boundNames);
 -        }
 -        else
 -        {
 -            MD5Digest statementId = computeId(queryString, clientKeyspace);
 -            ParsedStatement.Prepared existing = preparedStatements.get(statementId);
 -            if (existing == null)
 -                return null;
 +        checkTrue(queryString.equals(existing.rawCQLStatement),
 +                String.format("MD5 hash collision: query with the same MD5 hash was already prepared. \n Existing: '%s'", existing.rawCQLStatement));
  
 -            checkTrue(queryString.equals(existing.rawCQLStatement),
 -                      String.format("MD5 hash collision: query with the same MD5 hash was already prepared. \n Existing: '%s'", existing.rawCQLStatement));
 -            return new ResultMessage.Prepared(statementId, existing);
 -        }
 +        ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(existing.statement);
 +        ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(existing.statement);
 +        return new ResultMessage.Prepared(statementId, resultMetadata.getResultMetadataId(), preparedMetadata, resultMetadata);
      }
  
-     private static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, Prepared prepared)
+     @VisibleForTesting
 -    public static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, ParsedStatement.Prepared prepared, boolean forThrift)
++    public static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, Prepared prepared)
      throws InvalidRequestException
      {
          // Concatenate the current keyspace so we don't mix prepared statements between keyspace (#5352).
@@@ -608,7 -656,14 +644,13 @@@
          internalStatements.clear();
      }
  
+     @VisibleForTesting
+     public static void clearPreparedStatementsCache()
+     {
 -        preparedStatements.clear();
 -        thriftPreparedStatements.clear();
++        preparedStatements.asMap().clear();
+     }
+ 
 -    private static class MigrationSubscriber extends MigrationListener
 +    private static class StatementInvalidatingListener extends SchemaChangeListener
      {
          private static void removeInvalidPreparedStatements(String ksName, String cfName)
          {
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 087f3b0,8376a0a..a072da5
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -66,7 -59,7 +66,7 @@@ import static org.apache.cassandra.cql3
  /*
   * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
   */
--public abstract class ModificationStatement implements CQLStatement
++public abstract class ModificationStatement implements CQLStatement.SingleKeyspaceCqlStatement
  {
      protected static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class);
  
@@@ -192,13 -163,18 +192,14 @@@
          return restrictions;
      }
  
 -    public abstract void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params);
 +    public abstract void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Clustering<?> clustering, UpdateParameters params);
  
 -    public abstract void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params);
 -
 -    public int getBoundTerms()
 -    {
 -        return boundTerms;
 -    }
 +    public abstract void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Slice slice, UpdateParameters params);
  
++    @Override
      public String keyspace()
      {
 -        return cfm.ksName;
 +        return metadata.keyspace;
      }
  
      public String columnFamily()
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 774bd68,632bf94..494d9af
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -85,7 -114,7 +85,7 @@@ import static org.apache.cassandra.util
   * QueryHandler implementations, so before reducing their accessibility
   * due consideration should be given.
   */
--public class SelectStatement implements CQLStatement
++public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement
  {
      private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class);
  
@@@ -474,14 -481,12 +474,15 @@@
  
      public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException
      {
 -        return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT));
 +        QueryOptions options = QueryOptions.DEFAULT;
 +        Selectors selectors = selection.newSelectors(options);
 +        return process(partitions, options, selectors, nowInSec, getLimit(options));
      }
  
++    @Override
      public String keyspace()
      {
 -        return cfm.ksName;
 +        return table.keyspace;
      }
  
      public String columnFamily()
diff --cc src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index 3013d9f,0242f09..0504f43
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@@ -64,16 -65,4 +64,21 @@@ public class UseStatement extends CQLSt
          // but for some unit tests we need to set the keyspace (e.g. for tests with DROP INDEX)
          return execute(state, options, System.nanoTime());
      }
 +    
 +    @Override
 +    public String toString()
 +    {
 +        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
 +    }
 +
 +    @Override
 +    public AuditLogContext getAuditLogContext()
 +    {
 +        return new AuditLogContext(AuditLogEntryType.USE_KEYSPACE, keyspace);
 +    }
++
++    public String keyspace()
++    {
++        return keyspace;
++    }
  }
diff --cc src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
index 161c9c4,0000000..124d04c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
@@@ -1,153 -1,0 +1,159 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.statements.schema;
 +
 +import java.util.Set;
 +
 +import com.google.common.collect.ImmutableSet;
 +
 +import org.apache.cassandra.auth.AuthenticatedUser;
 +import org.apache.cassandra.auth.IResource;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.CQLStatement;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.schema.*;
 +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 +import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.service.ClientWarn;
 +import org.apache.cassandra.service.QueryState;
 +import org.apache.cassandra.transport.Event.SchemaChange;
 +import org.apache.cassandra.transport.messages.ResultMessage;
 +
- abstract class AlterSchemaStatement implements CQLStatement, SchemaTransformation
++abstract public class AlterSchemaStatement implements CQLStatement.SingleKeyspaceCqlStatement, SchemaTransformation
 +{
 +    protected final String keyspaceName; // name of the keyspace affected by the statement
 +
 +    protected AlterSchemaStatement(String keyspaceName)
 +    {
 +        this.keyspaceName = keyspaceName;
 +    }
 +
 +    public final void validate(ClientState state)
 +    {
 +        // no-op; validation is performed while executing the statement, in apply()
 +    }
 +
 +    public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime)
 +    {
 +        return execute(state, false);
 +    }
 +
++    @Override
++    public String keyspace()
++    {
++        return keyspaceName;
++    }
++
 +    public ResultMessage executeLocally(QueryState state, QueryOptions options)
 +    {
 +        return execute(state, true);
 +    }
 +
 +    /**
 +     * TODO: document
 +     */
 +    abstract SchemaChange schemaChangeEvent(KeyspacesDiff diff);
 +
 +    /**
 +     * Schema alteration may result in a new database object (keyspace, table, role, function) being created capable of
 +     * having permissions GRANTed on it. The creator of the object (the primary role assigned to the AuthenticatedUser
 +     * performing the operation) is automatically granted ALL applicable permissions on the object. This is a hook for
 +     * subclasses to override in order to indicate which resources to perform that grant on when the statement is executed.
 +     *
 +     * Only called if the transformation resulted in a non-empty diff.
 +     */
 +    Set<IResource> createdResources(KeyspacesDiff diff)
 +    {
 +        return ImmutableSet.of();
 +    }
 +
 +    /**
 +     * Schema alteration might produce a client warning (e.g. a warning to run full repair when increasing RF of a keyspace).
 +     * This method should be used to generate them instead of calling warn() in transformation code.
 +     *
 +     * Only called if the transformation resulted in a non-empty diff.
 +     */
 +    Set<String> clientWarnings(KeyspacesDiff diff)
 +    {
 +        return ImmutableSet.of();
 +    }
 +
 +    public ResultMessage execute(QueryState state, boolean locally)
 +    {
 +        if (SchemaConstants.isLocalSystemKeyspace(keyspaceName))
 +            throw ire("System keyspace '%s' is not user-modifiable", keyspaceName);
 +
 +        KeyspaceMetadata keyspace = Schema.instance.getKeyspaceMetadata(keyspaceName);
 +        if (null != keyspace && keyspace.isVirtual())
 +            throw ire("Virtual keyspace '%s' is not user-modifiable", keyspaceName);
 +
 +        validateKeyspaceName();
 +
 +        KeyspacesDiff diff = MigrationManager.announce(this, locally);
 +
 +        clientWarnings(diff).forEach(ClientWarn.instance::warn);
 +
 +        if (diff.isEmpty())
 +            return new ResultMessage.Void();
 +
 +        /*
 +         * When a schema alteration results in a new db object being created, we grant permissions on the new
 +         * object to the user performing the request if:
 +         * - the user is not anonymous
 +         * - the configured IAuthorizer supports granting of permissions (not all do, AllowAllAuthorizer doesn't and
 +         *   custom external implementations may not)
 +         */
 +        AuthenticatedUser user = state.getClientState().getUser();
 +        if (null != user && !user.isAnonymous())
 +            createdResources(diff).forEach(r -> grantPermissionsOnResource(r, user));
 +
 +        return new ResultMessage.SchemaChange(schemaChangeEvent(diff));
 +    }
 +
 +    private void validateKeyspaceName()
 +    {
 +        if (!SchemaConstants.isValidName(keyspaceName))
 +        {
 +            throw ire("Keyspace name must not be empty, more than %d characters long, " +
 +                      "or contain non-alphanumeric-underscore characters (got '%s')",
 +                      SchemaConstants.NAME_LENGTH, keyspaceName);
 +        }
 +    }
 +
 +    private void grantPermissionsOnResource(IResource resource, AuthenticatedUser user)
 +    {
 +        try
 +        {
 +            DatabaseDescriptor.getAuthorizer()
 +                              .grant(AuthenticatedUser.SYSTEM_USER,
 +                                     resource.applicablePermissions(),
 +                                     resource,
 +                                     user.getPrimaryRole());
 +        }
 +        catch (UnsupportedOperationException e)
 +        {
 +            // not a problem - grant is an optional method on IAuthorizer
 +        }
 +    }
 +
 +    static InvalidRequestException ire(String format, Object... args)
 +    {
 +        return new InvalidRequestException(String.format(format, args));
 +    }
 +}
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 6eb674f,363db5f..c147434
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -33,7 -34,6 +33,8 @@@ import com.google.common.base.Throwable
  import com.google.common.collect.ImmutableList;
  import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.Iterables;
 +import com.google.common.collect.ImmutableSet;
++import com.google.common.collect.Iterables;
  import com.google.common.collect.Sets;
  import com.google.common.util.concurrent.ListenableFutureTask;
  import com.google.common.util.concurrent.Uninterruptibles;
@@@ -51,23 -46,26 +52,30 @@@ import org.apache.cassandra.utils.Pair
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
++import io.netty.util.concurrent.FastThreadLocal;
  import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
  import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
  import org.apache.cassandra.concurrent.Stage;
 -import org.apache.cassandra.concurrent.StageManager;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.net.IAsyncCallback;
 -import org.apache.cassandra.net.MessageIn;
 -import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.RequestCallback;
 +import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.CassandraVersion;
++import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.JVMStabilityInspector;
++import org.apache.cassandra.utils.MBeanWrapper;
++import org.apache.cassandra.utils.NoSpamLogger;
++import org.apache.cassandra.utils.Pair;
+ import org.apache.cassandra.utils.RecomputingSupplier;
  
 -import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
 -import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
 +import static org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_QUARANTINE_DELAY;
 +import static org.apache.cassandra.net.NoPayload.noPayload;
 +import static org.apache.cassandra.net.Verb.ECHO_REQ;
 +import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN;
  
  /**
   * This module is responsible for Gossiping information for the local endpoint. This abstraction
@@@ -333,7 -259,9 +341,10 @@@ public class Gossiper implements IFailu
          }
      }
  
-     Gossiper(boolean registerJmx)
+     private final RecomputingSupplier<CassandraVersion> minVersionSupplier = new RecomputingSupplier<>(this::computeMinVersion, executor);
+ 
 -    private Gossiper()
++    @VisibleForTesting
++    public Gossiper(boolean registerJmx)
      {
          // half of QUARANTINE_DELAY, to ensure justRemovedEndpoints has enough leeway to prevent re-gossip
          fatClientTimeout = (QUARANTINE_DELAY / 2);
@@@ -341,10 -269,33 +352,35 @@@
          FailureDetector.instance.registerFailureDetectionEventListener(this);
  
          // Register this instance with JMX
 -        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 -
 +        if (registerJmx)
 +        {
 +            MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 +        }
+ 
+         subscribers.add(new IEndpointStateChangeSubscriber()
+         {
 -            public void onJoin(InetAddress endpoint, EndpointState state)
++            public void onJoin(InetAddressAndPort endpoint, EndpointState state)
+ 	    {
+                 maybeRecompute(state);
+             }
+ 
 -            public void onAlive(InetAddress endpoint, EndpointState state)
++            public void onAlive(InetAddressAndPort endpoint, EndpointState state)
+ 	    {
+                 maybeRecompute(state);
+             }
+ 
+             private void maybeRecompute(EndpointState state)
+ 	    {
+                 if (state.getApplicationState(ApplicationState.RELEASE_VERSION) != null)
+                     minVersionSupplier.recompute();
+             }
+ 
 -            public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
++            public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
+             {
+                 if (state == ApplicationState.RELEASE_VERSION)
+                     minVersionSupplier.recompute();
+             }
+         });
      }
  
      public void setLastProcessedMessageAt(long timeInMillis)
@@@ -1712,8 -1506,9 +1748,9 @@@
          buildSeedsList();
          /* initialize the heartbeat state for this localEndpoint */
          maybeInitializeLocalState(generationNbr);
 -        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
 +        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
          localState.addApplicationStates(preloadLocalStates);
+         minVersionSupplier.recompute();
  
          //notify snitches that Gossiper is about to start
          DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
@@@ -2285,4 -1906,76 +2322,70 @@@
          stop();
          ExecutorUtils.shutdownAndWait(timeout, unit, executor);
      }
+ 
+     @Nullable
+     public CassandraVersion getMinVersion(int delay, TimeUnit timeUnit)
+     {
+         try
+         {
+             return minVersionSupplier.get(delay, timeUnit);
+         }
+         catch (TimeoutException e)
+         {
+             // Timeouts here are harmless: they won't cause reprepares and may only
+             // cause the old version of the hash to be kept for longer
+             return null;
+         }
+         catch (Throwable e)
+         {
+             logger.error("Caught an exception while waiting for min version", e);
+             return null;
+         }
+     }
+ 
+     @Nullable
 -    private String getReleaseVersionString(InetAddress ep)
++    private String getReleaseVersionString(InetAddressAndPort ep)
+     {
+         EndpointState state = getEndpointStateForEndpoint(ep);
+         if (state == null)
+             return null;
+ 
+         VersionedValue value = state.getApplicationState(ApplicationState.RELEASE_VERSION);
+         return value == null ? null : value.value;
+     }
+ 
+     private CassandraVersion computeMinVersion()
+     {
+         CassandraVersion minVersion = null;
+ 
 -        for (InetAddress addr : Iterables.concat(Gossiper.instance.getLiveMembers(),
++        for (InetAddressAndPort addr : Iterables.concat(Gossiper.instance.getLiveMembers(),
+                                                  Gossiper.instance.getUnreachableMembers()))
+         {
+             String versionString = getReleaseVersionString(addr);
+             // Raced with changes to gossip state, wait until next iteration
+             if (versionString == null)
+                 return null;
+ 
+             CassandraVersion version;
+ 
+             try
+             {
+                 version = new CassandraVersion(versionString);
+             }
+             catch (Throwable t)
+             {
+                 JVMStabilityInspector.inspectThrowable(t);
+                 String message = String.format("Can't parse version string %s", versionString);
+                 logger.warn(message);
+                 if (logger.isDebugEnabled())
+                     logger.debug(message, t);
+                 return null;
+             }
+ 
+             if (minVersion == null || version.compareTo(minVersion) < 0)
+                 minVersion = version;
+         }
+ 
+         return minVersion;
+     }
 -
 -    @VisibleForTesting
 -    public void setAnyNodeOn30(boolean anyNodeOn30)
 -    {
 -        this.anyNodeOn30 = anyNodeOn30;
 -    }
  }
diff --cc src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
index dc81650,861d4ac..df04697
--- a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
+++ b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
@@@ -36,17 -36,17 +36,17 @@@ public interface IEndpointStateChangeSu
       * @param endpoint endpoint for which the state change occurred.
       * @param epState  state that actually changed for the above endpoint.
       */
-     public void onJoin(InetAddressAndPort endpoint, EndpointState epState);
 -    default void onJoin(InetAddress endpoint, EndpointState epState) {}
++    default void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
      
-     public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue);
 -    default void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
++    default void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
  
-     public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value);
 -    default void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
++    default void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
  
-     public void onAlive(InetAddressAndPort endpoint, EndpointState state);
 -    default void onAlive(InetAddress endpoint, EndpointState state) {}
++    default void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
  
-     public void onDead(InetAddressAndPort endpoint, EndpointState state);
 -    default void onDead(InetAddress endpoint, EndpointState state) {}
++    default void onDead(InetAddressAndPort endpoint, EndpointState state) {}
  
-     public void onRemove(InetAddressAndPort endpoint);
 -    default void onRemove(InetAddress endpoint) {}
++    default void onRemove(InetAddressAndPort endpoint) {}
  
      /**
       * Called whenever a node is restarted.
@@@ -54,5 -54,5 +54,5 @@@
       * previously marked down. It will have only if {@code state.isAlive() == false}
       * as {@code state} is from before the restarted node is marked up.
       */
-     public void onRestart(InetAddressAndPort endpoint, EndpointState state);
 -    default void onRestart(InetAddress endpoint, EndpointState state) {}
++    default void onRestart(InetAddressAndPort endpoint, EndpointState state) {}
  }
diff --cc src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
index b901338,0000000..8bc1e5d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
+++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
@@@ -1,275 -1,0 +1,251 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.HashMultimap;
 +import com.google.common.collect.SetMultimap;
 +import com.google.common.util.concurrent.Uninterruptibles;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
 +import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.apache.cassandra.net.Verb.PING_REQ;
 +import static org.apache.cassandra.net.ConnectionType.LARGE_MESSAGES;
 +import static org.apache.cassandra.net.ConnectionType.SMALL_MESSAGES;
 +
 +public class StartupClusterConnectivityChecker
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
 +
 +    private final boolean blockForRemoteDcs;
 +    private final long timeoutNanos;
 +
 +    public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs)
 +    {
 +        if (timeoutSecs > 100)
 +            logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
 +        long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs);
 +
 +        return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs);
 +    }
 +
 +    @VisibleForTesting
 +    StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs)
 +    {
 +        this.blockForRemoteDcs = blockForRemoteDcs;
 +        this.timeoutNanos = timeoutNanos;
 +    }
 +
 +    /**
 +     * @param peers The currently known peers in the cluster; argument is not modified.
 +     * @param getDatacenterSource A function for mapping peers to their datacenter.
 +     * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened;
 +     * else false.
 +     */
 +    public boolean execute(Set<InetAddressAndPort> peers, Function<InetAddressAndPort, String> getDatacenterSource)
 +    {
 +        if (peers == null || this.timeoutNanos < 0)
 +            return true;
 +
 +        // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection)
 +        peers = new HashSet<>(peers);
 +        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
 +        String localDc = getDatacenterSource.apply(localAddress);
 +
 +        peers.remove(localAddress);
 +        if (peers.isEmpty())
 +            return true;
 +
 +        // make a copy of the datacenter mapping (in case gossip updates happen during this method or some such)
 +        Map<InetAddressAndPort, String> peerToDatacenter = new HashMap<>();
 +        SetMultimap<String, InetAddressAndPort> datacenterToPeers = HashMultimap.create();
 +
 +        for (InetAddressAndPort peer : peers)
 +        {
 +            String datacenter = getDatacenterSource.apply(peer);
 +            peerToDatacenter.put(peer, datacenter);
 +            datacenterToPeers.put(datacenter, peer);
 +        }
 +
 +        // In the case where we do not want to block startup on remote datacenters (e.g. because clients only use
 +        // LOCAL_X consistency levels), we remove all other datacenter hosts from the mapping and we only wait
 +        // on the remaining local datacenter.
 +        if (!blockForRemoteDcs)
 +        {
 +            datacenterToPeers.keySet().retainAll(Collections.singleton(localDc));
 +            logger.info("Blocking coordination until only a single peer is DOWN in the local datacenter, timeout={}s",
 +                        TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
 +        }
 +        else
 +        {
 +            logger.info("Blocking coordination until only a single peer is DOWN in each datacenter, timeout={}s",
 +                        TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
 +        }
 +
 +        AckMap acks = new AckMap(3);
 +        Map<String, CountDownLatch> dcToRemainingPeers = new HashMap<>(datacenterToPeers.size());
 +        for (String datacenter: datacenterToPeers.keys())
 +        {
 +            dcToRemainingPeers.put(datacenter,
 +                                   new CountDownLatch(Math.max(datacenterToPeers.get(datacenter).size() - 1, 0)));
 +        }
 +
 +        long startNanos = System.nanoTime();
 +
 +        // set up a listener to react to new nodes becoming alive (in gossip), and account for all the nodes that are already alive
 +        Set<InetAddressAndPort> alivePeers = Collections.newSetFromMap(new ConcurrentHashMap<>());
 +        AliveListener listener = new AliveListener(alivePeers, dcToRemainingPeers, acks, peerToDatacenter::get);
 +        Gossiper.instance.register(listener);
 +
 +        // send out a ping message to open up the non-gossip connections to all peers. Note that this sends the
 +        // ping messages to _all_ peers, not just the ones we block for in dcToRemainingPeers.
 +        sendPingMessages(peers, dcToRemainingPeers, acks, peerToDatacenter::get);
 +
 +        for (InetAddressAndPort peer : peers)
 +        {
 +            if (Gossiper.instance.isAlive(peer) && alivePeers.add(peer) && acks.incrementAndCheck(peer))
 +            {
 +                String datacenter = peerToDatacenter.get(peer);
 +                // We have to check because we might only have the local DC in the map
 +                if (dcToRemainingPeers.containsKey(datacenter))
 +                    dcToRemainingPeers.get(datacenter).countDown();
 +            }
 +        }
 +
 +        boolean succeeded = true;
 +        for (CountDownLatch countDownLatch : dcToRemainingPeers.values())
 +        {
 +            long remainingNanos = Math.max(1, timeoutNanos - (System.nanoTime() - startNanos));
 +            //noinspection UnstableApiUsage
 +            succeeded &= Uninterruptibles.awaitUninterruptibly(countDownLatch, remainingNanos, TimeUnit.NANOSECONDS);
 +        }
 +
 +        Gossiper.instance.unregister(listener);
 +
 +        Map<String, Long> numDown = dcToRemainingPeers.entrySet().stream()
 +                                                      .collect(Collectors.toMap(Map.Entry::getKey,
 +                                                                                e -> e.getValue().getCount()));
 +
 +        if (succeeded)
 +        {
 +            logger.info("Ensured sufficient healthy connections with {} after {} milliseconds",
 +                        numDown.keySet(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
 +        }
 +        else
 +        {
 +            logger.warn("Timed out after {} milliseconds, was waiting for remaining peers to connect: {}",
 +                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos), numDown);
 +        }
 +
 +        return succeeded;
 +    }
 +
 +    /**
 +     * Sends a "connection warmup" message to each peer in the collection, on every {@link ConnectionType}
 +     * used for internode messaging (that is not gossip).
 +     */
 +    private void sendPingMessages(Set<InetAddressAndPort> peers, Map<String, CountDownLatch> dcToRemainingPeers,
 +                                  AckMap acks, Function<InetAddressAndPort, String> getDatacenter)
 +    {
 +        RequestCallback responseHandler = msg -> {
 +            if (acks.incrementAndCheck(msg.from()))
 +            {
 +                String datacenter = getDatacenter.apply(msg.from());
 +                // We have to check because we might only have the local DC in the map
 +                if (dcToRemainingPeers.containsKey(datacenter))
 +                    dcToRemainingPeers.get(datacenter).countDown();
 +            }
 +        };
 +
 +        Message<PingRequest> small = Message.out(PING_REQ, PingRequest.forSmall);
 +        Message<PingRequest> large = Message.out(PING_REQ, PingRequest.forLarge);
 +        for (InetAddressAndPort peer : peers)
 +        {
 +            MessagingService.instance().sendWithCallback(small, peer, responseHandler, SMALL_MESSAGES);
 +            MessagingService.instance().sendWithCallback(large, peer, responseHandler, LARGE_MESSAGES);
 +        }
 +    }
 +
 +    /**
 +     * A trivial implementation of {@link IEndpointStateChangeSubscriber} that really only cares about
 +     * {@link #onAlive(InetAddressAndPort, EndpointState)} invocations.
 +     */
 +    private static final class AliveListener implements IEndpointStateChangeSubscriber
 +    {
 +        private final Map<String, CountDownLatch> dcToRemainingPeers;
 +        private final Set<InetAddressAndPort> livePeers;
 +        private final Function<InetAddressAndPort, String> getDatacenter;
 +        private final AckMap acks;
 +
 +        AliveListener(Set<InetAddressAndPort> livePeers, Map<String, CountDownLatch> dcToRemainingPeers,
 +                      AckMap acks, Function<InetAddressAndPort, String> getDatacenter)
 +        {
 +            this.livePeers = livePeers;
 +            this.dcToRemainingPeers = dcToRemainingPeers;
 +            this.acks = acks;
 +            this.getDatacenter = getDatacenter;
 +        }
 +
-         public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
-         {
-         }
- 
-         public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
-         {
-         }
- 
-         public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
-         {
-         }
- 
 +        public void onAlive(InetAddressAndPort endpoint, EndpointState state)
 +        {
 +            if (livePeers.add(endpoint) && acks.incrementAndCheck(endpoint))
 +            {
 +                String datacenter = getDatacenter.apply(endpoint);
 +                if (dcToRemainingPeers.containsKey(datacenter))
 +                    dcToRemainingPeers.get(datacenter).countDown();
 +            }
 +        }
- 
-         public void onDead(InetAddressAndPort endpoint, EndpointState state)
-         {
-         }
- 
-         public void onRemove(InetAddressAndPort endpoint)
-         {
-         }
- 
-         public void onRestart(InetAddressAndPort endpoint, EndpointState state)
-         {
-         }
 +    }
 +
 +    private static final class AckMap
 +    {
 +        private final int threshold;
 +        private final Map<InetAddressAndPort, AtomicInteger> acks;
 +
 +        AckMap(int threshold)
 +        {
 +            this.threshold = threshold;
 +            acks = new ConcurrentHashMap<>();
 +        }
 +
 +        boolean incrementAndCheck(InetAddressAndPort address)
 +        {
 +            return acks.computeIfAbsent(address, addr -> new AtomicInteger(0)).incrementAndGet() == threshold;
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index e13c90c,1207d36..75ed1a3
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -342,13 -333,7 +342,7 @@@ public class RepairSession extends Abst
          terminate();
      }
  
-     public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
-     public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
-     public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
-     public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
-     public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
- 
 -    public void onRemove(InetAddress endpoint)
 +    public void onRemove(InetAddressAndPort endpoint)
      {
          convict(endpoint, Double.MAX_VALUE);
      }
diff --cc src/java/org/apache/cassandra/service/LoadBroadcaster.java
index 113aead,0a3e1a4..ebda3cd
--- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
@@@ -60,16 -60,8 +60,8 @@@ public class LoadBroadcaster implement
              onChange(endpoint, ApplicationState.LOAD, localValue);
          }
      }
-     
-     public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
- 
-     public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
- 
-     public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
- 
-     public void onRestart(InetAddressAndPort endpoint, EndpointState state) {}
  
 -    public void onRemove(InetAddress endpoint)
 +    public void onRemove(InetAddressAndPort endpoint)
      {
          loadInfo.remove(endpoint);
      }
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 3a32834,675304f..0b4c0cd
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -910,15 -749,9 +910,9 @@@ public class StreamSession implements I
          maybeCompleted();
      }
  
-     public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
-     public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
-     public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
-     public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
-     public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
- 
 -    public void onRemove(InetAddress endpoint)
 +    public void onRemove(InetAddressAndPort endpoint)
      {
 -        logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peer.getHostAddress());
 +        logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peer.toString());
          closeSession(State.FAILED);
      }
  
diff --cc test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
index 0905b92,92751ef..60fe5b6
--- a/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
@@@ -27,7 -26,8 +26,16 @@@ import com.datastax.driver.core.ResultS
  import com.datastax.driver.core.Session;
  import com.datastax.driver.core.SimpleStatement;
  import com.datastax.driver.core.Statement;
++import com.datastax.driver.core.exceptions.DriverInternalError;
++import com.datastax.driver.core.policies.LoadBalancingPolicy;
++import net.bytebuddy.ByteBuddy;
++import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
++import net.bytebuddy.implementation.MethodDelegation;
++import org.apache.cassandra.cql3.CQLStatement;
++import org.apache.cassandra.cql3.QueryHandler;
++import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.impl.RowUtil;
  
  import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
  import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
diff --cc test/distributed/org/apache/cassandra/distributed/test/ReprepareTestBase.java
index 0000000,c270144..02af6d5
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReprepareTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReprepareTestBase.java
@@@ -1,0 -1,281 +1,281 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.concurrent.CopyOnWriteArrayList;
++import java.util.concurrent.TimeUnit;
+ import java.util.function.BiConsumer;
+ 
+ import com.google.common.collect.Iterators;
+ import org.junit.Assert;
+ 
+ import com.datastax.driver.core.Cluster;
+ import com.datastax.driver.core.Host;
+ import com.datastax.driver.core.HostDistance;
+ import com.datastax.driver.core.PreparedStatement;
+ import com.datastax.driver.core.Session;
+ import com.datastax.driver.core.Statement;
+ import com.datastax.driver.core.exceptions.DriverInternalError;
+ import com.datastax.driver.core.policies.LoadBalancingPolicy;
+ import net.bytebuddy.ByteBuddy;
+ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+ import net.bytebuddy.implementation.FixedValue;
+ import net.bytebuddy.implementation.MethodDelegation;
++import org.apache.cassandra.cql3.CQLStatement;
++import org.apache.cassandra.cql3.QueryHandler;
+ import org.apache.cassandra.cql3.QueryProcessor;
 -import org.apache.cassandra.cql3.statements.ParsedStatement;
+ import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
+ import org.apache.cassandra.service.ClientState;
 -import org.apache.cassandra.service.QueryState;
+ import org.apache.cassandra.transport.messages.ResultMessage;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static net.bytebuddy.matcher.ElementMatchers.named;
+ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+ import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+ import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+ import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.fail;
+ 
+ public class ReprepareTestBase extends TestBaseImpl
+ {
+     protected static ReprepareTestConfiguration cfg(boolean withUse, boolean skipBrokenBehaviours)
+     {
+         return new ReprepareTestConfiguration(withUse, skipBrokenBehaviours);
+     }
+ 
+     public void testReprepare(BiConsumer<ClassLoader, Integer> instanceInitializer, ReprepareTestConfiguration... configs) throws Throwable
+     {
+         try (ICluster<IInvokableInstance> c = init(builder().withNodes(2)
+                                                             .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+                                                             .withInstanceInitializer(instanceInitializer)
+                                                             .start()))
+         {
+             ForceHostLoadBalancingPolicy lbp = new ForceHostLoadBalancingPolicy();
+             c.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));"));
+ 
+             for (ReprepareTestConfiguration config : configs)
+             {
+                 // 1 has old behaviour
+                 for (int firstContact : new int[]{ 1, 2 })
+                 {
+                     try (com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder()
+                                                                                                     .addContactPoint("127.0.0.1")
+                                                                                                     .addContactPoint("127.0.0.2")
+                                                                                                     .withLoadBalancingPolicy(lbp)
+                                                                                                     .build();
+                          Session session = cluster.connect())
+                     {
+                         lbp.setPrimary(firstContact);
+                         final PreparedStatement select = session.prepare(withKeyspace("SELECT * FROM %s.tbl"));
+                         session.execute(select.bind());
+ 
+                         c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+ 
+                         lbp.setPrimary(firstContact == 1 ? 2 : 1);
+ 
+                         if (config.withUse)
+                             session.execute(withKeyspace("USE %s"));
+ 
+                         // Re-preparing on the node
+                         if (!config.skipBrokenBehaviours && firstContact == 1)
+                             session.execute(select.bind());
+ 
+                         c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+ 
+                         lbp.setPrimary(firstContact);
+ 
+                         // Re-preparing on the node with old behaviour will break no matter where the statement was initially prepared
+                         if (!config.skipBrokenBehaviours)
+                             session.execute(select.bind());
+ 
+                         c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+                     }
+                 }
+             }
+         }
+     }
+ 
+     public void testReprepareTwoKeyspaces(BiConsumer<ClassLoader, Integer> instanceInitializer) throws Throwable
+     {
+         try (ICluster<IInvokableInstance> c = init(builder().withNodes(2)
+                                                             .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+                                                             .withInstanceInitializer(instanceInitializer)
+                                                             .start()))
+         {
+             c.schemaChange(withKeyspace("CREATE KEYSPACE %s2 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"));
+             c.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));"));
+ 
+             ForceHostLoadBalancingPolicy lbp = new ForceHostLoadBalancingPolicy();
+ 
+             for (int firstContact : new int[]{ 1, 2 })
+                 try (com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder()
+                                                                                                 .addContactPoint("127.0.0.1")
+                                                                                                 .addContactPoint("127.0.0.2")
+                                                                                                 .withLoadBalancingPolicy(lbp)
+                                                                                                 .build();
+                      Session session = cluster.connect())
+                 {
+                     {
+                         session.execute(withKeyspace("USE %s"));
+                         c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+ 
+                         lbp.setPrimary(firstContact);
+                         final PreparedStatement select = session.prepare(withKeyspace("SELECT * FROM %s.tbl"));
+                         session.execute(select.bind());
+ 
+                         c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+ 
+                         lbp.setPrimary(firstContact == 1 ? 2 : 1);
+                         session.execute(withKeyspace("USE %s2"));
+                         try
+                         {
+                             session.execute(select.bind());
+                         }
+                         catch (DriverInternalError e)
+                         {
+                             Assert.assertTrue(e.getCause().getMessage().contains("can't execute it on"));
+                             continue;
+                         }
+                         fail("Should have thrown");
+                     }
+                 }
+         }
+     }
+ 
+     protected static class ReprepareTestConfiguration
+     {
+         protected final boolean withUse;
+         protected final boolean skipBrokenBehaviours;
+ 
+         protected ReprepareTestConfiguration(boolean withUse, boolean skipBrokenBehaviours)
+         {
+             this.withUse = withUse;
+             this.skipBrokenBehaviours = skipBrokenBehaviours;
+         }
+     }
+ 
+     public static class PrepareBehaviour
+     {
+         protected static void setReleaseVersion(ClassLoader cl, String value)
+         {
+             new ByteBuddy().rebase(FBUtilities.class)
+                            .method(named("getReleaseVersionString"))
+                            .intercept(FixedValue.value(value))
+                            .make()
+                            .load(cl, ClassLoadingStrategy.Default.INJECTION);
+         }
+ 
+         static void newBehaviour(ClassLoader cl, int nodeNumber)
+         {
+             setReleaseVersion(cl, "3.0.19.63");
+         }
+ 
+         static void oldBehaviour(ClassLoader cl, int nodeNumber)
+         {
+             if (nodeNumber == 1)
+             {
+                 new ByteBuddy().rebase(QueryProcessor.class) // note that we need to `rebase` when we use @SuperCall
+                                .method(named("prepare").and(takesArguments(2)))
+                                .intercept(MethodDelegation.to(PrepareBehaviour.class))
+                                .make()
+                                .load(cl, ClassLoadingStrategy.Default.INJECTION);
+                 setReleaseVersion(cl, "3.0.19.60");
+             }
+             else
+             {
+                 setReleaseVersion(cl, "3.0.19.63");
+             }
+         }
+ 
 -        public static ResultMessage.Prepared prepare(String queryString, QueryState queryState)
++        public static ResultMessage.Prepared prepare(String queryString, ClientState clientState)
+         {
 -            ClientState clientState = queryState.getClientState();
 -            ResultMessage.Prepared existing = QueryProcessor.getStoredPreparedStatement(queryString, clientState.getRawKeyspace(), false);
++            ResultMessage.Prepared existing = QueryProcessor.getStoredPreparedStatement(queryString, clientState.getRawKeyspace());
+             if (existing != null)
+                 return existing;
+ 
 -            ParsedStatement.Prepared prepared = QueryProcessor.getStatement(queryString, clientState);
 -            int boundTerms = prepared.statement.getBoundTerms();
++            CQLStatement statement = QueryProcessor.getStatement(queryString, clientState);
++            QueryHandler.Prepared prepared = new QueryHandler.Prepared(statement, queryString);
++
++            int boundTerms = statement.getBindVariables().size();
+             if (boundTerms > FBUtilities.MAX_UNSIGNED_SHORT)
+                 throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", boundTerms, FBUtilities.MAX_UNSIGNED_SHORT));
 -            assert boundTerms == prepared.boundNames.size();
+ 
 -            return QueryProcessor.storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, false);
++            return QueryProcessor.storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared);
+         }
 -
+     }
+ 
+     protected static class ForceHostLoadBalancingPolicy implements LoadBalancingPolicy {
+ 
+         protected final List<Host> hosts = new CopyOnWriteArrayList<Host>();
+         protected int currentPrimary = 0;
+ 
+         public void setPrimary(int idx) {
+             this.currentPrimary = idx - 1; // arrays are 0-based
+         }
+ 
+         @Override
+         public void init(Cluster cluster, Collection<Host> hosts) {
+             this.hosts.addAll(hosts);
+             this.hosts.sort(Comparator.comparingInt(h -> h.getAddress().getAddress()[3]));
+         }
+ 
+         @Override
+         public HostDistance distance(Host host) {
+             return HostDistance.LOCAL;
+         }
+ 
+         @Override
+         public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
+             if (hosts.isEmpty()) return Collections.emptyIterator();
+             return Iterators.singletonIterator(hosts.get(currentPrimary));
+         }
+ 
+         @Override
+         public void onAdd(Host host) {
+             onUp(host);
+         }
+ 
+         @Override
+         public void onUp(Host host) {
+             hosts.add(host);
+         }
+ 
+         @Override
+         public void onDown(Host host) {
+             // no-op
+         }
+ 
+         @Override
+         public void onRemove(Host host) {
+             // no-op
+         }
+ 
+         @Override
+         public void close() {
+             // no-op
+         }
+     }
+ }
diff --cc test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index ef705bd,e01b812..d885071
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@@ -218,24 -117,18 +218,24 @@@ public class PreparedStatementsTest ext
          session.execute(preparedInsert.bind(1, 1, "value"));
          assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
  
 -        cluster.close();
 -
 -        cluster = Cluster.builder().addContactPoint("127.0.0.1")
 -                                   .withPort(DatabaseDescriptor.getNativeTransportPort())
 -                                   .build();
 -        session = cluster.connect();
 -
 -        preparedInsert = session.prepare(insertCQL);
 -        preparedSelect = session.prepare(selectCQL);
 -        session.execute(preparedInsert.bind(1, 1, "value"));
 +        try (Cluster newCluster = Cluster.builder()
 +                                 .addContactPoints(nativeAddr)
 +                                 .withClusterName("Test Cluster")
 +                                 .withPort(nativePort)
 +                                 .withoutJMXReporting()
 +                                 .allowBetaProtocolVersion()
 +                                 .build())
 +        {
 +            try (Session newSession = newCluster.connect())
 +            {
 +                newSession.execute("USE " + keyspace());
 +                preparedInsert = newSession.prepare(insertCQL);
 +                preparedSelect = newSession.prepare(selectCQL);
-                 session.execute(preparedInsert.bind(1, 1, "value"));
++                newSession.execute(preparedInsert.bind(1, 1, "value"));
  
-                 assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
 -        assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
++                assertEquals(1, newSession.execute(preparedSelect.bind(1)).all().size());
 +            }
 +        }
      }
  
      @Test
diff --cc test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index eca6c20,753d6ff..c95728b
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@@ -185,6 -187,6 +186,7 @@@ public class PstmtPersistenceTest exten
  
      private MD5Digest prepareStatement(String stmt, String keyspace, String table, ClientState clientState)
      {
 -        return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState, false).statementId;
++        System.out.println(stmt + String.format(stmt, keyspace + "." + table));
 +        return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState).statementId;
      }
  }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org