You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/08/19 11:51:36 UTC
[cassandra] branch trunk updated: Handle errors in StreamSession#prepare
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 778771b Handle errors in StreamSession#prepare
778771b is described below
commit 778771bf7655e252d44341cd8ab2f39109e3756d
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Thu Jun 4 09:53:01 2020 +0200
Handle errors in StreamSession#prepare
Patch by marcuse; reviewed by Caleb Rackliffe and David Capwell
for CASSANDRA-15852
---
CHANGES.txt | 1 +
.../apache/cassandra/streaming/StreamSession.java | 11 ++-
.../distributed/test/StreamPrepareFailTest.java | 81 ++++++++++++++++++++++
3 files changed, 92 insertions(+), 1 deletion(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index f429128..a3d66cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta2
+ * Handle errors in StreamSession#prepare (CASSANDRA-15852)
* FQL replay should have options to ignore DDL statements (CASSANDRA-16039)
* Remove COMPACT STORAGE internals (CASSANDRA-13994)
* Make TimestampSerializer accept fractional seconds of varying precision (CASSANDRA-15976)
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index f59eaa5..9c3a0ed 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -694,7 +694,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
// prepare tasks
state(State.PREPARING);
- ScheduledExecutors.nonPeriodicTasks.execute(() -> prepareAsync(requests, summaries));
+ ScheduledExecutors.nonPeriodicTasks.execute(() -> {
+ try
+ {
+ prepareAsync(requests, summaries);
+ }
+ catch (Exception e)
+ {
+ onError(e);
+ }
+ });
}
/**
diff --git a/test/distributed/org/apache/cassandra/distributed/test/StreamPrepareFailTest.java b/test/distributed/org/apache/cassandra/distributed/test/StreamPrepareFailTest.java
new file mode 100644
index 0000000..2553d83
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/StreamPrepareFailTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class StreamPrepareFailTest extends TestBaseImpl
+{
+ @Test
+ public void streamPrepareFailTest() throws Throwable
+ {
+ try(Cluster cluster = init(Cluster.build(2)
+ .withInstanceInitializer(StreamFailHelper::install)
+ .withConfig(config -> config.with(NETWORK, GOSSIP))
+ .start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+ try
+ {
+ cluster.get(1).runOnInstance(() -> StorageService.instance.rebuild(null));
+ fail("rebuild should throw exception");
+ }
+ catch (RuntimeException e)
+ {
+ cluster.get(2).runOnInstance(() -> assertTrue(StreamFailHelper.thrown.get()));
+ assertTrue(e.getMessage().contains("Stream failed"));
+ }
+ }
+ }
+
+ public static class StreamFailHelper
+ {
+ static AtomicBoolean thrown = new AtomicBoolean();
+ static void install(ClassLoader cl, int nodeNumber)
+ {
+ new ByteBuddy().redefine(StreamSession.class)
+ .method(named("prepareAsync"))
+ .intercept(MethodDelegation.to(StreamFailHelper.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+ public static ResultMessage prepareAsync()
+ {
+ thrown.set(true);
+ throw new RuntimeException();
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org