You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2022/07/25 21:35:17 UTC

[cassandra] branch trunk updated: When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e0a6b83a02 When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters
e0a6b83a02 is described below

commit e0a6b83a02804bf976fdc43718001f23818ee53d
Author: David Capwell <dc...@apache.org>
AuthorDate: Mon Jul 25 12:26:35 2022 -0700

    When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters
    
    patch by David Capwell; reviewed by Sam Tunnicliffe for CASSANDRA-17754
---
 CHANGES.txt                                        |   1 +
 .../cassandra/auth/CassandraRoleManager.java       |   7 ++
 .../org/apache/cassandra/service/StorageProxy.java |  19 ++-
 .../cassandra/distributed/shared/ClusterUtils.java |  46 +++++++
 .../test/hostreplacement/FailedBootstrapTest.java  | 138 +++++++++++++++++++++
 .../test/hostreplacement/HostReplacementTest.java  |   3 +
 6 files changed, 212 insertions(+), 2 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 33753cb531..63e8fdd328 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * When bootstrap fails, CassandraRoleManager may attempt to do read queries that fail with "Cannot read from a bootstrapping node", and increments unavailables counters (CASSANDRA-17754)
  * Add guardrail to disallow DROP KEYSPACE commands (CASSANDRA-17767)
  * Remove ephemeral snapshot marker file and introduce a flag to SnapshotManifest (CASSANDRA-16911)
  * Add a virtual table that exposes currently running queries (CASSANDRA-15241)
diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 0344de921d..c2272707ec 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -386,6 +387,12 @@ public class CassandraRoleManager implements IRoleManager
     {
         // The delay is to give the node a chance to see its peers before attempting the operation
         ScheduledExecutors.optionalTasks.scheduleSelfRecurring(() -> {
+            if (!StorageProxy.isSafeToPerformRead())
+            {
+                logger.trace("Setup task may not run due to it not being safe to perform reads... rescheduling");
+                scheduleSetupTask(setupTask);
+                return;
+            }
             try
             {
                 setupTask.call();
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 557382df54..e89bdae717 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1823,7 +1823,7 @@ public class StorageProxy implements StorageProxyMBean
     public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
-        if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.queries))
+        if (!isSafeToPerformRead(group.queries))
         {
             readMetrics.unavailables.mark();
             readMetricsForLevel(consistencyLevel).unavailables.mark();
@@ -1850,6 +1850,16 @@ public class StorageProxy implements StorageProxyMBean
              : readRegular(group, consistencyLevel, queryStartNanoTime);
     }
 
+    public static boolean isSafeToPerformRead(List<SinglePartitionReadCommand> queries)
+    {
+        return isSafeToPerformRead() || systemKeyspaceQuery(queries); // queries for which systemKeyspaceQuery(...) is true are still allowed while bootstrapping
+    }
+
+    public static boolean isSafeToPerformRead()
+    {
+        return !StorageService.instance.isBootstrapMode(); // reads are not safe while this node is in bootstrap mode
+    }
+
     private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
     {
@@ -2619,8 +2629,13 @@ public class StorageProxy implements StorageProxyMBean
 
     public static void logRequestException(Exception exception, Collection<? extends ReadCommand> commands)
     {
+        // Multiple different types of errors can happen, so by deduplicating on the error type we can see each error
+        // case rather than only the first error seen; this ensures rarer issues are exposed
+        // rather than being hidden by more common errors such as timeout or unavailable
+        // (see CASSANDRA-17754)
+        String msg = exception.getClass().getSimpleName() + " \"{}\" while executing {}";
         NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, FAILURE_LOGGING_INTERVAL_SECONDS, TimeUnit.SECONDS,
-                         "\"{}\" while executing {}",
+                         msg,
                          () -> new Object[]
                                {
                                    exception.getMessage(),
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index dc280f3085..d848d201dc 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
@@ -40,6 +41,10 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import com.google.common.util.concurrent.Futures;
+
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.io.util.File;
 import org.junit.Assert;
 
@@ -554,6 +559,47 @@ public class ClusterUtils
         });
     }
 
+    public static void awaitGossipSchemaMatch(ICluster<? extends  IInstance> cluster)
+    {
+        cluster.forEach(ClusterUtils::awaitGossipSchemaMatch); // wait on each instance in turn
+    }
+
+    public static void awaitGossipSchemaMatch(IInstance instance)
+    {
+        if (!instance.config().has(Feature.GOSSIP))
+        {
+            // when gossip isn't enabled, don't bother waiting on gossip to settle...
+            return;
+        }
+        awaitGossip(instance, "Schema IDs did not match", all -> {
+            String current = null;
+            for (Map.Entry<String, Map<String, String>> e : all.entrySet())
+            {
+                Map<String, String> state = e.getValue();
+                // has the instance joined? (check STATUS_WITH_PORT first, then fall back to STATUS)
+                String status = state.get(ApplicationState.STATUS_WITH_PORT.name());
+                if (status == null)
+                    status = state.get(ApplicationState.STATUS.name());
+                if (status == null || !status.contains(VersionedValue.STATUS_NORMAL))
+                    continue; // ignore instances not joined yet
+                String schema = state.get("SCHEMA");
+                if (schema == null)
+                    throw new AssertionError("Unable to find schema for " + e.getKey() + "; status was " + status);
+                schema = schema.split(":")[1]; // NOTE(review): assumes the gossip value looks like "<version>:<schema id>"; confirm
+                // every joined instance must report the same schema as the first joined instance seen
+                if (current == null)
+                {
+                    current = schema;
+                }
+                else if (!current.equals(schema))
+                {
+                    return false;
+                }
+            }
+            return true;
+        });
+    }
+
     /**
      * Get the gossip information from the node.  Currently only address, generation, and heartbeat are returned
      *
diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java
new file mode 100644
index 0000000000..56de092844
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/FailedBootstrapTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.implementation.bind.annotation.This;
+import org.apache.cassandra.auth.CassandraRoleManager;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
+import org.apache.cassandra.streaming.StreamException;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+import static org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster;
+
+public class FailedBootstrapTest extends TestBaseImpl
+{
+    private static final Logger logger = LoggerFactory.getLogger(FailedBootstrapTest.class);
+
+    private static final int NODE_TO_REMOVE = 2;
+
+    @Test
+    public void roleSetupDoesNotProduceUnavailables() throws IOException
+    {
+        Cluster.Builder builder = Cluster.build(3)
+                                         .withConfig(c -> c.with(Feature.values()))
+                                         .withInstanceInitializer(BB::install);
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3, builder.getTokenCount());
+        builder = builder.withTokenSupplier((TokenSupplier) node -> even.tokens(node == 4 ? NODE_TO_REMOVE : node));
+        try (Cluster cluster = builder.start())
+        {
+            List<IInvokableInstance> alive = Arrays.asList(cluster.get(1), cluster.get(3));
+            IInvokableInstance nodeToRemove = cluster.get(NODE_TO_REMOVE);
+
+            setupCluster(cluster);
+
+            stopUnchecked(nodeToRemove);
+
+            // should fail to join, but should start up!
+            IInvokableInstance added = replaceHostAndStart(cluster, nodeToRemove, p -> p.setProperty("cassandra.superuser_setup_delay_ms", "1"));
+            // log gossip for debugging
+            alive.forEach(i -> {
+                NodeToolResult result = i.nodetoolResult("gossipinfo");
+                result.asserts().success();
+                logger.info("gossipinfo for node{}\n{}", i.config().num(), result.getStdout());
+            });
+
+            // CassandraRoleManager used to attempt distributed reads while the node was still in bootstrap mode (bootstrap failed, so it never left it),
+            // so validate that this no longer happens and that we are not incrementing org.apache.cassandra.metrics.ClientRequestMetrics.unavailables;
+            // wait long enough for multiple setup-task retry attempts...
+            Awaitility.await()
+                      .atMost(1, TimeUnit.MINUTES)
+                      .until(() -> added.callOnInstance(() -> BB.SETUP_SCHEDULE_COUNTER.get()) >= 42); // why 42?  just need something large enough to make sure multiple attempts happened
+
+            // do any read metrics have unavailables?
+            added.runOnInstance(() -> {
+                Assertions.assertThat(ClientRequestsMetricsHolder.readMetrics.unavailables.getCount()).describedAs("read unavailables").isEqualTo(0);
+                Assertions.assertThat(ClientRequestsMetricsHolder.casReadMetrics.unavailables.getCount()).describedAs("CAS read unavailables").isEqualTo(0);
+            });
+        }
+    }
+    // ByteBuddy hooks, installed only on node 4 (the node that takes over NODE_TO_REMOVE's tokens)
+    public static class BB
+    {
+        public static void install(ClassLoader classLoader, Integer num)
+        {
+            if (num != 4)
+                return;
+
+            new ByteBuddy().rebase(StreamResultFuture.class)
+                           .method(named("maybeComplete"))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+
+            new ByteBuddy().rebase(CassandraRoleManager.class)
+                           .method(named("scheduleSetupTask"))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+        }
+        // replaces StreamResultFuture.maybeComplete: always fails the stream so the replacement node cannot finish bootstrapping
+        public static void maybeComplete(@This StreamResultFuture future) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException
+        {
+            Method method = future.getClass().getSuperclass().getSuperclass().getDeclaredMethod("tryFailure", Throwable.class);
+            method.setAccessible(true);
+            method.invoke(future, new StreamException(future.getCurrentState(), "Stream failed"));
+        }
+        // counts scheduleSetupTask calls (then delegates to the original) so the test can wait for several retries
+        private static final AtomicInteger SETUP_SCHEDULE_COUNTER = new AtomicInteger(0);
+        public static void scheduleSetupTask(final Callable<?> setupTask, @SuperCall Runnable fn)
+        {
+            SETUP_SCHEDULE_COUNTER.incrementAndGet();
+            fn.run();
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java
index 3de0bf51d5..8219d43ad1 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.assertj.core.api.Assertions;
 
@@ -210,6 +211,8 @@ public class HostReplacementTest extends TestBaseImpl
         fixDistributedSchemas(cluster);
         init(cluster);
 
+        ClusterUtils.awaitGossipSchemaMatch(cluster);
+
         populate(cluster);
         cluster.forEach(i -> i.flush(KEYSPACE));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org