You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text copy.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/08/19 11:51:36 UTC

[cassandra] branch trunk updated: Handle errors in StreamSession#prepare

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 778771b  Handle errors in StreamSession#prepare
778771b is described below

commit 778771bf7655e252d44341cd8ab2f39109e3756d
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Thu Jun 4 09:53:01 2020 +0200

    Handle errors in StreamSession#prepare
    
    Patch by marcuse; reviewed by Caleb Rackliffe and David Capwell
    for CASSANDRA-15852
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/streaming/StreamSession.java  | 11 ++-
 .../distributed/test/StreamPrepareFailTest.java    | 81 ++++++++++++++++++++++
 3 files changed, 92 insertions(+), 1 deletion(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f429128..a3d66cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta2
+ * Handle errors in StreamSession#prepare (CASSANDRA-15852)
  * FQL replay should have options to ignore DDL statements (CASSANDRA-16039)
  * Remove COMPACT STORAGE internals (CASSANDRA-13994)
  * Make TimestampSerializer accept fractional seconds of varying precision (CASSANDRA-15976)
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index f59eaa5..9c3a0ed 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -694,7 +694,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     {
         // prepare tasks
         state(State.PREPARING);
-        ScheduledExecutors.nonPeriodicTasks.execute(() -> prepareAsync(requests, summaries));
+        ScheduledExecutors.nonPeriodicTasks.execute(() -> {
+            try
+            {
+                prepareAsync(requests, summaries);
+            }
+            catch (Exception e)
+            {
+                onError(e);
+            }
+        });
     }
 
     /**
diff --git a/test/distributed/org/apache/cassandra/distributed/test/StreamPrepareFailTest.java b/test/distributed/org/apache/cassandra/distributed/test/StreamPrepareFailTest.java
new file mode 100644
index 0000000..2553d83
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/StreamPrepareFailTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class StreamPrepareFailTest extends TestBaseImpl
+{
+    @Test
+    public void streamPrepareFailTest() throws Throwable
+    {
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withInstanceInitializer(StreamFailHelper::install)
+                                          .withConfig(config -> config.with(NETWORK, GOSSIP))
+                                          .start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+            try
+            {
+                cluster.get(1).runOnInstance(() -> StorageService.instance.rebuild(null));
+                fail("rebuild should throw exception");
+            }
+            catch (RuntimeException e)
+            {
+                cluster.get(2).runOnInstance(() -> assertTrue(StreamFailHelper.thrown.get()));
+                assertTrue(e.getMessage().contains("Stream failed"));
+            }
+        }
+    }
+
+    public static class StreamFailHelper
+    {
+        static AtomicBoolean thrown = new AtomicBoolean();
+        static void install(ClassLoader cl, int nodeNumber)
+        {
+            new ByteBuddy().redefine(StreamSession.class)
+                           .method(named("prepareAsync"))
+                           .intercept(MethodDelegation.to(StreamFailHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+        public static ResultMessage prepareAsync()
+        {
+            thrown.set(true);
+            throw new RuntimeException();
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org