You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/10/06 02:07:34 UTC

[cassandra] branch cassandra-2.2 updated: Don't adjust nodeCount when setting node id topology in in-jvm dtests. Make sure we don't throw any uncaught exceptions during in-jvm dtests.

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
     new 4d173e0  Don't adjust nodeCount when setting node id topology in in-jvm dtests. Make sure we don't throw any uncaught exceptions during in-jvm dtests.
4d173e0 is described below

commit 4d173e0a3f97b68b2ce0fb72befe2912efd31102
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon Oct 5 16:25:56 2020 -0700

    Don't adjust nodeCount when setting node id topology in in-jvm dtests.
    Make sure we don't throw any uncaught exceptions during in-jvm dtests.
    
    patch by Marcus Eriksson; reviewed by Alex Petrov, David Capwell for CASSANDRA-16109,CASSANDRA-16101
---
 build.xml                                          |  2 +-
 .../distributed/impl/AbstractCluster.java          | 36 +++++++++++++++++++++-
 .../impl/DelegatingInvokableInstance.java          |  1 +
 .../cassandra/distributed/impl/Instance.java       |  3 +-
 .../cassandra/distributed/impl/InstanceConfig.java | 13 ++++++--
 .../distributed/shared/ShutdownException.java      | 30 ++++++++++++++++++
 .../distributed/test/NetworkTopologyTest.java      | 15 +++++----
 7 files changed, 89 insertions(+), 11 deletions(-)

diff --git a/build.xml b/build.xml
index 693cc8f..d003edf 100644
--- a/build.xml
+++ b/build.xml
@@ -396,7 +396,7 @@
           </dependency>
           <dependency groupId="junit" artifactId="junit" version="4.6" />
           <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
-          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.3" />
+          <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.5" />
           <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
              <exclusion groupId="commons-lang" artifactId="commons-lang"/>
           </dependency>
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 0085f1c..9793add 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -22,16 +22,20 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -61,6 +65,7 @@ import org.apache.cassandra.distributed.shared.AbstractBuilder;
 import org.apache.cassandra.distributed.shared.InstanceClassLoader;
 import org.apache.cassandra.distributed.shared.MessageFilters;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.ShutdownException;
 import org.apache.cassandra.distributed.shared.Versions;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.net.MessagingService;
@@ -118,6 +123,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
     // mutated by user-facing API
     private final MessageFilters filters;
     private final BiConsumer<ClassLoader, Integer> instanceInitializer;
+    private final int datadirCount;
+    private volatile BiPredicate<Integer, Throwable> ignoreUncaughtThrowable = null;
+    private final List<Throwable> uncaughtExceptions = new CopyOnWriteArrayList<>();
 
     private volatile Thread.UncaughtExceptionHandler previousHandler = null;
 
@@ -267,6 +275,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         this.initialVersion = builder.getVersion();
         this.filters = new MessageFilters();
         this.instanceInitializer = builder.getInstanceInitializer();
+        this.datadirCount = builder.getDatadirCount();
 
         int generation = GENERATION.incrementAndGet();
         for (int i = 0; i < builder.getNodeCount(); ++i)
@@ -297,7 +306,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
 
         NetworkTopology topology = NetworkTopology.build(ipPrefix, broadcastPort, nodeIdTopology);
 
-        InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
+        InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp, datadirCount);
         if (configUpdater != null)
             configUpdater.accept(config);
 
@@ -610,8 +619,20 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
                 handler.uncaughtException(thread, error);
             return;
         }
+
         InstanceClassLoader cl = (InstanceClassLoader) thread.getContextClassLoader();
         get(cl.getInstanceId()).uncaughtException(thread, error);
+
+        BiPredicate<Integer, Throwable> ignore = ignoreUncaughtThrowable;
+        I instance = get(cl.getInstanceId());
+        if ((ignore == null || !ignore.test(cl.getInstanceId(), error)) && instance != null && !instance.isShutdown())
+            uncaughtExceptions.add(error);
+    }
+
+    @Override
+    public void setUncaughtExceptionsFilter(BiPredicate<Integer, Throwable> ignoreUncaughtThrowable)
+    {
+        this.ignoreUncaughtThrowable = ignoreUncaughtThrowable;
     }
 
     @Override
@@ -630,10 +651,23 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
             FileUtils.deleteRecursive(root);
         Thread.setDefaultUncaughtExceptionHandler(previousHandler);
         previousHandler = null;
+        checkAndResetUncaughtExceptions();
 
         //withThreadLeakCheck(futures);
     }
 
