You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/10/06 02:07:34 UTC
[cassandra] branch cassandra-2.2 updated: Don't adjust nodeCount
when setting node id topology in in-jvm dtests. Make sure we don't throw
any uncaught exceptions during in-jvm dtests.
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
new 4d173e0 Don't adjust nodeCount when setting node id topology in in-jvm dtests. Make sure we don't throw any uncaught exceptions during in-jvm dtests.
4d173e0 is described below
commit 4d173e0a3f97b68b2ce0fb72befe2912efd31102
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon Oct 5 16:25:56 2020 -0700
Don't adjust nodeCount when setting node id topology in in-jvm dtests.
Make sure we don't throw any uncaught exceptions during in-jvm dtests.
patch by Marcus Eriksson; reviewed by Alex Petrov, David Capwell for CASSANDRA-16109,CASSANDRA-16101
---
build.xml | 2 +-
.../distributed/impl/AbstractCluster.java | 36 +++++++++++++++++++++-
.../impl/DelegatingInvokableInstance.java | 1 +
.../cassandra/distributed/impl/Instance.java | 3 +-
.../cassandra/distributed/impl/InstanceConfig.java | 13 ++++++--
.../distributed/shared/ShutdownException.java | 30 ++++++++++++++++++
.../distributed/test/NetworkTopologyTest.java | 15 +++++----
7 files changed, 89 insertions(+), 11 deletions(-)
diff --git a/build.xml b/build.xml
index 693cc8f..d003edf 100644
--- a/build.xml
+++ b/build.xml
@@ -396,7 +396,7 @@
</dependency>
<dependency groupId="junit" artifactId="junit" version="4.6" />
<dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
- <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.3" />
+ <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.5" />
<dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
<exclusion groupId="commons-lang" artifactId="commons-lang"/>
</dependency>
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 0085f1c..9793add 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -22,16 +22,20 @@ import java.io.File;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -61,6 +65,7 @@ import org.apache.cassandra.distributed.shared.AbstractBuilder;
import org.apache.cassandra.distributed.shared.InstanceClassLoader;
import org.apache.cassandra.distributed.shared.MessageFilters;
import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.shared.ShutdownException;
import org.apache.cassandra.distributed.shared.Versions;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.MessagingService;
@@ -118,6 +123,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
// mutated by user-facing API
private final MessageFilters filters;
private final BiConsumer<ClassLoader, Integer> instanceInitializer;
+ private final int datadirCount;
+ private volatile BiPredicate<Integer, Throwable> ignoreUncaughtThrowable = null;
+ private final List<Throwable> uncaughtExceptions = new CopyOnWriteArrayList<>();
private volatile Thread.UncaughtExceptionHandler previousHandler = null;
@@ -267,6 +275,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
this.initialVersion = builder.getVersion();
this.filters = new MessageFilters();
this.instanceInitializer = builder.getInstanceInitializer();
+ this.datadirCount = builder.getDatadirCount();
int generation = GENERATION.incrementAndGet();
for (int i = 0; i < builder.getNodeCount(); ++i)
@@ -297,7 +306,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
NetworkTopology topology = NetworkTopology.build(ipPrefix, broadcastPort, nodeIdTopology);
- InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp);
+ InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp, datadirCount);
if (configUpdater != null)
configUpdater.accept(config);
@@ -610,8 +619,20 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
handler.uncaughtException(thread, error);
return;
}
+
InstanceClassLoader cl = (InstanceClassLoader) thread.getContextClassLoader();
get(cl.getInstanceId()).uncaughtException(thread, error);
+
+ BiPredicate<Integer, Throwable> ignore = ignoreUncaughtThrowable;
+ I instance = get(cl.getInstanceId());
+ if ((ignore == null || !ignore.test(cl.getInstanceId(), error)) && instance != null && !instance.isShutdown())
+ uncaughtExceptions.add(error);
+ }
+
+ @Override
+ public void setUncaughtExceptionsFilter(BiPredicate<Integer, Throwable> ignoreUncaughtThrowable)
+ {
+ this.ignoreUncaughtThrowable = ignoreUncaughtThrowable;
}
@Override
@@ -630,10 +651,23 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
FileUtils.deleteRecursive(root);
Thread.setDefaultUncaughtExceptionHandler(previousHandler);
previousHandler = null;
+ checkAndResetUncaughtExceptions();
//withThreadLeakCheck(futures);
}
+ @Override
+ public void checkAndResetUncaughtExceptions()
+ {
+ List<Throwable> drain = new ArrayList<>(uncaughtExceptions.size());
+ uncaughtExceptions.removeIf(e -> {
+ drain.add(e);
+ return true;
+ });
+ if (!drain.isEmpty())
+ throw new ShutdownException(drain);
+ }
+
// We do not want this check to run every time until we fix problems with tread stops
private void withThreadLeakCheck(List<Future<?>> futures)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
index 690e503..262da7a 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.impl;
import java.io.Serializable;
import java.net.InetSocketAddress;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 7ed29fd..b8bb60c 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -263,7 +263,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
int toNum = config().num();
- IMessage msg = serializeMessage(message, id, from.broadcastAddress(), broadcastAddress());
+ IMessage msg = serializeMessage(message, id, from.config().broadcastAddress(), broadcastAddress());
return cluster.filters().permitInbound(fromNum, toNum, msg);
}
@@ -826,3 +826,4 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
return accumulate;
}
}
+
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index d13a0b6..4e8a782 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -261,7 +261,7 @@ public class InstanceConfig implements IInstanceConfig
return (String)params.get(name);
}
- public static InstanceConfig generate(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp)
+ public static InstanceConfig generate(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp, int datadirCount)
{
return new InstanceConfig(nodeNum,
networkTopology,
@@ -271,13 +271,22 @@ public class InstanceConfig implements IInstanceConfig
ipAddress,
seedIp,
String.format("%s/node%d/saved_caches", root, nodeNum),
- new String[] { String.format("%s/node%d/data", root, nodeNum) },
+ datadirs(datadirCount, root, nodeNum),
String.format("%s/node%d/commitlog", root, nodeNum),
// String.format("%s/node%d/hints", root, nodeNum),
// String.format("%s/node%d/cdc", root, nodeNum),
token);
}
+ private static String[] datadirs(int datadirCount, File root, int nodeNum)
+ {
+ String datadirFormat = String.format("%s/node%d/data%%d", root.getPath(), nodeNum);
+ String [] datadirs = new String[datadirCount];
+ for (int i = 0; i < datadirs.length; i++)
+ datadirs[i] = String.format(datadirFormat, i);
+ return datadirs;
+ }
+
public InstanceConfig forVersion(Versions.Major major)
{
switch (major)
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ShutdownException.java b/test/distributed/org/apache/cassandra/distributed/shared/ShutdownException.java
new file mode 100644
index 0000000..d2b5bf7
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ShutdownException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.List;
+
+public class ShutdownException extends RuntimeException
+{
+ public ShutdownException(List<Throwable> uncaughtExceptions)
+ {
+ super("Uncaught exceptions were thrown during test");
+ uncaughtExceptions.forEach(super::addSuppressed);
+ }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
index 8230fd5..a4968c6 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/NetworkTopologyTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.distributed.test;
+import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
@@ -42,7 +43,7 @@ public class NetworkTopologyTest extends TestBaseImpl
.withRack("elsewhere", "firstrack", 1)
.withRack("elsewhere", "secondrack", 2)
.withDC("nearthere", 4)
- .start())
+ .createWithoutStarting())
{
Assert.assertEquals(1, cluster.stream("somewhere").count());
Assert.assertEquals(1, cluster.stream("elsewhere", "firstrack").count());
@@ -63,7 +64,7 @@ public class NetworkTopologyTest extends TestBaseImpl
{
try (ICluster cluster = builder().withRacks(2, 1, 3)
- .start())
+ .createWithoutStarting())
{
Assert.assertEquals(6, cluster.stream().count());
Assert.assertEquals(3, cluster.stream("datacenter1").count());
@@ -72,16 +73,18 @@ public class NetworkTopologyTest extends TestBaseImpl
}
@Test(expected = IllegalStateException.class)
- public void noCountsAfterNamingDCsTest()
+ public void noCountsAfterNamingDCsTest() throws IOException
{
builder().withDC("nameddc", 1)
- .withDCs(1);
+ .withDCs(1)
+ .createWithoutStarting();
}
@Test(expected = IllegalStateException.class)
- public void mustProvideNodeCountBeforeWithDCsTest()
+ public void mustProvideNodeCountBeforeWithDCsTest() throws IOException
{
- builder().withDCs(1);
+ builder().withDCs(1)
+ .createWithoutStarting();
}
@Test(expected = IllegalStateException.class)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org