You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2021/10/03 21:43:43 UTC
[cassandra] 01/02: Merge branch 'cassandra-3.11' into cassandra-4.0
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 3660a58ab458b90fdf9de5cda83099b87aad4c86
Merge: b22749b 32a15f0
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Sun Oct 3 23:23:31 2021 +0200
Merge branch 'cassandra-3.11' into cassandra-4.0
.../org/apache/cassandra/cql3/CQLStatement.java | 5 +
.../org/apache/cassandra/cql3/QueryProcessor.java | 50 +++-
.../cql3/statements/ModificationStatement.java | 3 +-
.../cassandra/cql3/statements/SelectStatement.java | 3 +-
.../cassandra/cql3/statements/UseStatement.java | 5 +
.../statements/schema/AlterSchemaStatement.java | 8 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 105 +++++++-
.../gms/IEndpointStateChangeSubscriber.java | 14 +-
.../net/StartupClusterConnectivityChecker.java | 24 --
.../org/apache/cassandra/repair/RepairSession.java | 6 -
.../apache/cassandra/service/LoadBroadcaster.java | 8 -
.../apache/cassandra/streaming/StreamSession.java | 6 -
.../cassandra/utils/RecomputingSupplier.java | 122 +++++++++
.../distributed/test/GossipShutdownTest.java | 7 -
.../distributed/test/NativeProtocolTest.java | 14 +-
.../test/ReprepareNewBehaviourTest.java | 38 +++
.../distributed/test/ReprepareTestBase.java | 281 +++++++++++++++++++++
.../test/ReprepareTestOldBehaviour.java | 129 ++++++++++
.../cassandra/cql3/PreparedStatementsTest.java | 4 +-
.../cassandra/cql3/PstmtPersistenceTest.java | 36 +--
.../apache/cassandra/metrics/CQLMetricsTest.java | 5 +-
.../cassandra/utils/RecomputingSupplierTest.java | 150 +++++++++++
22 files changed, 933 insertions(+), 90 deletions(-)
diff --cc src/java/org/apache/cassandra/cql3/CQLStatement.java
index c34e27f,901ecd4..e78f733
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@@ -84,30 -59,12 +84,35 @@@ public interface CQLStatemen
*
* @param state the current query state
*/
- public ResultMessage executeInternal(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException;
+ public ResultMessage executeLocally(QueryState state, QueryOptions options);
/**
- * Return an Iterable over all of the functions (both native and user-defined) used by any component
- * of the statement
- * @return functions all functions found (may contain duplicates)
+ * Provides the context needed for audit logging statements.
+ */
+ AuditLogContext getAuditLogContext();
+
+ /**
+ * Whether or not this CQL Statement has LWT conditions
*/
- public Iterable<Function> getFunctions();
+ default public boolean hasConditions()
+ {
+ return false;
+ }
+
+ public static abstract class Raw
+ {
+ protected VariableSpecifications bindVariables;
+
+ public void setBindVariables(List<ColumnIdentifier> variables)
+ {
+ bindVariables = new VariableSpecifications(variables);
+ }
+
+ public abstract CQLStatement prepare(ClientState state);
+ }
++
++ public static interface SingleKeyspaceCqlStatement extends CQLStatement
++ {
++ public String keyspace();
++ }
}
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 87829ab,589a25a..3c04c2d
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -61,8 -61,19 +62,18 @@@ import static org.apache.cassandra.cql3
public class QueryProcessor implements QueryHandler
{
- public static final CassandraVersion CQL_VERSION = new CassandraVersion("3.4.4");
+ public static final CassandraVersion CQL_VERSION = new CassandraVersion("3.4.5");
+ /**
+ * If a query is prepared with a fully qualified name, but the user also uses USE (specifically when USE keyspace
+ * is different) then the IDs generated could change over time; invalidating the assumption that IDs won't ever
+ * change. In the version defined below, the USE keyspace is ignored when a fully-qualified name is used as an
+ * attempt to make IDs stable.
+ */
+ private static final CassandraVersion PREPARE_ID_BEHAVIOR_CHANGE_30 = new CassandraVersion("3.0.26");
+ private static final CassandraVersion PREPARE_ID_BEHAVIOR_CHANGE_3X = new CassandraVersion("3.11.12");
+ private static final CassandraVersion PREPARE_ID_BEHAVIOR_CHANGE_40 = new CassandraVersion("4.0.1");
+
-
public static final QueryProcessor instance = new QueryProcessor();
private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class);
@@@ -436,14 -430,36 +447,37 @@@
if (existing != null)
return existing;
- ParsedStatement.Prepared prepared = getStatement(queryString, clientState);
- prepared.rawCQLStatement = queryString;
- int boundTerms = prepared.statement.getBoundTerms();
+ CQLStatement statement = getStatement(queryString, clientState);
+ Prepared prepared = new Prepared(statement, queryString);
+
+ int boundTerms = statement.getBindVariables().size();
if (boundTerms > FBUtilities.MAX_UNSIGNED_SHORT)
throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", boundTerms, FBUtilities.MAX_UNSIGNED_SHORT));
- assert boundTerms == prepared.boundNames.size();
- return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared);
- if (prepared.keyspace != null)
++ if (statement instanceof CQLStatement.SingleKeyspaceCqlStatement)
+ {
+ // Edge-case of CASSANDRA-15252 in mixed-mode cluster. We accept that 15252 itself can manifest in a
+ // cluster that has both old and new nodes, but we would like to avoid a situation when the fix adds
+ // a new behaviour that can break and that, in addition, can get triggered more frequently.
+ // If a statement ID was generated on an old node _with_ USE, then when attempting to execute it on a new node,
+ // we may fall into an infinite loop. To break out of this loop, we put the prepared statement that the client
+ // expects into the cache, so that it can get a PREPARED response on the second try.
- ResultMessage.Prepared newBehavior = storePreparedStatement(queryString, null, prepared, forThrift);
- ResultMessage.Prepared oldBehavior = clientState.getRawKeyspace() != null ? storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift) : newBehavior;
++ ResultMessage.Prepared newBehavior = storePreparedStatement(queryString, null, prepared);
++ ResultMessage.Prepared oldBehavior = clientState.getRawKeyspace() != null ? storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared) : newBehavior;
+ CassandraVersion minVersion = Gossiper.instance.getMinVersion(20, TimeUnit.MILLISECONDS);
+
+ // Default to old behaviour in case we're not sure about the version. Even if we ever flip back to the old
+ // behaviour due to the gossip bug or incorrect version string, we'll end up with two re-prepare round-trips.
++
+ return minVersion != null &&
+ ((minVersion.major == 3 && minVersion.minor == 0 && minVersion.compareTo(PREPARE_ID_BEHAVIOR_CHANGE_30) >= 0) ||
+ (minVersion.major == 3 && minVersion.minor != 0 && minVersion.compareTo(PREPARE_ID_BEHAVIOR_CHANGE_3X) >= 0) ||
+ (minVersion.major == 4 && minVersion.compareTo(PREPARE_ID_BEHAVIOR_CHANGE_40) >= 0)) ? newBehavior : oldBehavior;
+ }
+ else
+ {
- return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift);
++ return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared);
+ }
}
private static MD5Digest computeId(String queryString, String keyspace)
@@@ -452,23 -468,42 +486,25 @@@
return MD5Digest.compute(toHash);
}
- private static ResultMessage.Prepared getStoredPreparedStatement(String queryString, String keyspace)
- private static Integer computeThriftId(String queryString, String keyspace)
- {
- String toHash = keyspace == null ? queryString : keyspace + queryString;
- return toHash.hashCode();
- }
-
+ @VisibleForTesting
- public static ResultMessage.Prepared getStoredPreparedStatement(String queryString, String clientKeyspace, boolean forThrift)
++ public static ResultMessage.Prepared getStoredPreparedStatement(String queryString, String clientKeyspace)
throws InvalidRequestException
{
- MD5Digest statementId = computeId(queryString, keyspace);
- if (forThrift)
- {
- Integer thriftStatementId = computeThriftId(queryString, clientKeyspace);
- ParsedStatement.Prepared existing = thriftPreparedStatements.get(thriftStatementId);
- if (existing == null)
- return null;
++ MD5Digest statementId = computeId(queryString, clientKeyspace);
+ Prepared existing = preparedStatements.getIfPresent(statementId);
+ if (existing == null)
+ return null;
- checkTrue(queryString.equals(existing.rawCQLStatement),
- String.format("MD5 hash collision: query with the same MD5 hash was already prepared. \n Existing: '%s'", existing.rawCQLStatement));
- return ResultMessage.Prepared.forThrift(thriftStatementId, existing.boundNames);
- }
- else
- {
- MD5Digest statementId = computeId(queryString, clientKeyspace);
- ParsedStatement.Prepared existing = preparedStatements.get(statementId);
- if (existing == null)
- return null;
+ checkTrue(queryString.equals(existing.rawCQLStatement),
+ String.format("MD5 hash collision: query with the same MD5 hash was already prepared. \n Existing: '%s'", existing.rawCQLStatement));
- checkTrue(queryString.equals(existing.rawCQLStatement),
- String.format("MD5 hash collision: query with the same MD5 hash was already prepared. \n Existing: '%s'", existing.rawCQLStatement));
- return new ResultMessage.Prepared(statementId, existing);
- }
+ ResultSet.PreparedMetadata preparedMetadata = ResultSet.PreparedMetadata.fromPrepared(existing.statement);
+ ResultSet.ResultMetadata resultMetadata = ResultSet.ResultMetadata.fromPrepared(existing.statement);
+ return new ResultMessage.Prepared(statementId, resultMetadata.getResultMetadataId(), preparedMetadata, resultMetadata);
}
- private static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, Prepared prepared)
+ @VisibleForTesting
- public static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, ParsedStatement.Prepared prepared, boolean forThrift)
++ public static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, Prepared prepared)
throws InvalidRequestException
{
// Concatenate the current keyspace so we don't mix prepared statements between keyspace (#5352).
@@@ -608,7 -656,14 +644,13 @@@
internalStatements.clear();
}
+ @VisibleForTesting
+ public static void clearPreparedStatementsCache()
+ {
- preparedStatements.clear();
- thriftPreparedStatements.clear();
++ preparedStatements.asMap().clear();
+ }
+
- private static class MigrationSubscriber extends MigrationListener
+ private static class StatementInvalidatingListener extends SchemaChangeListener
{
private static void removeInvalidPreparedStatements(String ksName, String cfName)
{
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 087f3b0,8376a0a..a072da5
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -66,7 -59,7 +66,7 @@@ import static org.apache.cassandra.cql3
/*
* Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
*/
--public abstract class ModificationStatement implements CQLStatement
++public abstract class ModificationStatement implements CQLStatement.SingleKeyspaceCqlStatement
{
protected static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class);
@@@ -192,13 -163,18 +192,14 @@@
return restrictions;
}
- public abstract void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params);
+ public abstract void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Clustering<?> clustering, UpdateParameters params);
- public abstract void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params);
-
- public int getBoundTerms()
- {
- return boundTerms;
- }
+ public abstract void addUpdateForKey(PartitionUpdate.Builder updateBuilder, Slice slice, UpdateParameters params);
++ @Override
public String keyspace()
{
- return cfm.ksName;
+ return metadata.keyspace;
}
public String columnFamily()
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 774bd68,632bf94..494d9af
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -85,7 -114,7 +85,7 @@@ import static org.apache.cassandra.util
* QueryHandler implementations, so before reducing their accessibility
* due consideration should be given.
*/
--public class SelectStatement implements CQLStatement
++public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement
{
private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class);
@@@ -474,14 -481,12 +474,15 @@@
public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException
{
- return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT));
+ QueryOptions options = QueryOptions.DEFAULT;
+ Selectors selectors = selection.newSelectors(options);
+ return process(partitions, options, selectors, nowInSec, getLimit(options));
}
++ @Override
public String keyspace()
{
- return cfm.ksName;
+ return table.keyspace;
}
public String columnFamily()
diff --cc src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index 3013d9f,0242f09..0504f43
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@@ -64,16 -65,4 +64,21 @@@ public class UseStatement extends CQLSt
// but for some unit tests we need to set the keyspace (e.g. for tests with DROP INDEX)
return execute(state, options, System.nanoTime());
}
+
+ @Override
+ public String toString()
+ {
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+ }
+
+ @Override
+ public AuditLogContext getAuditLogContext()
+ {
+ return new AuditLogContext(AuditLogEntryType.USE_KEYSPACE, keyspace);
+ }
++
++ public String keyspace()
++ {
++ return keyspace;
++ }
}
diff --cc src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
index 161c9c4,0000000..124d04c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterSchemaStatement.java
@@@ -1,153 -1,0 +1,159 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements.schema;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.*;
+import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
- abstract class AlterSchemaStatement implements CQLStatement, SchemaTransformation
++abstract public class AlterSchemaStatement implements CQLStatement.SingleKeyspaceCqlStatement, SchemaTransformation
+{
+ protected final String keyspaceName; // name of the keyspace affected by the statement
+
+ protected AlterSchemaStatement(String keyspaceName)
+ {
+ this.keyspaceName = keyspaceName;
+ }
+
+ public final void validate(ClientState state)
+ {
+ // no-op; validation is performed while executing the statement, in apply()
+ }
+
+ public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime)
+ {
+ return execute(state, false);
+ }
+
++ @Override
++ public String keyspace()
++ {
++ return keyspaceName;
++ }
++
+ public ResultMessage executeLocally(QueryState state, QueryOptions options)
+ {
+ return execute(state, true);
+ }
+
+ /**
+ * TODO: document
+ */
+ abstract SchemaChange schemaChangeEvent(KeyspacesDiff diff);
+
+ /**
+ * Schema alteration may result in a new database object (keyspace, table, role, function) being created capable of
+ * having permissions GRANTed on it. The creator of the object (the primary role assigned to the AuthenticatedUser
+ * performing the operation) is automatically granted ALL applicable permissions on the object. This is a hook for
+ * subclasses to override in order to indicate which resources to perform that grant on when the statement is executed.
+ *
+ * Only called if the transformation resulted in a non-empty diff.
+ */
+ Set<IResource> createdResources(KeyspacesDiff diff)
+ {
+ return ImmutableSet.of();
+ }
+
+ /**
+ * Schema alteration might produce a client warning (e.g. a warning to run full repair when increasing RF of a keyspace).
+ * This method should be used to generate them instead of calling warn() in transformation code.
+ *
+ * Only called if the transformation resulted in a non-empty diff.
+ */
+ Set<String> clientWarnings(KeyspacesDiff diff)
+ {
+ return ImmutableSet.of();
+ }
+
+ public ResultMessage execute(QueryState state, boolean locally)
+ {
+ if (SchemaConstants.isLocalSystemKeyspace(keyspaceName))
+ throw ire("System keyspace '%s' is not user-modifiable", keyspaceName);
+
+ KeyspaceMetadata keyspace = Schema.instance.getKeyspaceMetadata(keyspaceName);
+ if (null != keyspace && keyspace.isVirtual())
+ throw ire("Virtual keyspace '%s' is not user-modifiable", keyspaceName);
+
+ validateKeyspaceName();
+
+ KeyspacesDiff diff = MigrationManager.announce(this, locally);
+
+ clientWarnings(diff).forEach(ClientWarn.instance::warn);
+
+ if (diff.isEmpty())
+ return new ResultMessage.Void();
+
+ /*
+ * When a schema alteration results in a new db object being created, we grant permissions on the new
+ * object to the user performing the request if:
+ * - the user is not anonymous
+ * - the configured IAuthorizer supports granting of permissions (not all do, AllowAllAuthorizer doesn't and
+ * custom external implementations may not)
+ */
+ AuthenticatedUser user = state.getClientState().getUser();
+ if (null != user && !user.isAnonymous())
+ createdResources(diff).forEach(r -> grantPermissionsOnResource(r, user));
+
+ return new ResultMessage.SchemaChange(schemaChangeEvent(diff));
+ }
+
+ private void validateKeyspaceName()
+ {
+ if (!SchemaConstants.isValidName(keyspaceName))
+ {
+ throw ire("Keyspace name must not be empty, more than %d characters long, " +
+ "or contain non-alphanumeric-underscore characters (got '%s')",
+ SchemaConstants.NAME_LENGTH, keyspaceName);
+ }
+ }
+
+ private void grantPermissionsOnResource(IResource resource, AuthenticatedUser user)
+ {
+ try
+ {
+ DatabaseDescriptor.getAuthorizer()
+ .grant(AuthenticatedUser.SYSTEM_USER,
+ resource.applicablePermissions(),
+ resource,
+ user.getPrimaryRole());
+ }
+ catch (UnsupportedOperationException e)
+ {
+ // not a problem - grant is an optional method on IAuthorizer
+ }
+ }
+
+ static InvalidRequestException ire(String format, Object... args)
+ {
+ return new InvalidRequestException(String.format(format, args));
+ }
+}
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 6eb674f,363db5f..c147434
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -33,7 -34,6 +33,8 @@@ import com.google.common.base.Throwable
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
+import com.google.common.collect.ImmutableSet;
++import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.Uninterruptibles;
@@@ -51,23 -46,26 +52,30 @@@ import org.apache.cassandra.utils.Pair
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
++import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.CassandraVersion;
++import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
++import org.apache.cassandra.utils.MBeanWrapper;
++import org.apache.cassandra.utils.NoSpamLogger;
++import org.apache.cassandra.utils.Pair;
+ import org.apache.cassandra.utils.RecomputingSupplier;
-import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
-import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
+import static org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_QUARANTINE_DELAY;
+import static org.apache.cassandra.net.NoPayload.noPayload;
+import static org.apache.cassandra.net.Verb.ECHO_REQ;
+import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN;
/**
* This module is responsible for Gossiping information for the local endpoint. This abstraction
@@@ -333,7 -259,9 +341,10 @@@ public class Gossiper implements IFailu
}
}
- Gossiper(boolean registerJmx)
+ private final RecomputingSupplier<CassandraVersion> minVersionSupplier = new RecomputingSupplier<>(this::computeMinVersion, executor);
+
- private Gossiper()
++ @VisibleForTesting
++ public Gossiper(boolean registerJmx)
{
// half of QUARANTINE_DELAY, to ensure justRemovedEndpoints has enough leeway to prevent re-gossip
fatClientTimeout = (QUARANTINE_DELAY / 2);
@@@ -341,10 -269,33 +352,35 @@@
FailureDetector.instance.registerFailureDetectionEventListener(this);
// Register this instance with JMX
- MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
-
+ if (registerJmx)
+ {
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
+ }
+
+ subscribers.add(new IEndpointStateChangeSubscriber()
+ {
- public void onJoin(InetAddress endpoint, EndpointState state)
++ public void onJoin(InetAddressAndPort endpoint, EndpointState state)
+ {
+ maybeRecompute(state);
+ }
+
- public void onAlive(InetAddress endpoint, EndpointState state)
++ public void onAlive(InetAddressAndPort endpoint, EndpointState state)
+ {
+ maybeRecompute(state);
+ }
+
+ private void maybeRecompute(EndpointState state)
+ {
+ if (state.getApplicationState(ApplicationState.RELEASE_VERSION) != null)
+ minVersionSupplier.recompute();
+ }
+
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
++ public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
+ {
+ if (state == ApplicationState.RELEASE_VERSION)
+ minVersionSupplier.recompute();
+ }
+ });
}
public void setLastProcessedMessageAt(long timeInMillis)
@@@ -1712,8 -1506,9 +1748,9 @@@
buildSeedsList();
/* initialize the heartbeat state for this localEndpoint */
maybeInitializeLocalState(generationNbr);
- EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
+ EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
localState.addApplicationStates(preloadLocalStates);
+ minVersionSupplier.recompute();
//notify snitches that Gossiper is about to start
DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
@@@ -2285,4 -1906,76 +2322,70 @@@
stop();
ExecutorUtils.shutdownAndWait(timeout, unit, executor);
}
+
+ @Nullable
+ public CassandraVersion getMinVersion(int delay, TimeUnit timeUnit)
+ {
+ try
+ {
+ return minVersionSupplier.get(delay, timeUnit);
+ }
+ catch (TimeoutException e)
+ {
+ // Timeouts here are harmless: they won't cause reprepares and may only
+ // cause the old version of the hash to be kept for longer
+ return null;
+ }
+ catch (Throwable e)
+ {
+ logger.error("Caught an exception while waiting for min version", e);
+ return null;
+ }
+ }
+
+ @Nullable
- private String getReleaseVersionString(InetAddress ep)
++ private String getReleaseVersionString(InetAddressAndPort ep)
+ {
+ EndpointState state = getEndpointStateForEndpoint(ep);
+ if (state == null)
+ return null;
+
+ VersionedValue value = state.getApplicationState(ApplicationState.RELEASE_VERSION);
+ return value == null ? null : value.value;
+ }
+
+ private CassandraVersion computeMinVersion()
+ {
+ CassandraVersion minVersion = null;
+
- for (InetAddress addr : Iterables.concat(Gossiper.instance.getLiveMembers(),
++ for (InetAddressAndPort addr : Iterables.concat(Gossiper.instance.getLiveMembers(),
+ Gossiper.instance.getUnreachableMembers()))
+ {
+ String versionString = getReleaseVersionString(addr);
+ // Raced with changes to gossip state, wait until next iteration
+ if (versionString == null)
+ return null;
+
+ CassandraVersion version;
+
+ try
+ {
+ version = new CassandraVersion(versionString);
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ String message = String.format("Can't parse version string %s", versionString);
+ logger.warn(message);
+ if (logger.isDebugEnabled())
+ logger.debug(message, t);
+ return null;
+ }
+
+ if (minVersion == null || version.compareTo(minVersion) < 0)
+ minVersion = version;
+ }
+
+ return minVersion;
+ }
-
- @VisibleForTesting
- public void setAnyNodeOn30(boolean anyNodeOn30)
- {
- this.anyNodeOn30 = anyNodeOn30;
- }
}
diff --cc src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
index dc81650,861d4ac..df04697
--- a/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
+++ b/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
@@@ -36,17 -36,17 +36,17 @@@ public interface IEndpointStateChangeSu
* @param endpoint endpoint for which the state change occurred.
* @param epState state that actually changed for the above endpoint.
*/
- public void onJoin(InetAddressAndPort endpoint, EndpointState epState);
- default void onJoin(InetAddress endpoint, EndpointState epState) {}
++ default void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
- public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue);
- default void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
++ default void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
- public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value);
- default void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
++ default void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
- public void onAlive(InetAddressAndPort endpoint, EndpointState state);
- default void onAlive(InetAddress endpoint, EndpointState state) {}
++ default void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
- public void onDead(InetAddressAndPort endpoint, EndpointState state);
- default void onDead(InetAddress endpoint, EndpointState state) {}
++ default void onDead(InetAddressAndPort endpoint, EndpointState state) {}
- public void onRemove(InetAddressAndPort endpoint);
- default void onRemove(InetAddress endpoint) {}
++ default void onRemove(InetAddressAndPort endpoint) {}
/**
* Called whenever a node is restarted.
@@@ -54,5 -54,5 +54,5 @@@
* previously marked down. It will have only if {@code state.isAlive() == false}
* as {@code state} is from before the restarted node is marked up.
*/
- public void onRestart(InetAddressAndPort endpoint, EndpointState state);
- default void onRestart(InetAddress endpoint, EndpointState state) {}
++ default void onRestart(InetAddressAndPort endpoint, EndpointState state) {}
}
diff --cc src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
index b901338,0000000..8bc1e5d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
+++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
@@@ -1,275 -1,0 +1,251 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.net.Verb.PING_REQ;
+import static org.apache.cassandra.net.ConnectionType.LARGE_MESSAGES;
+import static org.apache.cassandra.net.ConnectionType.SMALL_MESSAGES;
+
+public class StartupClusterConnectivityChecker
+{
+ private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
+
+ private final boolean blockForRemoteDcs;
+ private final long timeoutNanos;
+
+ public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs)
+ {
+ if (timeoutSecs > 100)
+ logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
+ long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs);
+
+ return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs);
+ }
+
+ @VisibleForTesting
+ StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs)
+ {
+ this.blockForRemoteDcs = blockForRemoteDcs;
+ this.timeoutNanos = timeoutNanos;
+ }
+
+ /**
+ * @param peers The currently known peers in the cluster; argument is not modified.
+ * @param getDatacenterSource A function for mapping peers to their datacenter.
+ * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened;
+ * else false.
+ */
+ public boolean execute(Set<InetAddressAndPort> peers, Function<InetAddressAndPort, String> getDatacenterSource)
+ {
+ if (peers == null || this.timeoutNanos < 0)
+ return true;
+
+ // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection)
+ peers = new HashSet<>(peers);
+ InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+ String localDc = getDatacenterSource.apply(localAddress);
+
+ peers.remove(localAddress);
+ if (peers.isEmpty())
+ return true;
+
+ // make a copy of the datacenter mapping (in case gossip updates happen during this method or some such)
+ Map<InetAddressAndPort, String> peerToDatacenter = new HashMap<>();
+ SetMultimap<String, InetAddressAndPort> datacenterToPeers = HashMultimap.create();
+
+ for (InetAddressAndPort peer : peers)
+ {
+ String datacenter = getDatacenterSource.apply(peer);
+ peerToDatacenter.put(peer, datacenter);
+ datacenterToPeers.put(datacenter, peer);
+ }
+
+ // In the case where we do not want to block startup on remote datacenters (e.g. because clients only use
+ // LOCAL_X consistency levels), we remove all other datacenter hosts from the mapping and we only wait
+ // on the remaining local datacenter.
+ if (!blockForRemoteDcs)
+ {
+ datacenterToPeers.keySet().retainAll(Collections.singleton(localDc));
+ logger.info("Blocking coordination until only a single peer is DOWN in the local datacenter, timeout={}s",
+ TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
+ }
+ else
+ {
+ logger.info("Blocking coordination until only a single peer is DOWN in each datacenter, timeout={}s",
+ TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
+ }
+
+ AckMap acks = new AckMap(3);
+ Map<String, CountDownLatch> dcToRemainingPeers = new HashMap<>(datacenterToPeers.size());
+ for (String datacenter: datacenterToPeers.keys())
+ {
+ dcToRemainingPeers.put(datacenter,
+ new CountDownLatch(Math.max(datacenterToPeers.get(datacenter).size() - 1, 0)));
+ }
+
+ long startNanos = System.nanoTime();
+
+ // set up a listener to react to new nodes becoming alive (in gossip), and account for all the nodes that are already alive
+ Set<InetAddressAndPort> alivePeers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ AliveListener listener = new AliveListener(alivePeers, dcToRemainingPeers, acks, peerToDatacenter::get);
+ Gossiper.instance.register(listener);
+
+ // send out a ping message to open up the non-gossip connections to all peers. Note that this sends the
+ // ping messages to _all_ peers, not just the ones we block for in dcToRemainingPeers.
+ sendPingMessages(peers, dcToRemainingPeers, acks, peerToDatacenter::get);
+
+ for (InetAddressAndPort peer : peers)
+ {
+ if (Gossiper.instance.isAlive(peer) && alivePeers.add(peer) && acks.incrementAndCheck(peer))
+ {
+ String datacenter = peerToDatacenter.get(peer);
+ // We have to check because we might only have the local DC in the map
+ if (dcToRemainingPeers.containsKey(datacenter))
+ dcToRemainingPeers.get(datacenter).countDown();
+ }
+ }
+
+ boolean succeeded = true;
+ for (CountDownLatch countDownLatch : dcToRemainingPeers.values())
+ {
+ long remainingNanos = Math.max(1, timeoutNanos - (System.nanoTime() - startNanos));
+ //noinspection UnstableApiUsage
+ succeeded &= Uninterruptibles.awaitUninterruptibly(countDownLatch, remainingNanos, TimeUnit.NANOSECONDS);
+ }
+
+ Gossiper.instance.unregister(listener);
+
+ Map<String, Long> numDown = dcToRemainingPeers.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> e.getValue().getCount()));
+
+ if (succeeded)
+ {
+ logger.info("Ensured sufficient healthy connections with {} after {} milliseconds",
+ numDown.keySet(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+ }
+ else
+ {
+ logger.warn("Timed out after {} milliseconds, was waiting for remaining peers to connect: {}",
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos), numDown);
+ }
+
+ return succeeded;
+ }
+
+ /**
+ * Sends a "connection warmup" message to each peer in the collection, on every {@link ConnectionType}
+ * used for internode messaging (that is not gossip).
+ */
+ private void sendPingMessages(Set<InetAddressAndPort> peers, Map<String, CountDownLatch> dcToRemainingPeers,
+ AckMap acks, Function<InetAddressAndPort, String> getDatacenter)
+ {
+ RequestCallback responseHandler = msg -> {
+ if (acks.incrementAndCheck(msg.from()))
+ {
+ String datacenter = getDatacenter.apply(msg.from());
+ // We have to check because we might only have the local DC in the map
+ if (dcToRemainingPeers.containsKey(datacenter))
+ dcToRemainingPeers.get(datacenter).countDown();
+ }
+ };
+
+ Message<PingRequest> small = Message.out(PING_REQ, PingRequest.forSmall);
+ Message<PingRequest> large = Message.out(PING_REQ, PingRequest.forLarge);
+ for (InetAddressAndPort peer : peers)
+ {
+ MessagingService.instance().sendWithCallback(small, peer, responseHandler, SMALL_MESSAGES);
+ MessagingService.instance().sendWithCallback(large, peer, responseHandler, LARGE_MESSAGES);
+ }
+ }
+
+ /**
+ * A trivial implementation of {@link IEndpointStateChangeSubscriber} that really only cares about
+ * {@link #onAlive(InetAddressAndPort, EndpointState)} invocations.
+ */
+ private static final class AliveListener implements IEndpointStateChangeSubscriber
+ {
+ private final Map<String, CountDownLatch> dcToRemainingPeers;
+ private final Set<InetAddressAndPort> livePeers;
+ private final Function<InetAddressAndPort, String> getDatacenter;
+ private final AckMap acks;
+
+ AliveListener(Set<InetAddressAndPort> livePeers, Map<String, CountDownLatch> dcToRemainingPeers,
+ AckMap acks, Function<InetAddressAndPort, String> getDatacenter)
+ {
+ this.livePeers = livePeers;
+ this.dcToRemainingPeers = dcToRemainingPeers;
+ this.acks = acks;
+ this.getDatacenter = getDatacenter;
+ }
+
- public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
- {
- }
-
- public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue)
- {
- }
-
- public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value)
- {
- }
-
+ public void onAlive(InetAddressAndPort endpoint, EndpointState state)
+ {
+ if (livePeers.add(endpoint) && acks.incrementAndCheck(endpoint))
+ {
+ String datacenter = getDatacenter.apply(endpoint);
+ if (dcToRemainingPeers.containsKey(datacenter))
+ dcToRemainingPeers.get(datacenter).countDown();
+ }
+ }
-
- public void onDead(InetAddressAndPort endpoint, EndpointState state)
- {
- }
-
- public void onRemove(InetAddressAndPort endpoint)
- {
- }
-
- public void onRestart(InetAddressAndPort endpoint, EndpointState state)
- {
- }
+ }
+
+ private static final class AckMap
+ {
+ private final int threshold;
+ private final Map<InetAddressAndPort, AtomicInteger> acks;
+
+ AckMap(int threshold)
+ {
+ this.threshold = threshold;
+ acks = new ConcurrentHashMap<>();
+ }
+
+ boolean incrementAndCheck(InetAddressAndPort address)
+ {
+ return acks.computeIfAbsent(address, addr -> new AtomicInteger(0)).incrementAndGet() == threshold;
+ }
+ }
+}
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index e13c90c,1207d36..75ed1a3
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -342,13 -333,7 +342,7 @@@ public class RepairSession extends Abst
terminate();
}
- public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
- public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
- public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
- public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
- public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
-
- public void onRemove(InetAddress endpoint)
+ public void onRemove(InetAddressAndPort endpoint)
{
convict(endpoint, Double.MAX_VALUE);
}
diff --cc src/java/org/apache/cassandra/service/LoadBroadcaster.java
index 113aead,0a3e1a4..ebda3cd
--- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
@@@ -60,16 -60,8 +60,8 @@@ public class LoadBroadcaster implement
onChange(endpoint, ApplicationState.LOAD, localValue);
}
}
-
- public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
-
- public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
-
- public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
-
- public void onRestart(InetAddressAndPort endpoint, EndpointState state) {}
- public void onRemove(InetAddress endpoint)
+ public void onRemove(InetAddressAndPort endpoint)
{
loadInfo.remove(endpoint);
}
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 3a32834,675304f..0b4c0cd
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -910,15 -749,9 +910,9 @@@ public class StreamSession implements I
maybeCompleted();
}
- public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {}
- public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
- public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {}
- public void onAlive(InetAddressAndPort endpoint, EndpointState state) {}
- public void onDead(InetAddressAndPort endpoint, EndpointState state) {}
-
- public void onRemove(InetAddress endpoint)
+ public void onRemove(InetAddressAndPort endpoint)
{
- logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peer.getHostAddress());
+ logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), peer.toString());
closeSession(State.FAILED);
}
diff --cc test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
index 0905b92,92751ef..60fe5b6
--- a/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NativeProtocolTest.java
@@@ -27,7 -26,8 +26,16 @@@ import com.datastax.driver.core.ResultS
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
++import com.datastax.driver.core.exceptions.DriverInternalError;
++import com.datastax.driver.core.policies.LoadBalancingPolicy;
++import net.bytebuddy.ByteBuddy;
++import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
++import net.bytebuddy.implementation.MethodDelegation;
++import org.apache.cassandra.cql3.CQLStatement;
++import org.apache.cassandra.cql3.QueryHandler;
++import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.impl.RowUtil;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
diff --cc test/distributed/org/apache/cassandra/distributed/test/ReprepareTestBase.java
index 0000000,c270144..02af6d5
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReprepareTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReprepareTestBase.java
@@@ -1,0 -1,281 +1,281 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.distributed.test;
+
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.concurrent.CopyOnWriteArrayList;
++import java.util.concurrent.TimeUnit;
+ import java.util.function.BiConsumer;
+
+ import com.google.common.collect.Iterators;
+ import org.junit.Assert;
+
+ import com.datastax.driver.core.Cluster;
+ import com.datastax.driver.core.Host;
+ import com.datastax.driver.core.HostDistance;
+ import com.datastax.driver.core.PreparedStatement;
+ import com.datastax.driver.core.Session;
+ import com.datastax.driver.core.Statement;
+ import com.datastax.driver.core.exceptions.DriverInternalError;
+ import com.datastax.driver.core.policies.LoadBalancingPolicy;
+ import net.bytebuddy.ByteBuddy;
+ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+ import net.bytebuddy.implementation.FixedValue;
+ import net.bytebuddy.implementation.MethodDelegation;
++import org.apache.cassandra.cql3.CQLStatement;
++import org.apache.cassandra.cql3.QueryHandler;
+ import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.statements.ParsedStatement;
+ import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
+ import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.QueryState;
+ import org.apache.cassandra.transport.messages.ResultMessage;
+ import org.apache.cassandra.utils.FBUtilities;
+
+ import static net.bytebuddy.matcher.ElementMatchers.named;
+ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+ import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+ import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+ import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.fail;
+
+ public class ReprepareTestBase extends TestBaseImpl
+ {
+ protected static ReprepareTestConfiguration cfg(boolean withUse, boolean skipBrokenBehaviours)
+ {
+ return new ReprepareTestConfiguration(withUse, skipBrokenBehaviours);
+ }
+
+ public void testReprepare(BiConsumer<ClassLoader, Integer> instanceInitializer, ReprepareTestConfiguration... configs) throws Throwable
+ {
+ try (ICluster<IInvokableInstance> c = init(builder().withNodes(2)
+ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+ .withInstanceInitializer(instanceInitializer)
+ .start()))
+ {
+ ForceHostLoadBalancingPolicy lbp = new ForceHostLoadBalancingPolicy();
+ c.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));"));
+
+ for (ReprepareTestConfiguration config : configs)
+ {
+ // 1 has old behaviour
+ for (int firstContact : new int[]{ 1, 2 })
+ {
+ try (com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder()
+ .addContactPoint("127.0.0.1")
+ .addContactPoint("127.0.0.2")
+ .withLoadBalancingPolicy(lbp)
+ .build();
+ Session session = cluster.connect())
+ {
+ lbp.setPrimary(firstContact);
+ final PreparedStatement select = session.prepare(withKeyspace("SELECT * FROM %s.tbl"));
+ session.execute(select.bind());
+
+ c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+
+ lbp.setPrimary(firstContact == 1 ? 2 : 1);
+
+ if (config.withUse)
+ session.execute(withKeyspace("USE %s"));
+
+ // Re-preparing on the node
+ if (!config.skipBrokenBehaviours && firstContact == 1)
+ session.execute(select.bind());
+
+ c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+
+ lbp.setPrimary(firstContact);
+
+ // Re-preparing on the node with old behaviour will break no matter where the statement was initially prepared
+ if (!config.skipBrokenBehaviours)
+ session.execute(select.bind());
+
+ c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+ }
+ }
+ }
+ }
+ }
+
+ public void testReprepareTwoKeyspaces(BiConsumer<ClassLoader, Integer> instanceInitializer) throws Throwable
+ {
+ try (ICluster<IInvokableInstance> c = init(builder().withNodes(2)
+ .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+ .withInstanceInitializer(instanceInitializer)
+ .start()))
+ {
+ c.schemaChange(withKeyspace("CREATE KEYSPACE %s2 WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"));
+ c.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));"));
+
+ ForceHostLoadBalancingPolicy lbp = new ForceHostLoadBalancingPolicy();
+
+ for (int firstContact : new int[]{ 1, 2 })
+ try (com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder()
+ .addContactPoint("127.0.0.1")
+ .addContactPoint("127.0.0.2")
+ .withLoadBalancingPolicy(lbp)
+ .build();
+ Session session = cluster.connect())
+ {
+ {
+ session.execute(withKeyspace("USE %s"));
+ c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+
+ lbp.setPrimary(firstContact);
+ final PreparedStatement select = session.prepare(withKeyspace("SELECT * FROM %s.tbl"));
+ session.execute(select.bind());
+
+ c.stream().forEach((i) -> i.runOnInstance(QueryProcessor::clearPreparedStatementsCache));
+
+ lbp.setPrimary(firstContact == 1 ? 2 : 1);
+ session.execute(withKeyspace("USE %s2"));
+ try
+ {
+ session.execute(select.bind());
+ }
+ catch (DriverInternalError e)
+ {
+ Assert.assertTrue(e.getCause().getMessage().contains("can't execute it on"));
+ continue;
+ }
+ fail("Should have thrown");
+ }
+ }
+ }
+ }
+
+ protected static class ReprepareTestConfiguration
+ {
+ protected final boolean withUse;
+ protected final boolean skipBrokenBehaviours;
+
+ protected ReprepareTestConfiguration(boolean withUse, boolean skipBrokenBehaviours)
+ {
+ this.withUse = withUse;
+ this.skipBrokenBehaviours = skipBrokenBehaviours;
+ }
+ }
+
+ public static class PrepareBehaviour
+ {
+ protected static void setReleaseVersion(ClassLoader cl, String value)
+ {
+ new ByteBuddy().rebase(FBUtilities.class)
+ .method(named("getReleaseVersionString"))
+ .intercept(FixedValue.value(value))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ static void newBehaviour(ClassLoader cl, int nodeNumber)
+ {
+ setReleaseVersion(cl, "3.0.19.63");
+ }
+
+ static void oldBehaviour(ClassLoader cl, int nodeNumber)
+ {
+ if (nodeNumber == 1)
+ {
+ new ByteBuddy().rebase(QueryProcessor.class) // note that we need to `rebase` when we use @SuperCall
+ .method(named("prepare").and(takesArguments(2)))
+ .intercept(MethodDelegation.to(PrepareBehaviour.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ setReleaseVersion(cl, "3.0.19.60");
+ }
+ else
+ {
+ setReleaseVersion(cl, "3.0.19.63");
+ }
+ }
+
- public static ResultMessage.Prepared prepare(String queryString, QueryState queryState)
++ public static ResultMessage.Prepared prepare(String queryString, ClientState clientState)
+ {
- ClientState clientState = queryState.getClientState();
- ResultMessage.Prepared existing = QueryProcessor.getStoredPreparedStatement(queryString, clientState.getRawKeyspace(), false);
++ ResultMessage.Prepared existing = QueryProcessor.getStoredPreparedStatement(queryString, clientState.getRawKeyspace());
+ if (existing != null)
+ return existing;
+
- ParsedStatement.Prepared prepared = QueryProcessor.getStatement(queryString, clientState);
- int boundTerms = prepared.statement.getBoundTerms();
++ CQLStatement statement = QueryProcessor.getStatement(queryString, clientState);
++ QueryHandler.Prepared prepared = new QueryHandler.Prepared(statement, queryString);
++
++ int boundTerms = statement.getBindVariables().size();
+ if (boundTerms > FBUtilities.MAX_UNSIGNED_SHORT)
+ throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", boundTerms, FBUtilities.MAX_UNSIGNED_SHORT));
- assert boundTerms == prepared.boundNames.size();
+
- return QueryProcessor.storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, false);
++ return QueryProcessor.storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared);
+ }
-
+ }
+
+ protected static class ForceHostLoadBalancingPolicy implements LoadBalancingPolicy {
+
+ protected final List<Host> hosts = new CopyOnWriteArrayList<Host>();
+ protected int currentPrimary = 0;
+
+ public void setPrimary(int idx) {
+ this.currentPrimary = idx - 1; // arrays are 0-based
+ }
+
+ @Override
+ public void init(Cluster cluster, Collection<Host> hosts) {
+ this.hosts.addAll(hosts);
+ this.hosts.sort(Comparator.comparingInt(h -> h.getAddress().getAddress()[3]));
+ }
+
+ @Override
+ public HostDistance distance(Host host) {
+ return HostDistance.LOCAL;
+ }
+
+ @Override
+ public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
+ if (hosts.isEmpty()) return Collections.emptyIterator();
+ return Iterators.singletonIterator(hosts.get(currentPrimary));
+ }
+
+ @Override
+ public void onAdd(Host host) {
+ onUp(host);
+ }
+
+ @Override
+ public void onUp(Host host) {
+ hosts.add(host);
+ }
+
+ @Override
+ public void onDown(Host host) {
+ // no-op
+ }
+
+ @Override
+ public void onRemove(Host host) {
+ // no-op
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+ }
+ }
diff --cc test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index ef705bd,e01b812..d885071
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@@ -218,24 -117,18 +218,24 @@@ public class PreparedStatementsTest ext
session.execute(preparedInsert.bind(1, 1, "value"));
assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
- cluster.close();
-
- cluster = Cluster.builder().addContactPoint("127.0.0.1")
- .withPort(DatabaseDescriptor.getNativeTransportPort())
- .build();
- session = cluster.connect();
-
- preparedInsert = session.prepare(insertCQL);
- preparedSelect = session.prepare(selectCQL);
- session.execute(preparedInsert.bind(1, 1, "value"));
+ try (Cluster newCluster = Cluster.builder()
+ .addContactPoints(nativeAddr)
+ .withClusterName("Test Cluster")
+ .withPort(nativePort)
+ .withoutJMXReporting()
+ .allowBetaProtocolVersion()
+ .build())
+ {
+ try (Session newSession = newCluster.connect())
+ {
+ newSession.execute("USE " + keyspace());
+ preparedInsert = newSession.prepare(insertCQL);
+ preparedSelect = newSession.prepare(selectCQL);
- session.execute(preparedInsert.bind(1, 1, "value"));
++ newSession.execute(preparedInsert.bind(1, 1, "value"));
- assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
- assertEquals(1, session.execute(preparedSelect.bind(1)).all().size());
++ assertEquals(1, newSession.execute(preparedSelect.bind(1)).all().size());
+ }
+ }
}
@Test
diff --cc test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index eca6c20,753d6ff..c95728b
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@@ -185,6 -187,6 +186,7 @@@ public class PstmtPersistenceTest exten
private MD5Digest prepareStatement(String stmt, String keyspace, String table, ClientState clientState)
{
- return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState, false).statementId;
++ System.out.println(stmt + String.format(stmt, keyspace + "." + table));
+ return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState).statementId;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org