+    @Override
+    public void checkAndResetUncaughtExceptions()
+    {
+        List<Throwable> drain = new ArrayList<>(uncaughtExceptions.size());
+        uncaughtExceptions.removeIf(e -> {
+            drain.add(e);
+            return true;
+        });
+        if (!drain.isEmpty())
+            throw new ShutdownException(drain);
+    }
+
     // We do not want this check to run every time until we fix problems with tread stops
     private void withThreadLeakCheck(List<Future<?>> futures)
     {
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
index 690e503..262da7a 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.impl;
 
 import java.io.Serializable;
 import java.net.InetSocketAddress;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 7ed29fd..b8bb60c 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -263,7 +263,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 int toNum = config().num();
 
 
-                IMessage msg = serializeMessage(message, id, from.broadcastAddress(), broadcastAddress());
+                IMessage msg = serializeMessage(message, id, from.config().broadcastAddress(), broadcastAddress());
 
                 return cluster.filters().permitInbound(fromNum, toNum, msg);
             }
@@ -826,3 +826,4 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         return accumulate;
     }
 }
+
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index d13a0b6..4e8a782 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -261,7 +261,7 @@ public class InstanceConfig implements IInstanceConfig
         return (String)params.get(name);
     }
 
-    public static InstanceConfig generate(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp)
+    public static InstanceConfig generate(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp, int datadirCount)
     {
         return new InstanceConfig(nodeNum,
                                   networkTopology,
@@ -271,13 +271,22 @@ public class InstanceConfig implements IInstanceConfig
                                   ipAddress,
                                   seedIp,
                                   String.format("%s/node%d/saved_caches", root, nodeNum),
-                                  new String[] { String.format("%s/node%d/data", root, nodeNum) },
+                                  datadirs(datadirCount, root, nodeNum),
                                   String.format("%s/node%d/commitlog", root, nodeNum),
 //                                  String.format("%s/node%d/hints", root, nodeNum),
 //                                  String.format("%s/node%d/cdc", root, nodeNum),
                                   token);
     }
 
+    private static String[] datadirs(int datadirCount, File root, int nodeNum)
+    {
+        String datadirFormat = String.format("%s/node%d/data%%d", root.getPath(), nodeNum);
+        String [] datadirs = new String[datadirCount];
+        for (int i = 0; i < datadirs.length; i++)
+            datadirs[i] = String.format(datadirFormat, i);
+        return datadirs;
+    }
+
     public InstanceConfig forVersion(Versions.Major major)
     {
         switch (major)
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ShutdownException.java b/test/distributed/org/apache/cassandra/distributed/shared/ShutdownException.java
new file mode 100644
index 0000000..d2b5bf7
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ShutdownException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.List;
+
+public class ShutdownException extends RuntimeException
+{
+    public ShutdownException(List<Throwable> uncaughtExceptions)
+    {
+        super("Uncaught exceptions were thrown during test");
+        uncaughtExceptions.forEach(super::addSuppressed);
+    }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
index 8230fd5..a4968c6 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.distributed.test;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -42,7 +43,7 @@ public class NetworkTopologyTest extends TestBaseImpl
                                                     .withRack("elsewhere", "firstrack", 1)
                                                     .withRack("elsewhere", "secondrack", 2)
                                                     .withDC("nearthere", 4)
-                                                    .start())
+                                                    .createWithoutStarting())
         {
             Assert.assertEquals(1, cluster.stream("somewhere").count());
             Assert.assertEquals(1, cluster.stream("elsewhere", "firstrack").count());
@@ -63,7 +64,7 @@ public class NetworkTopologyTest extends TestBaseImpl
 
     {
         try (ICluster cluster = builder().withRacks(2, 1, 3)
-                                         .start())
+                                         .createWithoutStarting())
         {
             Assert.assertEquals(6, cluster.stream().count());
             Assert.assertEquals(3, cluster.stream("datacenter1").count());
@@ -72,16 +73,18 @@ public class NetworkTopologyTest extends TestBaseImpl
     }
 
     @Test(expected = IllegalStateException.class)
-    public void noCountsAfterNamingDCsTest()
+    public void noCountsAfterNamingDCsTest() throws IOException
     {
         builder().withDC("nameddc", 1)
-                 .withDCs(1);
+                 .withDCs(1)
+                 .createWithoutStarting();
     }
 
     @Test(expected = IllegalStateException.class)
-    public void mustProvideNodeCountBeforeWithDCsTest()
+    public void mustProvideNodeCountBeforeWithDCsTest() throws IOException
     {
-        builder().withDCs(1);
+        builder().withDCs(1)
+                 .createWithoutStarting();
     }
 
     @Test(expected = IllegalStateException.class)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org