You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/12/15 09:56:27 UTC

[cassandra] branch trunk updated: Make sure OOM errors are rethrown on truncation failure

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 242ab77  Make sure OOM errors are rethrown on truncation failure
242ab77 is described below

commit 242ab778f2decbf15e34ab4c28308274f4e01d01
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon Nov 9 15:09:44 2020 +0100

    Make sure OOM errors are rethrown on truncation failure
    
    Patch by marcuse; reviewed by Benjamin Lerer and Berenguer Blasi for CASSANDRA-16254
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/db/TruncateVerbHandler.java   | 21 +-----
 .../cassandra/service/TruncateResponseHandler.java | 25 ++++---
 .../distributed/test/FailingTruncationTest.java    | 79 ++++++++++++++++++++++
 4 files changed, 99 insertions(+), 27 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0e29bca..866a180 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta4
+ * Make sure OOM errors are rethrown on truncation failure (CASSANDRA-16254)
  * Send back client warnings when creating too many tables/keyspaces (CASSANDRA-16309)
  * Add dedicated tcp user timeout for streaming connection (CASSANDRA-16143)
  * Add generatetokens script for offline token allocation strategy generation (CASSANDRA-16205)
diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
index 0d71464..169f39d 100644
--- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -36,29 +35,13 @@ public class TruncateVerbHandler implements IVerbHandler<TruncateRequest>
     {
         TruncateRequest truncation = message.payload;
         Tracing.trace("Applying truncation of {}.{}", truncation.keyspace, truncation.table);
-        try
-        {
-            ColumnFamilyStore cfs = Keyspace.open(truncation.keyspace).getColumnFamilyStore(truncation.table);
-            cfs.truncateBlocking();
-        }
-        catch (Throwable throwable)
-        {
-            logger.error("Error in truncation", throwable);
-            respondError(truncation, message);
 
-            if (FSError.findNested(throwable) != null)
-                throw FSError.findNested(throwable);
-        }
+        ColumnFamilyStore cfs = Keyspace.open(truncation.keyspace).getColumnFamilyStore(truncation.table);
+        cfs.truncateBlocking();
         Tracing.trace("Enqueuing response to truncate operation to {}", message.from());
 
         TruncateResponse response = new TruncateResponse(truncation.keyspace, truncation.table, true);
         logger.trace("{} applied.  Enqueuing response to {}@{} ", truncation, message.id(), message.from());
         MessagingService.instance().send(message.responseWith(response), message.from());
     }
-
-    private static void respondError(TruncateRequest truncation, Message truncateRequestMessage)
-    {
-        TruncateResponse response = new TruncateResponse(truncation.keyspace, truncation.table, false);
-        MessagingService.instance().send(truncateRequestMessage.responseWith(response), truncateRequestMessage.from());
-    }
 }
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index c2651e6..60e8d0b 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -26,7 +26,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TruncateResponse;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.exceptions.TruncateException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -76,18 +78,25 @@ public class TruncateResponseHandler implements RequestCallback<TruncateResponse
         }
     }
 
+    @Override
     public void onResponse(Message<TruncateResponse> message)
     {
-        // If the truncation hasn't succeeded on some replica, abort and indicate this back to the client.
-        if (!message.payload.success)
-        {
-            truncateFailingReplica = message.from().address;
-            condition.signalAll();
-            return;
-        }
-
         responses.incrementAndGet();
         if (responses.get() >= responseCount)
             condition.signalAll();
     }
+
+    @Override
+    public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
+    {
+        // If the truncation hasn't succeeded on some replica, abort and indicate this back to the client.
+        truncateFailingReplica = from.address;
+        condition.signalAll();
+    }
+
+    @Override
+    public boolean invokeOnFailure()
+    {
+        return true;
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/FailingTruncationTest.java b/test/distributed/org/apache/cassandra/distributed/test/FailingTruncationTest.java
new file mode 100644
index 0000000..bcd184e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/FailingTruncationTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FailingTruncationTest extends TestBaseImpl
+{
+    @Test
+    public void testFailingTruncation() throws IOException
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withInstanceInitializer(BBFailHelper::install)
+                                          .start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+            try
+            {
+                cluster.coordinator(1).execute("TRUNCATE " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
+                fail("Truncate should fail on node 2");
+            }
+            catch (Exception e)
+            {
+                assertTrue(e.getMessage().contains("Truncate failed on replica /127.0.0.2"));
+            }
+        }
+
+    }
+
+    public static class BBFailHelper
+    {
+        static void install(ClassLoader cl, int nodeNumber)
+        {
+            if (nodeNumber == 2)
+            {
+                new ByteBuddy().redefine(ColumnFamilyStore.class)
+                               .method(named("truncateBlocking"))
+                               .intercept(MethodDelegation.to(BBFailHelper.class))
+                               .make()
+                               .load(cl, ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        public static void truncateBlocking()
+        {
+            throw new RuntimeException();
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org