You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2022/07/25 21:35:17 UTC
[cassandra] branch trunk updated: When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new e0a6b83a02 When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters
e0a6b83a02 is described below
commit e0a6b83a02804bf976fdc43718001f23818ee53d
Author: David Capwell <dc...@apache.org>
AuthorDate: Mon Jul 25 12:26:35 2022 -0700
When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters
patch by David Capwell; reviewed by Sam Tunnicliffe for CASSANDRA-17754
---
CHANGES.txt | 1 +
.../cassandra/auth/CassandraRoleManager.java | 7 ++
.../org/apache/cassandra/service/StorageProxy.java | 19 ++-
.../cassandra/distributed/shared/ClusterUtils.java | 46 +++++++
.../test/hostreplacement/FailedBootstrapTest.java | 138 +++++++++++++++++++++
.../test/hostreplacement/HostReplacementTest.java | 3 +
6 files changed, 212 insertions(+), 2 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 33753cb531..63e8fdd328 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters (CASSANDRA-17754)
* Add guardrail to disallow DROP KEYSPACE commands (CASSANDRA-17767)
* Remove ephemeral snapshot marker file and introduce a flag to SnapshotManifest (CASSANDRA-16911)
* Add a virtual table that exposes currently running queries (CASSANDRA-15241)
diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 0344de921d..c2272707ec 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -386,6 +387,12 @@ public class CassandraRoleManager implements IRoleManager
{
// The delay is to give the node a chance to see its peers before attempting the operation
ScheduledExecutors.optionalTasks.scheduleSelfRecurring(() -> {
+ if (!StorageProxy.isSafeToPerformRead())
+ {
+ logger.trace("Setup task may not run due to it not being safe to perform reads... rescheduling");
+ scheduleSetupTask(setupTask);
+ return;
+ }
try
{
setupTask.call();
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 557382df54..e89bdae717 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1823,7 +1823,7 @@ public class StorageProxy implements StorageProxyMBean
public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
{
- if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.queries))
+ if (!isSafeToPerformRead(group.queries))
{
readMetrics.unavailables.mark();
readMetricsForLevel(consistencyLevel).unavailables.mark();
@@ -1850,6 +1850,16 @@ public class StorageProxy implements StorageProxyMBean
: readRegular(group, consistencyLevel, queryStartNanoTime);
}
+ public static boolean isSafeToPerformRead(List<SinglePartitionReadCommand> queries)
+ {
+ return isSafeToPerformRead() || systemKeyspaceQuery(queries);
+ }
+
+ public static boolean isSafeToPerformRead()
+ {
+ return !StorageService.instance.isBootstrapMode();
+ }
+
private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
{
@@ -2619,8 +2629,13 @@ public class StorageProxy implements StorageProxyMBean
public static void logRequestException(Exception exception, Collection<? extends ReadCommand> commands)
{
+ // Multiple different types of errors can happen, so by deduplicating on the error type we can see each error
+ // case rather than just exposing the first error seen; this should make sure more rare issues are exposed
+ // rather than being hidden by more common errors such as timeout or unavailable
+ // see CASSANDRA-17754
+ String msg = exception.getClass().getSimpleName() + " \"{}\" while executing {}";
NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, FAILURE_LOGGING_INTERVAL_SECONDS, TimeUnit.SECONDS,
- "\"{}\" while executing {}",
+ msg,
() -> new Object[]
{
exception.getMessage(),
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index dc280f3085..d848d201dc 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
@@ -40,6 +41,10 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.google.common.util.concurrent.Futures;
+
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.util.File;
import org.junit.Assert;
@@ -554,6 +559,47 @@ public class ClusterUtils
});
}
+ public static void awaitGossipSchemaMatch(ICluster<? extends IInstance> cluster)
+ {
+ cluster.forEach(ClusterUtils::awaitGossipSchemaMatch);
+ }
+
+ public static void awaitGossipSchemaMatch(IInstance instance)
+ {
+ if (!instance.config().has(Feature.GOSSIP))
+ {
+ // when gossip isn't enabled, don't bother waiting on gossip to settle...
+ return;
+ }
+ awaitGossip(instance, "Schema IDs did not match", all -> {
+ String current = null;
+ for (Map.Entry<String, Map<String, String>> e : all.entrySet())
+ {
+ Map<String, String> state = e.getValue();
+ // has the instance joined?
+ String status = state.get(ApplicationState.STATUS_WITH_PORT.name());
+ if (status == null)
+ status = state.get(ApplicationState.STATUS.name());
+ if (status == null || !status.contains(VersionedValue.STATUS_NORMAL))
+ continue; // ignore instances not joined yet
+ String schema = state.get("SCHEMA");
+ if (schema == null)
+ throw new AssertionError("Unable to find schema for " + e.getKey() + "; status was " + status);
+ schema = schema.split(":")[1];
+
+ if (current == null)
+ {
+ current = schema;
+ }
+ else if (!current.equals(schema))
+ {
+ return false;
+ }
+ }
+ return true;
+ });
+ }
+
/**
* Get the gossip information from the node. Currently only address, generation, and heartbeat are returned
*
diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java
new file mode 100644
index 0000000000..56de092844
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.implementation.bind.annotation.This;
+import org.apache.cassandra.auth.CassandraRoleManager;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
+import org.apache.cassandra.streaming.StreamException;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+import static org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster;
+
+public class FailedBootstrapTest extends TestBaseImpl
+{
+ private static final Logger logger = LoggerFactory.getLogger(FailedBootstrapTest.class);
+
+ private static final int NODE_TO_REMOVE = 2;
+
+ @Test
+ public void roleSetupDoesNotProduceUnavailables() throws IOException
+ {
+ Cluster.Builder builder = Cluster.build(3)
+ .withConfig(c -> c.with(Feature.values()))
+ .withInstanceInitializer(BB::install);
+ TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3, builder.getTokenCount());
+ builder = builder.withTokenSupplier((TokenSupplier) node -> even.tokens(node == 4 ? NODE_TO_REMOVE : node));
+ try (Cluster cluster = builder.start())
+ {
+ List<IInvokableInstance> alive = Arrays.asList(cluster.get(1), cluster.get(3));
+ IInvokableInstance nodeToRemove = cluster.get(NODE_TO_REMOVE);
+
+ setupCluster(cluster);
+
+ stopUnchecked(nodeToRemove);
+
+ // should fail to join, but should start up!
+ IInvokableInstance added = replaceHostAndStart(cluster, nodeToRemove, p -> p.setProperty("cassandra.superuser_setup_delay_ms", "1"));
+ // log gossip for debugging
+ alive.forEach(i -> {
+ NodeToolResult result = i.nodetoolResult("gossipinfo");
+ result.asserts().success();
+ logger.info("gossipinfo for node{}\n{}", i.config().num(), result.getStdout());
+ });
+
+ // CassandraRoleManager attempted to do distributed reads while bootstrap was still going (it failed, so still in bootstrap mode)
+ // so we need to validate that this is no longer happening and that we are not incrementing org.apache.cassandra.metrics.ClientRequestMetrics.unavailables
+ // sleep larger than multiple retry attempts...
+ Awaitility.await()
+ .atMost(1, TimeUnit.MINUTES)
+ .until(() -> added.callOnInstance(() -> BB.SETUP_SCHEDULE_COUNTER.get()) >= 42); // why 42? just need something large enough to make sure multiple attempts happened
+
+ // do any of the read metrics report unavailables?
+ added.runOnInstance(() -> {
+ Assertions.assertThat(ClientRequestsMetricsHolder.readMetrics.unavailables.getCount()).describedAs("read unavailables").isEqualTo(0);
+ Assertions.assertThat(ClientRequestsMetricsHolder.casReadMetrics.unavailables.getCount()).describedAs("CAS read unavailables").isEqualTo(0);
+ });
+ }
+ }
+
+ public static class BB
+ {
+ public static void install(ClassLoader classLoader, Integer num)
+ {
+ if (num != 4)
+ return;
+
+ new ByteBuddy().rebase(StreamResultFuture.class)
+ .method(named("maybeComplete"))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+
+ new ByteBuddy().rebase(CassandraRoleManager.class)
+ .method(named("scheduleSetupTask"))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ public static void maybeComplete(@This StreamResultFuture future) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException
+ {
+ Method method = future.getClass().getSuperclass().getSuperclass().getDeclaredMethod("tryFailure", Throwable.class);
+ method.setAccessible(true);
+ method.invoke(future, new StreamException(future.getCurrentState(), "Stream failed"));
+ }
+
+ private static final AtomicInteger SETUP_SCHEDULE_COUNTER = new AtomicInteger(0);
+ public static void scheduleSetupTask(final Callable<?> setupTask, @SuperCall Runnable fn)
+ {
+ SETUP_SCHEDULE_COUNTER.incrementAndGet();
+ fn.run();
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java
index 3de0bf51d5..8219d43ad1 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.assertj.core.api.Assertions;
@@ -210,6 +211,8 @@ public class HostReplacementTest extends TestBaseImpl
fixDistributedSchemas(cluster);
init(cluster);
+ ClusterUtils.awaitGossipSchemaMatch(cluster);
+
populate(cluster);
cluster.forEach(i -> i.flush(KEYSPACE));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org