You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/12/15 09:56:27 UTC
[cassandra] branch trunk updated: Make sure OOM errors are rethrown
on truncation failure
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 242ab77 Make sure OOM errors are rethrown on truncation failure
242ab77 is described below
commit 242ab778f2decbf15e34ab4c28308274f4e01d01
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon Nov 9 15:09:44 2020 +0100
Make sure OOM errors are rethrown on truncation failure
Patch by marcuse; reviewed by Benjamin Lerer and Berenguer Blasi for CASSANDRA-16254
---
CHANGES.txt | 1 +
.../apache/cassandra/db/TruncateVerbHandler.java | 21 +-----
.../cassandra/service/TruncateResponseHandler.java | 25 ++++---
.../distributed/test/FailingTruncationTest.java | 79 ++++++++++++++++++++++
4 files changed, 99 insertions(+), 27 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 0e29bca..866a180 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta4
+ * Make sure OOM errors are rethrown on truncation failure (CASSANDRA-16254)
* Send back client warnings when creating too many tables/keyspaces (CASSANDRA-16309)
* Add dedicated tcp user timeout for streaming connection (CASSANDRA-16143)
* Add generatetokens script for offline token allocation strategy generation (CASSANDRA-16205)
diff --git a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
index 0d71464..169f39d 100644
--- a/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.io.FSError;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -36,29 +35,13 @@ public class TruncateVerbHandler implements IVerbHandler<TruncateRequest>
{
TruncateRequest truncation = message.payload;
Tracing.trace("Applying truncation of {}.{}", truncation.keyspace, truncation.table);
- try
- {
- ColumnFamilyStore cfs = Keyspace.open(truncation.keyspace).getColumnFamilyStore(truncation.table);
- cfs.truncateBlocking();
- }
- catch (Throwable throwable)
- {
- logger.error("Error in truncation", throwable);
- respondError(truncation, message);
- if (FSError.findNested(throwable) != null)
- throw FSError.findNested(throwable);
- }
+ ColumnFamilyStore cfs = Keyspace.open(truncation.keyspace).getColumnFamilyStore(truncation.table);
+ cfs.truncateBlocking();
Tracing.trace("Enqueuing response to truncate operation to {}", message.from());
TruncateResponse response = new TruncateResponse(truncation.keyspace, truncation.table, true);
logger.trace("{} applied. Enqueuing response to {}@{} ", truncation, message.id(), message.from());
MessagingService.instance().send(message.responseWith(response), message.from());
}
-
- private static void respondError(TruncateRequest truncation, Message truncateRequestMessage)
- {
- TruncateResponse response = new TruncateResponse(truncation.keyspace, truncation.table, false);
- MessagingService.instance().send(truncateRequestMessage.responseWith(response), truncateRequestMessage.from());
- }
}
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index c2651e6..60e8d0b 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -26,7 +26,9 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.TruncateResponse;
+import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.TruncateException;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -76,18 +78,25 @@ public class TruncateResponseHandler implements RequestCallback<TruncateResponse
}
}
+ @Override
public void onResponse(Message<TruncateResponse> message)
{
- // If the truncation hasn't succeeded on some replica, abort and indicate this back to the client.
- if (!message.payload.success)
- {
- truncateFailingReplica = message.from().address;
- condition.signalAll();
- return;
- }
-
responses.incrementAndGet();
if (responses.get() >= responseCount)
condition.signalAll();
}
+
+ @Override
+ public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
+ {
+ // If the truncation hasn't succeeded on some replica, abort and indicate this back to the client.
+ truncateFailingReplica = from.address;
+ condition.signalAll();
+ }
+
+ @Override
+ public boolean invokeOnFailure()
+ {
+ return true;
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/FailingTruncationTest.java b/test/distributed/org/apache/cassandra/distributed/test/FailingTruncationTest.java
new file mode 100644
index 0000000..bcd184e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/FailingTruncationTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class FailingTruncationTest extends TestBaseImpl
+{
+ @Test
+ public void testFailingTruncation() throws IOException
+ {
+ try(Cluster cluster = init(Cluster.build(2)
+ .withInstanceInitializer(BBFailHelper::install)
+ .start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+ try
+ {
+ cluster.coordinator(1).execute("TRUNCATE " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
+ fail("Truncate should fail on node 2");
+ }
+ catch (Exception e)
+ {
+ assertTrue(e.getMessage().contains("Truncate failed on replica /127.0.0.2"));
+ }
+ }
+
+ }
+
+ public static class BBFailHelper
+ {
+ static void install(ClassLoader cl, int nodeNumber)
+ {
+ if (nodeNumber == 2)
+ {
+ new ByteBuddy().redefine(ColumnFamilyStore.class)
+ .method(named("truncateBlocking"))
+ .intercept(MethodDelegation.to(BBFailHelper.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+
+ public static void truncateBlocking()
+ {
+ throw new RuntimeException();
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org