You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/11/16 18:42:45 UTC
[1/3] cassandra git commit: Introduce in-jvm distributed tests
Repository: cassandra
Updated Branches:
refs/heads/trunk 787703508 -> f22fec927
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/MessageFilters.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/MessageFilters.java b/test/distributed/org/apache/cassandra/distributed/MessageFilters.java
new file mode 100644
index 0000000..f488e08
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/MessageFilters.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.BiConsumer;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Holds the set of message-dropping rules for a {@link TestCluster} and wraps a
+ * message sink so that matching messages never reach it.
+ *
+ * Rules are built through {@link #verbs} / {@link #allVerbs}, optionally narrowed with
+ * {@link Builder#from} and {@link Builder#to}, and activated with {@link Filter#drop()}.
+ */
+public class MessageFilters
+{
+    private final TestCluster cluster;
+    // Rules currently in force; a Filter is only consulted while it is a member of this set
+    private final Set<Filter> filters = new CopyOnWriteArraySet<>();
+
+    public MessageFilters(TestCluster cluster)
+    {
+        this.cluster = cluster;
+    }
+
+    /**
+     * Decorates {@code applyIfNotFiltered}: the returned consumer looks up the instance numbers
+     * for the message's {@code from} and for {@code toAddress} through the cluster, and only
+     * forwards the message when no active filter matches (from, to, verb).
+     */
+    BiConsumer<InetAddressAndPort, Message> filter(BiConsumer<InetAddressAndPort, Message> applyIfNotFiltered)
+    {
+        return (toAddress, message) ->
+        {
+            int sender = cluster.get(message.from).config.num;
+            int recipient = cluster.get(toAddress).config.num;
+            int verbId = message.verb;
+
+            boolean dropped = filters.stream().anyMatch(rule -> rule.matches(sender, recipient, verbId));
+            if (!dropped)
+                applyIfNotFiltered.accept(toAddress, message);
+        };
+    }
+
+    /**
+     * A single drop rule. Each of the three criteria is either null (matches anything)
+     * or a sorted private copy of the ids supplied at construction.
+     */
+    public class Filter
+    {
+        final int[] from;
+        final int[] to;
+        final int[] verbs;
+
+        Filter(int[] from, int[] to, int[] verbs)
+        {
+            this.from = sortedCopy(from);
+            this.to = sortedCopy(to);
+            this.verbs = sortedCopy(verbs);
+        }
+
+        // Returns null for null, otherwise a sorted clone so binarySearch can be used in matches()
+        private int[] sortedCopy(int[] values)
+        {
+            if (values == null)
+                return null;
+            int[] copy = values.clone();
+            Arrays.sort(copy);
+            return copy;
+        }
+
+        // Arrays.hashCode yields 0 for a null array, so absent criteria contribute nothing
+        public int hashCode()
+        {
+            return Arrays.hashCode(from) + Arrays.hashCode(to) + Arrays.hashCode(verbs);
+        }
+
+        public boolean equals(Object that)
+        {
+            if (!(that instanceof Filter))
+                return false;
+            return equals((Filter) that);
+        }
+
+        public boolean equals(Filter that)
+        {
+            if (!Arrays.equals(from, that.from))
+                return false;
+            if (!Arrays.equals(to, that.to))
+                return false;
+            return Arrays.equals(verbs, that.verbs);
+        }
+
+        /**
+         * @return true when every non-null criterion contains the corresponding argument
+         */
+        public boolean matches(int from, int to, int verb)
+        {
+            return permits(this.from, from) && permits(this.to, to) && permits(this.verbs, verb);
+        }
+
+        // A null criterion is a wildcard; otherwise the (sorted) array must contain the value
+        private boolean permits(int[] sortedIds, int id)
+        {
+            return sortedIds == null || Arrays.binarySearch(sortedIds, id) >= 0;
+        }
+
+        /** Deactivates this rule by removing it from the enclosing filter set. */
+        public Filter restore()
+        {
+            filters.remove(this);
+            return this;
+        }
+
+        /** Activates this rule by adding it to the enclosing filter set. */
+        public Filter drop()
+        {
+            filters.add(this);
+            return this;
+        }
+    }
+
+    /**
+     * Fluent builder for a {@link Filter}; criteria left unset stay null and therefore match anything.
+     */
+    public class Builder
+    {
+        int[] from;
+        int[] to;
+        int[] verbs;
+
+        private Builder(int[] verbs)
+        {
+            this.verbs = verbs;
+        }
+
+        public Builder from(int ... nums)
+        {
+            this.from = nums;
+            return this;
+        }
+
+        public Builder to(int ... nums)
+        {
+            this.to = nums;
+            return this;
+        }
+
+        /** Builds the filter without activating it. */
+        public Filter ready()
+        {
+            return new Filter(from, to, verbs);
+        }
+
+        /** Builds the filter and activates it immediately. */
+        public Filter drop()
+        {
+            Filter built = ready();
+            return built.drop();
+        }
+    }
+
+    /** Starts a builder restricted to the ids of the given verbs. */
+    public Builder verbs(MessagingService.Verb ... verbs)
+    {
+        int[] ids = Arrays.stream(verbs).mapToInt(MessagingService.Verb::getId).toArray();
+        return new Builder(ids);
+    }
+
+    /** Starts a builder with no verb restriction. */
+    public Builder allVerbs()
+    {
+        return new Builder(null);
+    }
+
+    /** Removes every active filter. */
+    public void reset()
+    {
+        filters.clear();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/RowUtil.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/RowUtil.java b/test/distributed/org/apache/cassandra/distributed/RowUtil.java
new file mode 100644
index 0000000..bce896d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/RowUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+/**
+ * Helper for converting a {@link ResultMessage.Rows} into plain Java object arrays.
+ */
+public class RowUtil
+{
+    /**
+     * Deserializes each non-null cell of {@code rows} with the serializer of the column
+     * specification at the same index; null cells are left as null.
+     *
+     * @return one {@code Object[]} per result row, in result order
+     */
+    public static Object[][] toObjects(ResultMessage.Rows rows)
+    {
+        Object[][] decoded = new Object[rows.result.rows.size()][];
+        List<ColumnSpecification> columns = rows.result.metadata.names;
+
+        int rowIdx = 0;
+        for (List<ByteBuffer> cells : rows.result.rows)
+        {
+            Object[] values = new Object[cells.size()];
+            decoded[rowIdx++] = values;
+            for (int col = 0; col < cells.size(); col++)
+            {
+                ByteBuffer cell = cells.get(col);
+                values[col] = cell == null
+                              ? null
+                              : columns.get(col).type.getSerializer().deserialize(cell);
+            }
+        }
+        return decoded;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/TestCluster.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/TestCluster.java b/test/distributed/org/apache/cassandra/distributed/TestCluster.java
new file mode 100644
index 0000000..2b979ee
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/TestCluster.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.SchemaEvent;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+/**
+ * TestCluster creates, initializes and manages Cassandra instances ({@link Instance}).
+ *
+ * All instances created under the same cluster will have a shared ClassLoader that'll preload
+ * common classes required for configuration and communication (byte buffers, primitives, config
+ * objects etc). Shared classes are listed in {@link InstanceClassLoader#commonClasses}.
+ *
+ * Each instance has its own class loader that will load logging, yaml libraries and all non-shared
+ * Cassandra package classes. The rule of thumb is that we'd like to have all Cassandra-specific things
+ * (unless explicitly shared through the common classloader) on a per-classloader basis in order to
+ * allow creating more than one instance of DatabaseDescriptor and other Cassandra singletons.
+ *
+ * All actions (reading, writing, schema changes, etc) are executed by serializing lambda/runnables,
+ * transferring them to instance-specific classloaders, deserializing and running them there. Most of
+ * the things can be simply captured in closure or passed through `apply` method of the wrapped serializable
+ * function/callable. You can use {@link InvokableInstance#{applies|runs|consumes}OnInstance} for executing
+ * code on specific instance.
+ *
+ * Each instance has its own logger. Each instance log line will contain INSTANCE{instance_id}.
+ *
+ * As of today, messaging is faked by hooking into MessagingService, so we're not using usual Cassandra
+ * handlers for internode to have more control over it. Messaging is wired by passing verbs manually.
+ * coordinator-handling code and hooks to the callbacks can be found in {@link Coordinator}.
+ */
+public class TestCluster implements AutoCloseable
+{
+    // Shared pool used to launch and shut down instances in parallel
+    private static final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("cluster-async-tasks"));
+
+    private final File root;
+    private final List<Instance> instances;
+    private final Coordinator coordinator;
+    // Populated by launch(), keyed by each instance's broadcast address
+    private final Map<InetAddressAndPort, Instance> instanceMap;
+    private final MessageFilters filters;
+
+    private TestCluster(File root, List<Instance> instances)
+    {
+        this.root = root;
+        this.instances = instances;
+        this.instanceMap = new HashMap<>();
+        // the first instance in the list backs the cluster's coordinator
+        this.coordinator = new Coordinator(instances.get(0));
+        this.filters = new MessageFilters(this);
+    }
+
+    /**
+     * Launches all instances in parallel on the shared executor, waits for all of them,
+     * then indexes them by broadcast address.
+     */
+    void launch()
+    {
+        FBUtilities.waitOnFutures(instances.stream()
+                                           .map(i -> exec.submit(() -> i.launch(this)))
+                                           .collect(Collectors.toList())
+        );
+        for (Instance instance : instances)
+            instanceMap.put(instance.getBroadcastAddress(), instance);
+    }
+
+    public int size()
+    {
+        return instances.size();
+    }
+
+    public Coordinator coordinator()
+    {
+        return coordinator;
+    }
+
+    /**
+     * WARNING: we index from 1 here, for consistency with inet address!
+     */
+    public Instance get(int idx)
+    {
+        return instances.get(idx - 1);
+    }
+
+    /**
+     * @return the instance registered under {@code addr} by {@link #launch()}, or null if there is none
+     */
+    public Instance get(InetAddressAndPort addr)
+    {
+        return instanceMap.get(addr);
+    }
+
+    MessageFilters filters()
+    {
+        return filters;
+    }
+
+    // Shortcut for filters().verbs(verbs)
+    MessageFilters.Builder verbs(MessagingService.Verb ... verbs)
+    {
+        return filters.verbs(verbs);
+    }
+
+    /**
+     * Disables auto compaction for every column family store of {@code keyspace} on every instance.
+     */
+    public void disableAutoCompaction(String keyspace)
+    {
+        for (Instance instance : instances)
+        {
+            instance.runOnInstance(() -> {
+                for (ColumnFamilyStore cs : Keyspace.open(keyspace).getColumnFamilyStores())
+                    cs.disableAutoCompaction();
+            });
+        }
+    }
+
+    /**
+     * Executes {@code query} through the coordinator at ConsistencyLevel.ALL and waits
+     * for the instances to agree on the schema version.
+     *
+     * @throws IllegalStateException if agreement is not reached, see {@link SchemaChangeMonitor#waitForAgreement()}
+     */
+    public void schemaChange(String query)
+    {
+        try (SchemaChangeMonitor monitor = new SchemaChangeMonitor())
+        {
+            // execute the schema change
+            coordinator().execute(query, ConsistencyLevel.ALL);
+            monitor.waitForAgreement();
+        }
+    }
+
+    /**
+     * Will wait for a schema change AND agreement that occurs after it is created
+     * (and precedes the invocation to waitForAgreement)
+     *
+     * Works by simply checking if all UUIDs agree after any schema version change event,
+     * so long as the waitForAgreement method has been entered (indicating the change has
+     * taken place on the coordinator)
+     *
+     * This could perhaps be made a little more robust, but this should more than suffice.
+     */
+    public class SchemaChangeMonitor implements AutoCloseable
+    {
+        // One unsubscribe action per instance, run by close()
+        final List<Runnable> cleanup;
+        volatile boolean schemaHasChanged;
+        final SimpleCondition agreement = new SimpleCondition();
+
+        public SchemaChangeMonitor()
+        {
+            this.cleanup = new ArrayList<>(instances.size());
+            for (Instance instance : instances)
+            {
+                // On each instance: subscribe a consumer that calls back into signal() on every
+                // VERSION_UPDATED schema event, and keep the returned runnable that unsubscribes it.
+                cleanup.add(
+                        instance.appliesOnInstance(
+                                (Runnable runnable) -> {
+                                    Consumer<SchemaEvent> consumer = event -> runnable.run();
+                                    DiagnosticEventService.instance().subscribe(SchemaEvent.class, SchemaEvent.SchemaEventType.VERSION_UPDATED, consumer);
+                                    return (Runnable) () -> DiagnosticEventService.instance().unsubscribe(SchemaEvent.class, consumer);
+                                }
+                        ).apply(this::signal)
+                );
+            }
+        }
+
+        // Signals agreement once waitForAgreement() has been entered and all instances report a single schema version
+        private void signal()
+        {
+            if (schemaHasChanged && 1 == instances.stream().map(Instance::getSchemaVersion).distinct().count())
+                agreement.signalAll();
+        }
+
+        @Override
+        public void close()
+        {
+            for (Runnable runnable : cleanup)
+                runnable.run();
+        }
+
+        /**
+         * Marks the schema as changed, checks for agreement once, and then blocks for up to
+         * one minute until agreement is signalled.
+         *
+         * @throws IllegalStateException if agreement is not signalled within the timeout, or if
+         *                               the waiting thread is interrupted (the interrupt flag is restored)
+         */
+        public void waitForAgreement()
+        {
+            schemaHasChanged = true;
+            signal();
+            try
+            {
+                // await returns false on timeout; treat that as a failure rather than carrying on silently
+                if (!agreement.await(1L, TimeUnit.MINUTES))
+                    throw new IllegalStateException("Schema agreement not reached");
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("Schema agreement not reached", e);
+            }
+        }
+    }
+
+    /**
+     * Runs {@code statement} as a schema change on the instance with the given 1-based index.
+     */
+    public void schemaChange(String statement, int instance)
+    {
+        get(instance).schemaChange(statement);
+    }
+
+    /**
+     * Creates and launches a cluster rooted in a fresh temporary directory prefixed "dtests".
+     */
+    public static TestCluster create(int nodeCount) throws Throwable
+    {
+        return create(nodeCount, Files.createTempDirectory("dtests").toFile());
+    }
+
+    /**
+     * Creates and launches a cluster of {@code nodeCount} instances under {@code root}.
+     *
+     * Instances are numbered from 1. Tokens start at Long.MIN_VALUE + 1 and advance by
+     * 2 * (Long.MAX_VALUE / nodeCount) per instance.
+     *
+     * @throws IllegalArgumentException if {@code nodeCount} is not positive
+     */
+    public static TestCluster create(int nodeCount, File root)
+    {
+        // fail early with a clear message instead of a division by zero or an empty instance list
+        if (nodeCount <= 0)
+            throw new IllegalArgumentException("nodeCount must be positive, got " + nodeCount);
+
+        root.mkdirs();
+        setupLogging(root);
+
+        IntFunction<ClassLoader> classLoaderFactory = InstanceClassLoader.createFactory(
+            (URLClassLoader) Thread.currentThread().getContextClassLoader());
+        List<Instance> instances = new ArrayList<>();
+        long token = Long.MIN_VALUE + 1, increment = 2 * (Long.MAX_VALUE / nodeCount);
+        for (int i = 0 ; i < nodeCount ; ++i)
+        {
+            InstanceConfig instanceConfig = InstanceConfig.generate(i + 1, root, String.valueOf(token));
+            instances.add(new Instance(instanceConfig, classLoaderFactory.apply(i + 1)));
+            token += increment;
+        }
+
+        TestCluster cluster = new TestCluster(root, instances);
+        cluster.launch();
+        return cluster;
+    }
+
+    // Copies test/conf/logback-dtest.xml into root (unless already present) and points
+    // the logback.configurationFile system property at the copy
+    private static void setupLogging(File root)
+    {
+        try
+        {
+            String testConfPath = "test/conf/logback-dtest.xml";
+            Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
+            if (!logConfPath.toFile().exists())
+            {
+                Files.copy(new File(testConfPath).toPath(),
+                           logConfPath);
+            }
+            System.setProperty("logback.configurationFile", "file://" + logConfPath);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Submits a shutdown task for every instance, plus a follow-up task that waits for those
+     * shutdowns and then deletes the root directory. This method itself does not wait for
+     * either to complete.
+     */
+    @Override
+    public void close()
+    {
+        List<Future<?>> futures = instances.stream()
+                                           .map(i -> exec.submit(i::shutdown))
+                                           .collect(Collectors.toList());
+
+//        withThreadLeakCheck(futures);
+
+        // Make sure to only delete directory when threads are stopped
+        exec.submit(() -> {
+            FBUtilities.waitOnFutures(futures);
+            FileUtils.deleteRecursive(root);
+        });
+    }
+
+    // We do not want this check to run every time until we fix problems with thread stops
+    private void withThreadLeakCheck(List<Future<?>> futures)
+    {
+        FBUtilities.waitOnFutures(futures);
+
+        // every live thread except the one running this check
+        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+        threadSet = Sets.difference(threadSet, Collections.singletonMap(Thread.currentThread(), null).keySet());
+        if (!threadSet.isEmpty())
+        {
+            for (Thread thread : threadSet)
+            {
+                System.out.println(thread);
+                System.out.println(Arrays.toString(thread.getStackTrace()));
+            }
+            throw new RuntimeException(String.format("Not all threads have shut down. %d threads are still running: %s", threadSet.size(), threadSet));
+        }
+    }
+
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
index 4607d5c..18d17e8 100644
--- a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
+++ b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
@@ -78,7 +78,7 @@ public class NettyFactoryTest
}
@After
- public void tearDown()
+ public void tearDown() throws Exception
{
if (factory != null)
factory.close();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
index fff7b17..e274f27 100644
--- a/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
+++ b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
@@ -127,9 +127,5 @@ public class StreamCompressionSerializerTest
{
return true;
}
-
- @Override
- public void close()
- { }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/3] cassandra git commit: Introduce in-jvm distributed tests
Posted by if...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 630dc5d..439d8cf 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.service;
import java.io.File;
-import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.*;
@@ -26,9 +25,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheLoader;
@@ -123,16 +119,7 @@ public class StorageProxy implements StorageProxyMBean
static
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(instance, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
-
+ MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME);
HintsService.instance.registerMBean();
HintedHandOffManager.instance.registerMBean();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e7ca4be..68dba93 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -231,7 +231,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<>();
- private final ObjectName jmxObjectName;
+ private final String jmxObjectName;
private Collection<Token> bootstrapTokens = null;
@@ -270,17 +270,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// use dedicated executor for handling JMX notifications
super(JMXBroadcastExecutor.executor);
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- jmxObjectName = new ObjectName("org.apache.cassandra.db:type=StorageService");
- mbs.registerMBean(this, jmxObjectName);
- mbs.registerMBean(StreamManager.instance, new ObjectName(StreamManager.OBJECT_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ jmxObjectName = "org.apache.cassandra.db:type=StorageService";
+ MBeanWrapper.instance.registerMBean(this, jmxObjectName);
+ MBeanWrapper.instance.registerMBean(StreamManager.instance, StreamManager.OBJECT_NAME);
ReadCommandVerbHandler readHandler = new ReadCommandVerbHandler();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 151eb18..50d35bc 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -471,6 +471,32 @@ public class ByteBufferUtil
return bytes.getDouble(bytes.position());
}
+ /**
+  * Serializes {@code obj} by dispatching on its runtime type to the matching
+  * {@code bytes(...)} overload. Handled types: Integer, Byte, Short, Long, Float,
+  * Double, UUID, InetAddress and String.
+  *
+  * @param obj value to serialize
+  * @return the buffer produced by the matching {@code bytes} overload
+  * @throws IllegalArgumentException if {@code obj} is null or of any other type
+  */
+ public static ByteBuffer objectToBytes(Object obj)
+ {
+     if (obj instanceof Integer)
+         return ByteBufferUtil.bytes((int) obj);
+     else if (obj instanceof Byte)
+         return ByteBufferUtil.bytes((byte) obj);
+     else if (obj instanceof Short)
+         return ByteBufferUtil.bytes((short) obj);
+     else if (obj instanceof Long)
+         return ByteBufferUtil.bytes((long) obj);
+     else if (obj instanceof Float)
+         return ByteBufferUtil.bytes((float) obj);
+     else if (obj instanceof Double)
+         return ByteBufferUtil.bytes((double) obj);
+     else if (obj instanceof UUID)
+         return ByteBufferUtil.bytes((UUID) obj);
+     else if (obj instanceof InetAddress)
+         return ByteBufferUtil.bytes((InetAddress) obj);
+     else if (obj instanceof String)
+         return ByteBufferUtil.bytes((String) obj);
+     else
+         // null fails every instanceof above; guard getClass() so null is reported
+         // as an IllegalArgumentException instead of a NullPointerException
+         throw new IllegalArgumentException(String.format("Cannot convert value %s of type %s",
+                                                          obj,
+                                                          obj == null ? null : obj.getClass()));
+ }
+
public static ByteBuffer bytes(byte b)
{
return ByteBuffer.allocate(1).put(0, b);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/MBeanWrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MBeanWrapper.java b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
new file mode 100644
index 0000000..edee6af
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.lang.management.ManagementFactory;
+import java.util.function.Consumer;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class to avoid catching and rethrowing checked exceptions on MBean and
+ * allow turning off MBean registration for test purposes.
+ */
+public interface MBeanWrapper
+{
+    static final Logger logger = LoggerFactory.getLogger(MBeanWrapper.class);
+
+    // No-op wrapper when the system property below is true, platform MBean server otherwise
+    static final MBeanWrapper instance = Boolean.getBoolean("org.apache.cassandra.disable_mbean_registration") ?
+                                         new NoOpMBeanWrapper() :
+                                         new PlatformMBeanWrapper();
+
+    // onException selects what happens on failure: rethrow (THROW), log (LOG) or ignore (IGNORE).
+    // The overloads without it default to THROW.
+    public void registerMBean(Object obj, ObjectName mbeanName, OnException onException);
+    default void registerMBean(Object obj, ObjectName mbeanName)
+    {
+        registerMBean(obj, mbeanName, OnException.THROW);
+    }
+
+    public void registerMBean(Object obj, String mbeanName, OnException onException);
+    default void registerMBean(Object obj, String mbeanName)
+    {
+        registerMBean(obj, mbeanName, OnException.THROW);
+    }
+
+    public boolean isRegistered(ObjectName mbeanName, OnException onException);
+    default boolean isRegistered(ObjectName mbeanName)
+    {
+        return isRegistered(mbeanName, OnException.THROW);
+    }
+
+    public boolean isRegistered(String mbeanName, OnException onException);
+    default boolean isRegistered(String mbeanName)
+    {
+        return isRegistered(mbeanName, OnException.THROW);
+    }
+
+    public void unregisterMBean(ObjectName mbeanName, OnException onException);
+    default void unregisterMBean(ObjectName mbeanName)
+    {
+        unregisterMBean(mbeanName, OnException.THROW);
+    }
+
+    public void unregisterMBean(String mbeanName, OnException onException);
+    default void unregisterMBean(String mbeanName)
+    {
+        unregisterMBean(mbeanName, OnException.THROW);
+    }
+
+    /** Implementation that registers nothing and reports every name as not registered. */
+    static class NoOpMBeanWrapper implements MBeanWrapper
+    {
+        public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) {}
+        public void registerMBean(Object obj, String mbeanName, OnException onException) {}
+        public boolean isRegistered(ObjectName mbeanName, OnException onException) { return false; }
+        public boolean isRegistered(String mbeanName, OnException onException) { return false; }
+        public void unregisterMBean(ObjectName mbeanName, OnException onException) {}
+        public void unregisterMBean(String mbeanName, OnException onException) {}
+    }
+
+    /**
+     * Implementation backed by the platform MBean server. Any exception, including a
+     * malformed name in the String overloads, is passed to the supplied OnException handler.
+     */
+    static class PlatformMBeanWrapper implements MBeanWrapper
+    {
+        private final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.registerMBean(obj, mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public void registerMBean(Object obj, String mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.registerMBean(obj, new ObjectName(mbeanName));
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public boolean isRegistered(ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                return mbs.isRegistered(mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+            // reached only when the handler did not rethrow
+            return false;
+        }
+
+        public boolean isRegistered(String mbeanName, OnException onException)
+        {
+            try
+            {
+                return mbs.isRegistered(new ObjectName(mbeanName));
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+            // reached only when the handler did not rethrow
+            return false;
+        }
+
+        public void unregisterMBean(ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.unregisterMBean(mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public void unregisterMBean(String mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.unregisterMBean(new ObjectName(mbeanName));
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+    }
+
+    /** Failure policies: wrap in a RuntimeException, log at error level, or do nothing. */
+    public enum OnException
+    {
+        THROW(e -> { throw new RuntimeException(e); }),
+        LOG(e -> { logger.error("Error in MBean wrapper: ", e); }),
+        IGNORE(e -> {});
+
+        // assigned once per constant; final so a policy cannot be swapped at runtime
+        private final Consumer<Exception> handler;
+        OnException(Consumer<Exception> handler)
+        {
+            this.handler = handler;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/Mx4jTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Mx4jTool.java b/src/java/org/apache/cassandra/utils/Mx4jTool.java
index 5baaea2..eda6354 100644
--- a/src/java/org/apache/cassandra/utils/Mx4jTool.java
+++ b/src/java/org/apache/cassandra/utils/Mx4jTool.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.utils;
-import java.lang.management.ManagementFactory;
-import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.lang3.StringUtils;
@@ -44,7 +42,7 @@ public class Mx4jTool
try
{
logger.trace("Will try to load mx4j now, if it's in the classpath");
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ MBeanWrapper mbs = MBeanWrapper.instance;
ObjectName processorName = new ObjectName("Server:name=XSLTProcessor");
Class<?> httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index b69e6bf..1a17a1f 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -31,6 +31,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -354,11 +355,10 @@ public final class Ref<T> implements RefCounted<T>
static final Set<Class<?>> concurrentIterables = Collections.newSetFromMap(new IdentityHashMap<>());
private static final Set<GlobalState> globallyExtant = Collections.newSetFromMap(new ConcurrentHashMap<>());
static final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
- private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("Reference-Reaper"));
+ private static final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("Reference-Reaper", Ref::reapOneReference).start();
static final ScheduledExecutorService STRONG_LEAK_DETECTOR = !DEBUG_ENABLED ? null : Executors.newScheduledThreadPool(1, new NamedThreadFactory("Strong-Reference-Leak-Detector"));
static
{
- EXEC.execute(new ReferenceReaper());
if (DEBUG_ENABLED)
{
STRONG_LEAK_DETECTOR.scheduleAtFixedRate(new Visitor(), 1, 15, TimeUnit.MINUTES);
@@ -367,28 +367,12 @@ public final class Ref<T> implements RefCounted<T>
concurrentIterables.addAll(Arrays.asList(concurrentIterableClasses));
}
- static final class ReferenceReaper implements Runnable
+ private static void reapOneReference() throws InterruptedException
{
- public void run()
+ Object obj = referenceQueue.remove(100);
+ if (obj instanceof Ref.State)
{
- try
- {
- while (true)
- {
- Object obj = referenceQueue.remove();
- if (obj instanceof Ref.State)
- {
- ((Ref.State) obj).release(true);
- }
- }
- }
- catch (InterruptedException e)
- {
- }
- finally
- {
- EXEC.execute(this);
- }
+ ((Ref.State) obj).release(true);
}
}
@@ -719,4 +703,11 @@ public final class Ref<T> implements RefCounted<T>
}
}
}
+
+ @VisibleForTesting
+ public static void shutdownReferenceReaper() throws InterruptedException
+ {
+ EXEC.shutdown();
+ EXEC.awaitTermination(60, TimeUnit.SECONDS);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/memory/BufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index f9ec40c..c8ad078 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -27,11 +27,11 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.FastThreadLocal;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.FileUtils;
@@ -493,34 +493,16 @@ public class BufferPool
private static final ConcurrentLinkedQueue<LocalPoolRef> localPoolReferences = new ConcurrentLinkedQueue<>();
private static final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>();
- private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("LocalPool-Cleaner"));
- static
+ private static final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("LocalPool-Cleaner", BufferPool::cleanupOneReference).start();
+
+ private static void cleanupOneReference() throws InterruptedException
{
- EXEC.execute(new Runnable()
+ Object obj = localPoolRefQueue.remove(100);
+ if (obj instanceof LocalPoolRef)
{
- public void run()
- {
- try
- {
- while (true)
- {
- Object obj = localPoolRefQueue.remove();
- if (obj instanceof LocalPoolRef)
- {
- ((LocalPoolRef) obj).release();
- localPoolReferences.remove(obj);
- }
- }
- }
- catch (InterruptedException e)
- {
- }
- finally
- {
- EXEC.execute(this);
- }
- }
- });
+ ((LocalPoolRef) obj).release();
+ localPoolReferences.remove(obj);
+ }
}
private static ByteBuffer allocateDirectAligned(int capacity)
@@ -872,4 +854,11 @@ public class BufferPool
int mask = unit - 1;
return (size + mask) & ~mask;
}
+
+ @VisibleForTesting
+ public static void shutdownLocalCleaner() throws InterruptedException
+ {
+ EXEC.shutdown();
+ EXEC.awaitTermination(60, TimeUnit.SECONDS);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
index 5a90463..b905d2c 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
@@ -18,57 +18,70 @@
*/
package org.apache.cassandra.utils.memory;
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
import org.apache.cassandra.utils.concurrent.WaitQueue;
/**
* A thread that reclaims memory from a MemtablePool on demand. The actual reclaiming work is delegated to the
* cleaner Runnable, e.g., FlushLargestColumnFamily
*/
-class MemtableCleanerThread<P extends MemtablePool> extends Thread
+public class MemtableCleanerThread<P extends MemtablePool> extends InfiniteLoopExecutor
{
- /** The pool we're cleaning */
- final P pool;
-
- /** should ensure that at least some memory has been marked reclaiming after completion */
- final Runnable cleaner;
+ private static class Clean<P extends MemtablePool> implements InterruptibleRunnable
+ {
+ /** The pool we're cleaning */
+ final P pool;
- /** signalled whenever needsCleaning() may return true */
- final WaitQueue wait = new WaitQueue();
+ /** should ensure that at least some memory has been marked reclaiming after completion */
+ final Runnable cleaner;
- MemtableCleanerThread(P pool, Runnable cleaner)
- {
- super(pool.getClass().getSimpleName() + "Cleaner");
- this.pool = pool;
- this.cleaner = cleaner;
- setDaemon(true);
- }
+ /** signalled whenever needsCleaning() may return true */
+ final WaitQueue wait = new WaitQueue();
- boolean needsCleaning()
- {
- return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning();
- }
+ private Clean(P pool, Runnable cleaner)
+ {
+ this.pool = pool;
+ this.cleaner = cleaner;
+ }
- // should ONLY be called when we really think it already needs cleaning
- void trigger()
- {
- wait.signal();
- }
+ boolean needsCleaning()
+ {
+ return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning();
+ }
- @Override
- public void run()
- {
- while (true)
+ @Override
+ public void run() throws InterruptedException
{
- while (!needsCleaning())
+ if (needsCleaning())
+ {
+ cleaner.run();
+ }
+ else
{
final WaitQueue.Signal signal = wait.register();
if (!needsCleaning())
- signal.awaitUninterruptibly();
+ signal.await();
else
signal.cancel();
}
-
- cleaner.run();
}
}
+
+ private final Runnable trigger;
+ private MemtableCleanerThread(Clean<P> clean)
+ {
+ super(clean.pool.getClass().getSimpleName() + "Cleaner", clean);
+ this.trigger = clean.wait::signal;
+ }
+
+ MemtableCleanerThread(P pool, Runnable cleaner)
+ {
+ this(new Clean<>(pool, cleaner));
+ }
+
+ // should ONLY be called when we really think it already needs cleaning
+ public void trigger()
+ {
+ trigger.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index c082856..684db93 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@ -18,8 +18,11 @@
*/
package org.apache.cassandra.utils.memory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import com.google.common.annotations.VisibleForTesting;
+
import com.codahale.metrics.Timer;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.metrics.DefaultNameFactory;
@@ -63,6 +66,12 @@ public abstract class MemtablePool
return cleaner == null ? null : new MemtableCleanerThread<>(this, cleaner);
}
+ public void shutdown() throws InterruptedException
+ {
+ cleaner.shutdown();
+ cleaner.awaitTermination(60, TimeUnit.SECONDS);
+ }
+
public abstract MemtableAllocator newAllocator();
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/conf/logback-dtest.xml
----------------------------------------------------------------------
diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml
new file mode 100644
index 0000000..b8019f6
--- /dev/null
+++ b/test/conf/logback-dtest.xml
@@ -0,0 +1,79 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+ <define name="instance_id" class="org.apache.cassandra.distributed.InstanceIDDefiner" />
+
+ <!-- Shutdown hook ensures that async appender flushes -->
+ <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+ <!-- Status listener is used to wrap stdout/stderr and tee to log file -->
+ <statusListener class="org.apache.cassandra.LogbackStatusListener" />
+
+ <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+
+ <file>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log.%i.gz</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>20</maxIndex>
+ </rollingPolicy>
+
+ <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>20MB</maxFileSize>
+ </triggeringPolicy>
+
+ <encoder>
+ <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern>
+ <immediateFlush>false</immediateFlush>
+ </encoder>
+ </appender>
+
+ <appender name="STDOUT" target="System.out" class="org.apache.cassandra.ConsoleAppender">
+ <encoder>
+ <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>DEBUG</level>
+ </filter>
+ </appender>
+
+ <appender name="TEE" class="org.apache.cassandra.TeeingAppender">
+ <appender-ref ref="FILE"/>
+ <appender-ref ref="STDOUT"/>
+ </appender>
+
+ <logger name="org.apache.hadoop" level="WARN"/>
+
+ <logger name="org.apache.cassandra.db.monitoring" level="DEBUG"/>
+
+ <!-- Do not change the name of this appender. LogbackStatusListener uses the thread name
+ tied to the appender name to know when to write to real stdout/stderr vs forwarding to logback -->
+ <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
+ <discardingThreshold>0</discardingThreshold>
+ <maxFlushTime>0</maxFlushTime>
+ <queueSize>1024</queueSize>
+ <appender-ref ref="TEE"/>
+ <includeCallerData>true</includeCallerData>
+ </appender>
+
+ <root level="DEBUG">
+ <appender-ref ref="ASYNC" />
+ </root>
+</configuration>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/Coordinator.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/Coordinator.java
new file mode 100644
index 0000000..91ab480
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/Coordinator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+public class Coordinator
+{
+ final Instance instance;
+
+ public Coordinator(Instance instance)
+ {
+ this.instance = instance;
+ }
+
+ private static Object[][] coordinatorExecute(String query, int consistencyLevel, Object[] bindings)
+ {
+ CQLStatement prepared = QueryProcessor.getStatement(query, ClientState.forInternalCalls());
+ List<ByteBuffer> boundValues = new ArrayList<>();
+ for (Object binding : bindings)
+ {
+ boundValues.add(ByteBufferUtil.objectToBytes(binding));
+ }
+
+ ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
+ QueryOptions.create(ConsistencyLevel.fromCode(consistencyLevel),
+ boundValues,
+ false,
+ 10,
+ null,
+ null,
+ ProtocolVersion.V4,
+ null),
+ System.nanoTime());
+
+ if (res != null && res.kind == ResultMessage.Kind.ROWS)
+ {
+ return RowUtil.toObjects((ResultMessage.Rows) res);
+ }
+ else
+ {
+ return new Object[][]{};
+ }
+ }
+
+ public Object[][] execute(String query, ConsistencyLevel consistencyLevel, Object... boundValues)
+ {
+ return instance.appliesOnInstance(Coordinator::coordinatorExecute).apply(query, consistencyLevel.code, boundValues);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
new file mode 100644
index 0000000..a61c8af
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+
+import static org.apache.cassandra.net.MessagingService.Verb.READ_REPAIR;
+
+public class DistributedReadWritePathTest extends DistributedTestBase
+{
+ @Test
+ public void coordinatorRead() throws Throwable
+ {
+ try (TestCluster cluster = createCluster(3))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+
+ assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+ ConsistencyLevel.ALL,
+ 1),
+ row(1, 1, 1),
+ row(1, 2, 2),
+ row(1, 3, 3));
+ }
+ }
+
+ @Test
+ public void coordinatorWrite() throws Throwable
+ {
+ try (TestCluster cluster = createCluster(3))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+ cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
+ ConsistencyLevel.QUORUM);
+
+ for (int i = 0; i < 3; i++)
+ {
+ assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+ row(1, 1, 1));
+ }
+
+ assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.QUORUM),
+ row(1, 1, 1));
+ }
+ }
+
+ @Test
+ public void readRepairTest() throws Throwable
+ {
+ try (TestCluster cluster = createCluster(3))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+ assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.QUORUM),
+ row(1, 1, 1));
+
+ // Verify that data got repaired to the third node
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+ row(1, 1, 1));
+ }
+ }
+
+ @Test
+ public void failingReadRepairTest() throws Throwable
+ {
+ try (TestCluster cluster = createCluster(3))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+ cluster.verbs(READ_REPAIR).to(3).drop();
+ assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.QUORUM),
+ row(1, 1, 1));
+
+ // Data was not repaired
+ assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+ }
+ }
+
+ @Test
+ public void writeWithSchemaDisagreement() throws Throwable
+ {
+ try (TestCluster cluster = createCluster(3))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+ // Introduce schema disagreement
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+ Exception thrown = null;
+ try
+ {
+ cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)", ConsistencyLevel.QUORUM);
+ }
+ catch (RuntimeException e)
+ {
+ thrown = e;
+ }
+
+ Assert.assertNotNull(thrown); // fail with an assertion, not an NPE below, if the write unexpectedly succeeded
+ Assert.assertTrue(thrown.getMessage().contains("Exception occurred on the node"));
+ Assert.assertTrue(thrown.getCause().getMessage().contains("Unknown column v2 during deserialization"));
+ }
+ }
+
+ @Test
+ public void readWithSchemaDisagreement() throws Throwable
+ {
+ try (TestCluster cluster = createCluster(3))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+ cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+ // Introduce schema disagreement
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+ Exception thrown = null;
+ try
+ {
+ assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL),
+ row(1, 1, 1, null));
+ }
+ catch (Exception e)
+ {
+ thrown = e;
+ }
+ Assert.assertNotNull(thrown); // fail with an assertion, not an NPE below, if the read unexpectedly succeeded
+ Assert.assertTrue(thrown.getMessage().contains("Exception occurred on the node"));
+ Assert.assertTrue(thrown.getCause().getMessage().contains("Unknown column v2 during deserialization"));
+ }
+ }
+
+ @Test
+ public void reAddColumnAsStatic() throws Throwable
+ {
+ try (TestCluster cluster = createCluster(3))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+ for (int i = 1; i <= 3; i++)
+ {
+ cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+ ConsistencyLevel.ALL,
+ 1, i, i);
+ }
+
+ // Drop column
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v1");
+
+ assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL),
+ row(1, 1),
+ row(1, 2),
+ row(1, 3));
+
+ // Re-add column as static
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v1 int static");
+
+ assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL),
+ row(1, 1, null),
+ row(1, 2, null),
+ row(1, 3, null));
+
+ cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, v1) VALUES (?, ?)",
+ ConsistencyLevel.ALL,
+ 1, 1);
+
+ assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL),
+ row(1, 1, 1),
+ row(1, 2, 1),
+ row(1, 3, 1));
+ }
+ }
+
+ @Test
+ public void reAddColumnAsStaticDisagreementCoordinatorSide() throws Throwable
+ {
+ try (TestCluster cluster = createCluster(3))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+ for (int i = 1; i <= 3; i++)
+ {
+ cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+ ConsistencyLevel.ALL, 1, i, i);
+ }
+
+ // Drop column
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v1", 1);
+
+ Exception thrown = null;
+ try
+ {
+ cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL);
+ }
+ catch (Exception e)
+ {
+ thrown = e;
+ }
+
+ Assert.assertNotNull(thrown);
+ Assert.assertTrue(thrown.getCause().getMessage().contains("[v1] is not a subset of"));
+
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v1 int static", 1);
+
+ thrown = null; // forget the first failure so the checks below must observe a new one
+ try
+ {
+ cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL);
+ }
+ catch (Exception e)
+ {
+ thrown = e;
+ }
+
+ Assert.assertNotNull(thrown);
+ Assert.assertTrue(thrown.getCause().getMessage().contains("[v1] is not a subset of"));
+ }
+ }
+
+ @Test
+ public void reAddColumnAsStaticDisagreementReplicaSide() throws Throwable
+ {
+ try (TestCluster cluster = createCluster(2))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+ for (int i = 1; i <= 3; i++)
+ {
+ cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+ ConsistencyLevel.ALL,
+ 1, i, i);
+ }
+
+ // Drop column on the replica
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v1", 2);
+
+ // Columns are going to be read and read-repaired as long as they're available
+ assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL),
+ row(1, 1, 1),
+ row(1, 2, 2),
+ row(1, 3, 3));
+
+ assertRows(cluster.get(2).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+ row(1, 1),
+ row(1, 2),
+ row(1, 3));
+
+ // Re-add as static on the replica
+ cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v1 int static", 2);
+
+ // Try reading
+ assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL),
+ row(1, 1, 1),
+ row(1, 2, 2),
+ row(1, 3, 3));
+
+ // Make sure read-repair did not corrupt the data
+ assertRows(cluster.get(2).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+ row(1, 1, null),
+ row(1, 2, null),
+ row(1, 3, null));
+
+ // Writing to the replica with disagreeing schema should not work
+ Exception thrown = null;
+ try
+ {
+ cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+ ConsistencyLevel.ALL,
+ 1, 1, 5);
+ }
+ catch (Exception e)
+ {
+ thrown = e;
+ }
+
+ Assert.assertNotNull(thrown);
+
+ thrown = null;
+
+ // If somehow replica got new data, reading that data should not be possible, either
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+ 1, 1, 100);
+
+ try
+ {
+ assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL),
+ row(1, 1, 1),
+ row(1, 2, 2),
+ row(1, 3, 3));
+ }
+ catch (Exception e)
+ {
+ thrown = e;
+ }
+
+ Assert.assertNotNull(thrown);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java
new file mode 100644
index 0000000..f873fce
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+public class DistributedTestBase
+{
+ static String KEYSPACE = "distributed_test_keyspace";
+
+ @BeforeClass
+ public static void setup()
+ {
+ System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
+ }
+
+ TestCluster createCluster(int nodeCount) throws Throwable
+ {
+ TestCluster cluster = TestCluster.create(nodeCount);
+ cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + nodeCount + "};");
+
+ return cluster;
+ }
+
+ public static void assertRows(Object[][] actual, Object[]... expected)
+ {
+ // rowsNotEqualErrorMessage takes (actual, expected) in that order; swapping them mislabels the failure output
+ Assert.assertEquals(rowsNotEqualErrorMessage(actual, expected), expected.length, actual.length);
+
+ for (int i = 0; i < expected.length; i++)
+ {
+ Object[] expectedRow = expected[i];
+ Object[] actualRow = actual[i];
+ Assert.assertTrue(rowsNotEqualErrorMessage(actual, expected),
+ Arrays.equals(expectedRow, actualRow));
+ }
+ }
+
+ public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected)
+ {
+ return String.format("Expected: %s\nActual:%s\n",
+ rowsToString(expected),
+ rowsToString(actual));
+ }
+
+ public static String rowsToString(Object[][] rows)
+ {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[");
+ boolean isFirst = true;
+ for (Object[] row : rows)
+ {
+ if (isFirst)
+ isFirst = false;
+ else
+ builder.append(",");
+ builder.append(Arrays.toString(row));
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+
+ public static Object[] row(Object... expected)
+ {
+ return expected;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/Instance.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/Instance.java b/test/distributed/org/apache/cassandra/distributed/Instance.java
new file mode 100644
index 0000000..f9ee5bb
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/Instance.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.SharedExecutorPool;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryHandler;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.SimpleSeedProvider;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.net.IMessageSink;
+import org.apache.cassandra.net.MessageDeliveryTask;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.MessageInHandler;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.memory.BufferPool;
+
+public class Instance extends InvokableInstance
+{
+ public final InstanceConfig config;
+
+ public Instance(InstanceConfig config, ClassLoader classLoader) // classLoader is handed to InvokableInstance; config is stored on this instance
+ {
+ super(classLoader);
+ this.config = config;
+ }
+
+ public InetAddressAndPort getBroadcastAddress() { return callOnInstance(FBUtilities::getBroadcastAddressAndPort); }
+
+ public Object[][] executeInternal(String query, Object... args)
+ {
+ return callOnInstance(() ->
+ {
+ QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query);
+ ResultMessage result = prepared.statement.executeLocally(QueryProcessor.internalQueryState(),
+ QueryProcessor.makeInternalOptions(prepared.statement, args));
+
+ if (result instanceof ResultMessage.Rows)
+ return RowUtil.toObjects((ResultMessage.Rows)result);
+ else
+ return null;
+ });
+ }
+
+ /** Returns the schema version reported by this instance's own {@code Schema.instance}. */
+ public UUID getSchemaVersion()
+ {
+     // Deliberately a lambda and not a method reference: the call has to be made on the
+     // node-local schema instance.
+     //noinspection Convert2MethodRef
+     SerializableCallable<UUID> readVersion = () -> Schema.instance.getVersion();
+     return callOnInstance(readVersion);
+ }
+
+ /**
+  * Parses, validates and locally executes a schema-altering statement on this instance.
+  *
+  * @throws RuntimeException wrapping any exception raised while applying the statement;
+  *                          the message includes the query text
+  */
+ public void schemaChange(String query)
+ {
+     runOnInstance(() ->
+     {
+         try
+         {
+             ClientState clientState = ClientState.forInternalCalls(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+             QueryState queryState = new QueryState(clientState);
+
+             CQLStatement parsed = QueryProcessor.parseStatement(query, queryState.getClientState());
+             parsed.validate(clientState);
+
+             parsed.executeLocally(queryState, QueryOptions.forInternalCalls(Collections.emptyList()));
+         }
+         catch (Exception e)
+         {
+             throw new RuntimeException("Error setting schema for test (query was: " + query + ")", e);
+         }
+     });
+ }
+
+ /**
+  * Installs a message sink on this instance that hands every outgoing message to the
+  * cluster's filters and, through them, to the receiving instance's receiveMessage.
+  */
+ private void registerMockMessaging(TestCluster cluster)
+ {
+     BiConsumer<InetAddressAndPort, Message> toInstance = (to, message) -> cluster.get(to).receiveMessage(message);
+     BiConsumer<InetAddressAndPort, Message> filtered = cluster.filters().filter(toInstance);
+
+     // The sink itself has to be created on the instance, so only the delivery callback is passed in.
+     SerializableConsumer<BiConsumer<InetAddressAndPort, Message>> installSink =
+         acceptsOnInstance((BiConsumer<InetAddressAndPort, Message> deliver) ->
+                           MessagingService.instance().addMessageSink(new MessageDeliverySink(deliver)));
+     installSink.accept(filtered);
+ }
+
+ /**
+  * Message sink that serializes each outgoing message and passes it to a delivery callback
+  * together with its destination.
+  */
+ private static class MessageDeliverySink implements IMessageSink
+ {
+     private final BiConsumer<InetAddressAndPort, Message> deliver;
+
+     MessageDeliverySink(BiConsumer<InetAddressAndPort, Message> deliver)
+     {
+         this.deliver = deliver;
+     }
+
+     public boolean allowOutgoingMessage(MessageOut messageOut, int id, InetAddressAndPort to)
+     {
+         try (DataOutputBuffer buffer = new DataOutputBuffer(1024))
+         {
+             InetAddressAndPort sender = FBUtilities.getBroadcastAddressAndPort();
+             messageOut.serialize(buffer, MessagingService.current_version);
+
+             Message wrapped = new Message(messageOut.verb.getId(), buffer.toByteArray(), id, MessagingService.current_version, sender);
+             deliver.accept(to, wrapped);
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+         // Always false: the message has already been passed to the callback above.
+         // NOTE(review): presumably false stops MessagingService from also sending it itself - confirm.
+         return false;
+     }
+
+     public boolean allowIncomingMessage(MessageIn message, int id)
+     {
+         // filtering is applied on the outgoing side, so every incoming message is accepted
+         return true;
+     }
+ }
+
+ /**
+  * Deserializes {@code message} on this instance and runs its delivery task synchronously.
+  * Any failure is rethrown as a RuntimeException naming the node's broadcast address.
+  */
+ private void receiveMessage(Message message)
+ {
+     SerializableConsumer<Message> handle = acceptsOnInstance((Message received) ->
+     {
+         try (DataInputBuffer input = new DataInputBuffer(received.bytes))
+         {
+             MessageIn<?> decoded = MessageInHandler.deserialize(input, received.id, received.version, received.from);
+             new MessageDeliveryTask(decoded, received.id).run();
+         }
+         catch (Throwable t)
+         {
+             throw new RuntimeException("Exception occurred on the node " + FBUtilities.getBroadcastAddressAndPort(), t);
+         }
+     });
+     handle.accept(message);
+ }
+
+ /**
+  * Brings this instance up: creates its directories, sets the logging instance id, runs startup,
+  * initializes the ring and registers the mock messaging sink, in that order.
+  * Runtime exceptions propagate unchanged; anything else is wrapped in a RuntimeException.
+  */
+ void launch(TestCluster cluster)
+ {
+     try
+     {
+         mkdirs();
+         int instanceNumber = config.num;
+         runOnInstance(() -> InstanceIDDefiner.instanceId = instanceNumber); // for logging
+
+         startup();
+         initializeRing(cluster);
+         registerMockMessaging(cluster);
+     }
+     catch (RuntimeException e)
+     {
+         throw e;
+     }
+     catch (Throwable t)
+     {
+         throw new RuntimeException(t);
+     }
+ }
+
+ /** Creates the saved-caches, hints, commitlog and data directories named in the config. */
+ private void mkdirs()
+ {
+     String[] singleDirectories = { config.saved_caches_directory, config.hints_directory, config.commitlog_directory };
+     for (String path : singleDirectories)
+         new File(path).mkdirs();
+
+     for (String dataDirectory : config.data_file_directories)
+         new File(dataDirectory).mkdirs();
+ }
+
+ /**
+  * Runs the on-instance bootstrap: initializes DatabaseDescriptor from loadConfig, creates all
+  * directories, marks Keyspace as initialized and persists local system metadata.
+  */
+ private void startup()
+ {
+     SerializableConsumer<InstanceConfig> boot = acceptsOnInstance((InstanceConfig overrides) ->
+     {
+         DatabaseDescriptor.daemonInitialization(() -> loadConfig(overrides));
+
+         DatabaseDescriptor.createAllDirectories();
+         Keyspace.setInitialized();
+         SystemKeyspace.persistLocalMetadata();
+     });
+     boot.accept(config);
+ }
+
+
+ public static Config loadConfig(InstanceConfig overrides) // builds a new Config: fixed test defaults first, then values copied field by field from overrides
+ {
+ Config config = new Config();
+ // Defaults applied to every instance regardless of overrides
+ config.commitlog_sync = Config.CommitLogSync.batch;
+ config.endpoint_snitch = SimpleSnitch.class.getName();
+ config.seed_provider = new ParameterizedClass(SimpleSeedProvider.class.getName(),
+ Collections.singletonMap("seeds", "127.0.0.1:7010")); // single hard-coded seed address
+ config.diagnostic_events_enabled = true; // necessary for schema change monitoring
+
+ // Overrides: per-instance values taken from the InstanceConfig
+ config.partitioner = overrides.partitioner;
+ config.broadcast_address = overrides.broadcast_address;
+ config.listen_address = overrides.listen_address;
+ config.broadcast_rpc_address = overrides.broadcast_rpc_address;
+ config.rpc_address = overrides.rpc_address;
+ config.saved_caches_directory = overrides.saved_caches_directory;
+ config.data_file_directories = overrides.data_file_directories;
+ config.commitlog_directory = overrides.commitlog_directory;
+ config.hints_directory = overrides.hints_directory;
+ config.cdc_raw_directory = overrides.cdc_directory; // note the differing field names: cdc_directory maps to cdc_raw_directory
+ config.concurrent_writes = overrides.concurrent_writes;
+ config.concurrent_counter_writes = overrides.concurrent_counter_writes;
+ config.concurrent_materialized_view_writes = overrides.concurrent_materialized_view_writes;
+ config.concurrent_reads = overrides.concurrent_reads;
+ config.memtable_flush_writers = overrides.memtable_flush_writers;
+ config.concurrent_compactors = overrides.concurrent_compactors;
+ config.memtable_heap_space_in_mb = overrides.memtable_heap_space_in_mb;
+ config.initial_token = overrides.initial_token;
+ return config;
+ }
+
+ private void initializeRing(TestCluster cluster) // registers every cluster member's token, host id and status on this instance
+ {
+ // This should be done outside instance in order to avoid serializing config
+ String partitionerName = config.partitioner;
+ List<String> initialTokens = new ArrayList<>();
+ List<InetAddressAndPort> hosts = new ArrayList<>();
+ List<UUID> hostIds = new ArrayList<>();
+ for (int i = 1 ; i <= cluster.size() ; ++i) // cluster.get(int) is addressed from 1 to size() here
+ {
+ InstanceConfig config = cluster.get(i).config;
+ initialTokens.add(config.initial_token);
+ try
+ {
+ hosts.add(InetAddressAndPort.getByName(config.broadcast_address));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ hostIds.add(config.hostId);
+ }
+
+ runOnInstance(() -> // only the three lists and the partitioner name are captured by this lambda
+ {
+ try
+ {
+ IPartitioner partitioner = FBUtilities.newPartitioner(partitionerName);
+ StorageService storageService = StorageService.instance;
+ List<Token> tokens = new ArrayList<>();
+ for (String token : initialTokens)
+ tokens.add(partitioner.getTokenFactory().fromString(token));
+
+ for (int i = 0; i < tokens.size(); i++) // tokens, hosts and hostIds are parallel lists, one entry per node
+ {
+ InetAddressAndPort ep = hosts.get(i);
+ Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), 1);
+ Gossiper.instance.injectApplicationState(ep,
+ ApplicationState.TOKENS,
+ new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(tokens.get(i))));
+ storageService.onChange(ep,
+ ApplicationState.STATUS_WITH_PORT,
+ new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i))));
+ storageService.onChange(ep,
+ ApplicationState.STATUS,
+ new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i))));
+ Gossiper.instance.realMarkAlive(ep, Gossiper.instance.getEndpointStateForEndpoint(ep));
+ MessagingService.instance().setVersion(ep, MessagingService.current_version);
+ }
+
+ // check that all nodes are in token metadata
+ for (int i = 0; i < tokens.size(); ++i)
+ assert storageService.getTokenMetadata().isMember(hosts.get(i));
+ }
+ catch (Throwable e) // wraps any failure, including an AssertionError from the membership check above
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ void shutdown() // runs every shutdown step on the instance in the listed order, collecting failures instead of stopping at the first
+ {
+ runOnInstance(() -> {
+ Throwable error = null;
+ error = runAndMergeThrowable(error, // varargs overload: each step runs even if an earlier one threw
+ BatchlogManager.instance::shutdown,
+ HintsService.instance::shutdownBlocking,
+ CommitLog.instance::shutdownBlocking,
+ CompactionManager.instance::forceShutdown,
+ Gossiper.instance::stop,
+ SecondaryIndexManager::shutdownExecutors,
+ MessagingService.instance()::shutdown,
+ ColumnFamilyStore::shutdownFlushExecutor,
+ ColumnFamilyStore::shutdownPostFlushExecutor,
+ ColumnFamilyStore::shutdownReclaimExecutor,
+ ColumnFamilyStore::shutdownPerDiskFlushExecutors,
+ PendingRangeCalculatorService.instance::shutdownExecutor,
+ BufferPool::shutdownLocalCleaner,
+ Ref::shutdownReferenceReaper,
+ StageManager::shutdownAndWait,
+ SharedExecutorPool::shutdownSharedPool,
+ Memtable.MEMORY_POOL::shutdown,
+ ScheduledExecutors::shutdownAndWait);
+ error = shutdownAndWait(error, ActiveRepairService.repairCommandExecutor);
+ Throwables.maybeFail(error); // NOTE(review): presumably rethrows the merged error when it is non-null - confirm
+ });
+ }
+
+ /**
+  * Calls shutdownNow on {@code executor}, waits up to five seconds for termination and asserts
+  * that it terminated. Any failure is merged into {@code existing} and returned.
+  */
+ private static Throwable shutdownAndWait(Throwable existing, ExecutorService executor)
+ {
+     ThrowingRunnable stopAndVerify = () -> {
+         executor.shutdownNow();
+         executor.awaitTermination(5, TimeUnit.SECONDS);
+         assert executor.isTerminated() && executor.isShutdown() : executor;
+     };
+     return runAndMergeThrowable(existing, stopAndVerify);
+ }
+
+ /**
+  * Runs {@code runnable}. If it throws, returns the thrown error merged with {@code existing};
+  * otherwise returns {@code existing} unchanged.
+  */
+ private static Throwable runAndMergeThrowable(Throwable existing, ThrowingRunnable runnable)
+ {
+     Throwable merged = existing;
+     try
+     {
+         runnable.run();
+     }
+     catch (Throwable t)
+     {
+         merged = Throwables.merge(existing, t);
+     }
+     return merged;
+ }
+
+ /**
+  * Runs every runnable in order. A failure does not stop the remaining runnables; each thrown
+  * error is merged into the accumulated one, which is returned at the end.
+  */
+ private static Throwable runAndMergeThrowable(Throwable existing, ThrowingRunnable ... runnables)
+ {
+     Throwable merged = existing;
+     for (int i = 0; i < runnables.length; i++)
+     {
+         try
+         {
+             runnables[i].run();
+         }
+         catch (Throwable t)
+         {
+             merged = Throwables.merge(merged, t);
+         }
+     }
+     return merged;
+ }
+
+ /** A runnable whose body may throw any Throwable, checked or not. */
+ @FunctionalInterface
+ public interface ThrowingRunnable
+ {
+     void run() throws Throwable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java
new file mode 100644
index 0000000..6349d5a
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import com.google.common.base.Predicate;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.Pair;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.IntFunction;
+
+ /**
+  * Class loader used for one test instance. Logging, yaml and most Cassandra classes are
+  * loaded by this loader itself; everything else is delegated to the common class loader.
+  */
+ public class InstanceClassLoader extends URLClassLoader
+ {
+     // Classes that have to be shared between instances, for configuration or returning values
+     private static final Class<?>[] commonClasses = new Class[]
+     {
+         Pair.class,
+         InstanceConfig.class,
+         Message.class,
+         InetAddressAndPort.class,
+         InvokableInstance.SerializableCallable.class,
+         InvokableInstance.SerializableRunnable.class,
+         InvokableInstance.SerializableConsumer.class,
+         InvokableInstance.SerializableBiConsumer.class,
+         InvokableInstance.SerializableFunction.class,
+         InvokableInstance.SerializableBiFunction.class,
+         InvokableInstance.SerializableTriFunction.class,
+         InvokableInstance.InstanceFunction.class
+     };
+
+     private final int id; // for debug purposes
+     private final ClassLoader commonClassLoader;
+     private final Predicate<String> isCommonClassName;
+
+     InstanceClassLoader(int id, URL[] urls, Predicate<String> isCommonClassName, ClassLoader commonClassLoader)
+     {
+         // no parent loader: delegation to the common loader is done explicitly in loadClass
+         super(urls, null);
+         this.id = id;
+         this.commonClassLoader = commonClassLoader;
+         this.isCommonClassName = isCommonClassName;
+     }
+
+     @Override
+     public Class<?> loadClass(String name) throws ClassNotFoundException
+     {
+         if (isLoadedPerInstance(name))
+             return loadClassInternal(name);
+
+         return commonClassLoader.loadClass(name);
+     }
+
+     // Do not share:
+     //  * slf4j and logback
+     //  * yaml, which is a rare exception because it does mess with loading org.cassandra...Config class instances
+     //  * most of the rest of Cassandra classes (unless they were explicitly shared)
+     private boolean isLoadedPerInstance(String name)
+     {
+         if (name.startsWith("org.slf4j"))
+             return true;
+         if (name.startsWith("ch.qos.logback"))
+             return true;
+         if (name.startsWith("org.yaml"))
+             return true;
+         return name.startsWith("org.apache.cassandra") && !isCommonClassName.test(name);
+     }
+
+     Class<?> loadClassInternal(String name) throws ClassNotFoundException
+     {
+         synchronized (getClassLoadingLock(name))
+         {
+             // reuse the class if this loader has already defined it, otherwise define it now
+             Class<?> loaded = findLoadedClass(name);
+             return loaded != null ? loaded : findClass(name);
+         }
+     }
+
+     /**
+      * Returns a factory that builds, for a given instance id, a loader over the same URLs as
+      * {@code contextClassLoader}, sharing only the classes listed in commonClasses.
+      */
+     public static IntFunction<ClassLoader> createFactory(URLClassLoader contextClassLoader)
+     {
+         Set<String> sharedNames = new HashSet<>();
+         for (Class<?> shared : commonClasses)
+             sharedNames.add(shared.getName());
+
+         URL[] classpath = contextClassLoader.getURLs();
+         return id -> new InstanceClassLoader(id, classpath, sharedNames::contains, contextClassLoader);
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java
new file mode 100644
index 0000000..49c2e1f
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.UUID;
+
+public class InstanceConfig implements Serializable
+{
+ public final int num;
+ public final UUID hostId =java.util.UUID.randomUUID();
+ public final String partitioner = "org.apache.cassandra.dht.Murmur3Partitioner";
+ public final String broadcast_address;
+ public final String listen_address;
+ public final String broadcast_rpc_address;
+ public final String rpc_address;
+ public final String saved_caches_directory;
+ public final String[] data_file_directories;
+ public final String commitlog_directory;
+ public final String hints_directory;
+ public final String cdc_directory;
+ public final int concurrent_writes = 2;
+ public final int concurrent_counter_writes = 2;
+ public final int concurrent_materialized_view_writes = 2;
+ public final int concurrent_reads = 2;
+ public final int memtable_flush_writers = 1;
+ public final int concurrent_compactors = 1;
+ public final int memtable_heap_space_in_mb = 10;
+ public final String initial_token;
+
+ private InstanceConfig(int num,
+ String broadcast_address,
+ String listen_address,
+ String broadcast_rpc_address,
+ String rpc_address,
+ String saved_caches_directory,
+ String[] data_file_directories,
+ String commitlog_directory,
+ String hints_directory,
+ String cdc_directory,
+ String initial_token)
+ {
+ this.num = num;
+ this.broadcast_address = broadcast_address;
+ this.listen_address = listen_address;
+ this.broadcast_rpc_address = broadcast_rpc_address;
+ this.rpc_address = rpc_address;
+ this.saved_caches_directory = saved_caches_directory;
+ this.data_file_directories = data_file_directories;
+ this.commitlog_directory = commitlog_directory;
+ this.hints_directory = hints_directory;
+ this.cdc_directory = cdc_directory;
+ this.initial_token = initial_token;
+ }
+
+ public static InstanceConfig generate(int nodeNum, File root, String token)
+ {
+ return new InstanceConfig(nodeNum,
+ "127.0.0." + nodeNum,
+ "127.0.0." + nodeNum,
+ "127.0.0." + nodeNum,
+ "127.0.0." + nodeNum,
+ String.format("%s/node%d/saved_caches", root, nodeNum),
+ new String[] { String.format("%s/node%d/data", root, nodeNum) },
+ String.format("%s/node%d/commitlog", root, nodeNum),
+ String.format("%s/node%d/hints", root, nodeNum),
+ String.format("%s/node%d/cdc", root, nodeNum),
+ token);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java b/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java
new file mode 100644
index 0000000..1167748
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import ch.qos.logback.core.PropertyDefinerBase;
+
+/**
+ * Used by logback to find/define property value, see logback-dtest.xml
+ */
+public class InstanceIDDefiner extends PropertyDefinerBase
+{
+ // Instantiated per classloader, set by Instance
+ public static int instanceId = -1;
+
+ public String getPropertyValue()
+ {
+ if (instanceId == -1)
+ return "<main>";
+ else
+ return "INSTANCE" + instanceId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
new file mode 100644
index 0000000..f646ae1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class InvokableInstance
+{
+ private final ClassLoader classLoader;
+ private final Method deserializeOnInstance;
+
+ /**
+  * Looks up, through {@code classLoader}, that loader's own copy of this class and its static
+  * {@code deserializeOneObject(byte[])} method, which transferOneObject later invokes reflectively.
+  *
+  * @throws RuntimeException if the class or the method cannot be found
+  */
+ public InvokableInstance(ClassLoader classLoader)
+ {
+     this.classLoader = classLoader;
+     try
+     {
+         Class<?> onInstance = classLoader.loadClass(InvokableInstance.class.getName());
+         this.deserializeOnInstance = onInstance.getDeclaredMethod("deserializeOneObject", byte[].class);
+     }
+     catch (ClassNotFoundException | NoSuchMethodException e)
+     {
+         throw new RuntimeException(e);
+     }
+ }
+
+ public interface SerializableCallable<T> extends Callable<T>, Serializable { public T call(); } // redeclares call() without a throws clause
+ public <T> SerializableCallable<T> callsOnInstance(SerializableCallable<T> call) { return (SerializableCallable<T>) transferOneObject(call); } // returns the transferred copy of call
+ public <T> T callOnInstance(SerializableCallable<T> call) { return callsOnInstance(call).call(); } // transfers call, then invokes it immediately
+
+ public interface SerializableRunnable extends Runnable, Serializable {}
+ public SerializableRunnable runsOnInstance(SerializableRunnable run) { return (SerializableRunnable) transferOneObject(run); } // returns the transferred copy of run
+ public void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); } // transfers run, then invokes it immediately
+
+ public interface SerializableConsumer<T> extends Consumer<T>, Serializable {}
+ public <T> SerializableConsumer<T> acceptsOnInstance(SerializableConsumer<T> consumer) { return (SerializableConsumer<T>) transferOneObject(consumer); } // caller supplies the argument via accept()
+
+ public interface SerializableBiConsumer<T1, T2> extends BiConsumer<T1, T2>, Serializable {}
+ public <T1, T2> SerializableBiConsumer<T1, T2> acceptsOnInstance(SerializableBiConsumer<T1, T2> consumer) { return (SerializableBiConsumer<T1, T2>) transferOneObject(consumer); } // two-argument overload
+
+ public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
+ public <I, O> SerializableFunction<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return (SerializableFunction<I, O>) transferOneObject(f); } // caller supplies the argument via apply()
+
+ public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}
+ public <I1, I2, O> SerializableBiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return (SerializableBiFunction<I1, I2, O>) transferOneObject(f); } // two-argument overload
+
+ public interface SerializableTriFunction<I1, I2, I3, O> extends Serializable // declared here; java.util.function is imported only up to BiFunction
+ {
+ O apply(I1 i1, I2 i2, I3 i3);
+ }
+ public <I1, I2, I3, O> SerializableTriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return (SerializableTriFunction<I1, I2, I3, O>) transferOneObject(f); } // three-argument overload
+
+ public interface InstanceFunction<I, O> extends SerializableBiFunction<Instance, I, O> {} // a bi-function whose first argument is an Instance
+
+ // E must be a functional interface, and lambda must be implemented by a lambda function
+ public <E extends Serializable> E invokesOnInstance(E lambda)
+ {
+     Object transferred = transferOneObject(lambda);
+     return (E) transferred;
+ }
+
+ /**
+  * Serializes {@code object} here and deserializes it through the instance's copy of
+  * deserializeOneObject, returning the resulting object.
+  *
+  * @throws IllegalStateException if the result's class was not loaded by this instance's class loader
+  * @throws RuntimeException      wrapping any reflective invocation failure
+  */
+ public Object transferOneObject(Object object)
+ {
+     byte[] serialized = serializeOneObject(object);
+
+     Object onInstance;
+     try
+     {
+         onInstance = deserializeOnInstance.invoke(null, serialized);
+     }
+     catch (IllegalAccessException | InvocationTargetException e)
+     {
+         throw new RuntimeException(e);
+     }
+
+     ClassLoader actualLoader = onInstance.getClass().getClassLoader();
+     if (actualLoader != classLoader)
+         throw new IllegalStateException(onInstance + " seemingly from wrong class loader: " + actualLoader + ", but expected " + classLoader);
+
+     return onInstance;
+ }
+
+ /** Java-serializes {@code object} into a byte array; an IOException is rethrown as a RuntimeException. */
+ private byte[] serializeOneObject(Object object)
+ {
+     try (ByteArrayOutputStream sink = new ByteArrayOutputStream();
+          ObjectOutputStream writer = new ObjectOutputStream(sink))
+     {
+         writer.writeObject(object);
+         // close the object stream before snapshotting so everything it buffered reaches the sink
+         writer.close();
+         return sink.toByteArray();
+     }
+     catch (IOException e)
+     {
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+  * Reads a single Java-serialized object from {@code bytes}.
+  *
+  * @throws RuntimeException wrapping an IOException or ClassNotFoundException raised while reading
+  */
+ @SuppressWarnings("unused") // called through method invocation
+ public static Object deserializeOneObject(byte[] bytes)
+ {
+     try (ObjectInputStream reader = new ObjectInputStream(new ByteArrayInputStream(bytes)))
+     {
+         return reader.readObject();
+     }
+     catch (IOException | ClassNotFoundException e)
+     {
+         throw new RuntimeException(e);
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/Message.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/Message.java b/test/distributed/org/apache/cassandra/distributed/Message.java
new file mode 100644
index 0000000..b5492a2
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/Message.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+/**
+ * A container for simplifying the method signature for per-instance message handling/delivery.
+ * Every field is assigned exactly once, from the constructor argument of the same name;
+ * the byte array is kept by reference rather than copied.
+ */
+public class Message
+{
+    public final int verb;
+    public final byte[] bytes;
+    public final int id;
+    public final int version;
+    public final InetAddressAndPort from;
+
+    /** Stores each argument in the field of the same name. */
+    public Message(int verb, byte[] bytes, int id, int version, InetAddressAndPort from)
+    {
+        this.from = from;
+        this.version = version;
+        this.id = id;
+        this.verb = verb;
+        this.bytes = bytes;
+    }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/3] cassandra git commit: Introduce in-jvm distributed tests
Posted by if...@apache.org.
Introduce in-jvm distributed tests
Patch by Alex Petrov and Benedict Elliott Smith; reviewed by Benedict Elliott Smith and Dinesh Joshi for CASSANDRA-14821.
Co-authored-by: Benedict Elliott Smith <be...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f22fec92
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f22fec92
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f22fec92
Branch: refs/heads/trunk
Commit: f22fec927de7ac291266660c2f34de5b8cc1c695
Parents: 7877035
Author: Alex Petrov <ol...@gmail.com>
Authored: Fri Nov 16 19:41:58 2018 +0100
Committer: Alex Petrov <ol...@gmail.com>
Committed: Fri Nov 16 19:41:58 2018 +0100
----------------------------------------------------------------------
.circleci/config.yml | 14 +-
build.xml | 10 +-
ide/idea-iml-file.xml | 1 +
.../org/apache/cassandra/auth/AuthCache.java | 33 +-
.../cassandra/batchlog/BatchlogManager.java | 48 ++-
.../concurrent/InfiniteLoopExecutor.java | 83 ++++
.../JMXEnabledThreadPoolExecutor.java | 24 +-
.../concurrent/ScheduledExecutors.java | 15 +
.../concurrent/SharedExecutorPool.java | 15 +-
.../cassandra/concurrent/StageManager.java | 10 +
.../cassandra/config/DatabaseDescriptor.java | 8 +-
.../apache/cassandra/cql3/QueryProcessor.java | 6 +-
.../cassandra/db/BlacklistedDirectories.java | 17 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 48 ++-
.../cassandra/db/HintedHandOffManager.java | 15 +-
.../cassandra/db/commitlog/CommitLog.java | 15 +-
.../db/compaction/CompactionManager.java | 18 +-
.../cassandra/diag/DiagnosticEventService.java | 13 +-
.../cassandra/diag/LastEventIdBroadcaster.java | 16 +-
.../apache/cassandra/gms/FailureDetector.java | 14 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 14 +-
.../apache/cassandra/hints/HintsService.java | 15 +-
.../cassandra/index/SecondaryIndexManager.java | 10 +
.../io/sstable/IndexSummaryManager.java | 30 +-
.../cassandra/io/util/DataInputBuffer.java | 8 +-
.../locator/DynamicEndpointSnitch.java | 24 +-
.../cassandra/locator/EndpointSnitchInfo.java | 16 +-
.../metrics/CassandraMetricsRegistry.java | 30 +-
.../cassandra/net/ForwardToContainer.java | 3 +-
.../org/apache/cassandra/net/MessageIn.java | 12 +-
.../apache/cassandra/net/MessagingService.java | 28 +-
.../net/async/ByteBufDataInputPlus.java | 8 +
.../cassandra/net/async/MessageInHandler.java | 67 +++-
.../net/async/MessageInHandlerPre40.java | 50 ++-
.../cassandra/net/async/NettyFactory.java | 12 +-
.../apache/cassandra/schema/SchemaEvent.java | 5 +-
.../cassandra/service/ActiveRepairService.java | 15 +-
.../apache/cassandra/service/CacheService.java | 16 +-
.../cassandra/service/CassandraDaemon.java | 11 +-
.../service/PendingRangeCalculatorService.java | 9 +
.../apache/cassandra/service/StorageProxy.java | 15 +-
.../cassandra/service/StorageService.java | 16 +-
.../apache/cassandra/utils/ByteBufferUtil.java | 26 ++
.../apache/cassandra/utils/MBeanWrapper.java | 179 +++++++++
.../org/apache/cassandra/utils/Mx4jTool.java | 4 +-
.../apache/cassandra/utils/concurrent/Ref.java | 35 +-
.../cassandra/utils/memory/BufferPool.java | 43 +-
.../utils/memory/MemtableCleanerThread.java | 77 ++--
.../cassandra/utils/memory/MemtablePool.java | 9 +
test/conf/logback-dtest.xml | 79 ++++
.../cassandra/distributed/Coordinator.java | 80 ++++
.../DistributedReadWritePathTest.java | 348 ++++++++++++++++
.../distributed/DistributedTestBase.java | 86 ++++
.../apache/cassandra/distributed/Instance.java | 399 +++++++++++++++++++
.../distributed/InstanceClassLoader.java | 101 +++++
.../cassandra/distributed/InstanceConfig.java | 87 ++++
.../distributed/InstanceIDDefiner.java | 38 ++
.../distributed/InvokableInstance.java | 133 +++++++
.../apache/cassandra/distributed/Message.java | 41 ++
.../cassandra/distributed/MessageFilters.java | 175 ++++++++
.../apache/cassandra/distributed/RowUtil.java | 47 +++
.../cassandra/distributed/TestCluster.java | 308 ++++++++++++++
.../cassandra/net/async/NettyFactoryTest.java | 2 +-
.../async/StreamCompressionSerializerTest.java | 4 -
64 files changed, 2661 insertions(+), 477 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/.circleci/config.yml
----------------------------------------------------------------------
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 430354a..3b2b978 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -164,12 +164,20 @@ jobs:
# get all of our unit test filenames
set -eo pipefail && circleci tests glob "$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt
+ # collect all of our distributed test filenames into their own list
+ set -eo pipefail && circleci tests glob "$HOME/cassandra/test/distributed/**/*.java" > /tmp/all_java_distributed_tests.txt
# split up the unit tests into groups based on the number of containers we have
set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt
set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | cut -c 37-1000000 | grep "Test\.java$" > /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt"
cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+
+ set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_distributed_tests.txt > /tmp/java_dtests_${CIRCLE_NODE_INDEX}.txt
+ set +eo pipefail && cat /tmp/java_dtests_${CIRCLE_NODE_INDEX}.txt | cut -c 44-1000000 | grep "Test\.java$" > /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt
+ echo "** /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt"
+ cat /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt
+
- run:
name: Run Unit Tests
command: |
@@ -181,7 +189,11 @@ jobs:
time mv ~/cassandra /tmp
cd /tmp/cassandra
- ant testclasslist -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+ ant testclasslist -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt -Dtest.classlistprefix=unit
+
+ if [ -s "/tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt" ]; then
+ ant testclasslist -Dtest.classlistfile=/tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt -Dtest.classlistprefix=distributed
+ fi
no_output_timeout: 15m
- store_test_results:
path: /tmp/cassandra/build/test/output/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 3d3014c..f24647e 100644
--- a/build.xml
+++ b/build.xml
@@ -56,12 +56,14 @@
<property name="test.data" value="${test.dir}/data"/>
<property name="test.name" value="*Test"/>
<property name="test.classlistfile" value="testlist.txt"/>
+ <property name="test.classlistprefix" value="unit"/>
<property name="benchmark.name" value=""/>
<property name="test.methods" value=""/>
<property name="test.unit.src" value="${test.dir}/unit"/>
<property name="test.long.src" value="${test.dir}/long"/>
<property name="test.burn.src" value="${test.dir}/burn"/>
<property name="test.microbench.src" value="${test.dir}/microbench"/>
+ <property name="test.distributed.src" value="${test.dir}/distributed"/>
<property name="dist.dir" value="${build.dir}/dist"/>
<property name="tmp.dir" value="${java.io.tmpdir}"/>
@@ -103,6 +105,7 @@
<property name="test.timeout" value="240000" />
<property name="test.long.timeout" value="600000" />
<property name="test.burn.timeout" value="60000000" />
+ <property name="test.distributed.timeout" value="600000" />
<!-- default for cql tests. Can be override by -Dcassandra.test.use_prepared=false -->
<property name="cassandra.test.use_prepared" value="true" />
@@ -1253,6 +1256,7 @@
<src path="${test.long.src}"/>
<src path="${test.burn.src}"/>
<src path="${test.microbench.src}"/>
+ <src path="${test.distributed.src}"/>
</javac>
<!-- Non-java resources needed by the test suite -->
@@ -1364,7 +1368,7 @@
<attribute name="test.file.list"/>
<attribute name="testlist.offset"/>
<sequential>
- <testmacrohelper inputdir="${test.unit.src}" filelist="@{test.file.list}" poffset="@{testlist.offset}" exclude="**/*.java" timeout="${test.timeout}">
+ <testmacrohelper inputdir="${test.dir}/${test.classlistprefix}" filelist="@{test.file.list}" poffset="@{testlist.offset}" exclude="**/*.java" timeout="${test.timeout}">
<jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
<jvmarg value="-Dinvalid-legacy-sstable-root=${test.data}/invalid-legacy-sstables"/>
<jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
@@ -1468,6 +1472,7 @@
</concat>
<path id="all-test-classes-path">
<fileset dir="${test.unit.src}" includes="**/${test.name}.java" />
+ <fileset dir="${test.distributed.src}" includes="**/${test.name}.java" />
</path>
<property name="all-test-classes" refid="all-test-classes-path"/>
<testparallel testdelegate="testlist-compression" />
@@ -1844,7 +1849,7 @@
e.g. org/apache/cassandra/hints/HintMessageTest.java -->
<target name="testclasslist" depends="build-test" description="Parallel-run tests given in file -Dtest.classlistfile (one-class-per-line, e.g. org/apache/cassandra/db/SomeTest.java)">
<path id="all-test-classes-path">
- <fileset dir="${test.unit.src}" includesfile="${test.classlistfile}"/>
+ <fileset dir="${test.dir}/${test.classlistprefix}" includesfile="${test.classlistfile}"/>
</path>
<property name="all-test-classes" refid="all-test-classes-path"/>
<testparallel testdelegate="testlist"/>
@@ -1939,6 +1944,7 @@
<classpathentry kind="src" path="conf" including="hotspot_compiler"/>
<classpathentry kind="src" output="build/test/classes" path="test/unit"/>
<classpathentry kind="src" output="build/test/classes" path="test/long"/>
+ <classpathentry kind="src" output="build/test/classes" path="test/distributed"/>
<classpathentry kind="src" output="build/test/classes" path="test/resources" />
<classpathentry kind="src" path="tools/stress/src"/>
<classpathentry kind="src" path="tools/fqltool/src"/>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/ide/idea-iml-file.xml
----------------------------------------------------------------------
diff --git a/ide/idea-iml-file.xml b/ide/idea-iml-file.xml
index b83abfa..0045ae6 100644
--- a/ide/idea-iml-file.xml
+++ b/ide/idea-iml-file.xml
@@ -35,6 +35,7 @@
<sourceFolder url="file://$MODULE_DIR$/test/long" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/microbench" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/burn" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/test/distributed" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/resources" type="java-test-resource" />
<sourceFolder url="file://$MODULE_DIR$/test/conf" type="java-test-resource" />
<excludeFolder url="file://$MODULE_DIR$/.idea" />
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/auth/AuthCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthCache.java b/src/java/org/apache/cassandra/auth/AuthCache.java
index 3adf914..4bf15c1 100644
--- a/src/java/org/apache/cassandra/auth/AuthCache.java
+++ b/src/java/org/apache/cassandra/auth/AuthCache.java
@@ -18,22 +18,19 @@
package org.apache.cassandra.auth;
-import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.utils.MBeanWrapper;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -97,33 +94,17 @@ public class AuthCache<K, V> implements AuthCacheMBean
protected void init()
{
cache = initCache(null);
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, getObjectName());
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, getObjectName());
}
protected void unregisterMBean()
{
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.unregisterMBean(getObjectName());
- }
- catch (Exception e)
- {
- logger.warn("Error unregistering {} cache mbean", name, e);
- }
+ MBeanWrapper.instance.unregisterMBean(getObjectName(), MBeanWrapper.OnException.LOG);
}
- protected ObjectName getObjectName() throws MalformedObjectNameException
+ protected String getObjectName()
{
- return new ObjectName(MBEAN_NAME_BASE + name);
+ return MBEAN_NAME_BASE + name;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 91129ed..b2b851d 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -18,28 +18,37 @@
package org.apache.cassandra.batchlog;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.Replicas;
-import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -51,15 +60,20 @@ import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.UUIDGen;
import static com.google.common.collect.Iterables.transform;
@@ -93,15 +107,7 @@ public class BatchlogManager implements BatchlogManagerMBean
public void start()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
StorageService.RING_DELAY,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
new file mode 100644
index 0000000..1b8173e
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Repeatedly invokes an {@link InterruptibleRunnable} on a dedicated daemon thread until
+ * {@link #shutdown()} is called.
+ */
+public class InfiniteLoopExecutor
+{
+    private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class);
+
+    /** A task whose {@code run} method is allowed to throw {@link InterruptedException}. */
+    public interface InterruptibleRunnable
+    {
+        void run() throws InterruptedException;
+    }
+
+    private final Thread thread;
+    private final InterruptibleRunnable runnable;
+    // written by shutdown(), read by the loop thread
+    private volatile boolean isShutdown = false;
+
+    /**
+     * Prepares (but does not start) the daemon thread that will run the loop.
+     *
+     * @param name     name given to the underlying thread
+     * @param runnable task invoked on every iteration of the loop
+     */
+    public InfiniteLoopExecutor(String name, InterruptibleRunnable runnable)
+    {
+        this.runnable = runnable;
+        this.thread = new Thread(this::loop, name);
+        this.thread.setDaemon(true);
+    }
+
+    /**
+     * Body of the executor thread: keeps calling the runnable until shutdown is requested.
+     * Failures are logged and the loop carries on; an interrupt only ends the loop when
+     * shutdown has been requested.
+     */
+    private void loop()
+    {
+        while (!isShutdown)
+        {
+            try
+            {
+                runnable.run();
+            }
+            catch (InterruptedException ie)
+            {
+                if (isShutdown)
+                    return;
+                logger.error("Interrupted while executing {}, but not shutdown; continuing with loop", runnable, ie);
+            }
+            catch (Throwable t)
+            {
+                logger.error("Exception thrown by runnable, continuing with loop", t);
+            }
+        }
+    }
+
+    /**
+     * Starts the loop thread.
+     *
+     * @return this executor, to allow chaining
+     */
+    public InfiniteLoopExecutor start()
+    {
+        thread.start();
+        return this;
+    }
+
+    /** Flags the executor as shut down and interrupts the loop thread so it can notice. */
+    public void shutdown()
+    {
+        isShutdown = true;
+        thread.interrupt();
+    }
+
+    /**
+     * Waits for the loop thread to finish, for at most the given time.
+     * A zero or negative timeout returns immediately.
+     *
+     * @param time maximum time to wait
+     * @param unit unit of {@code time}
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public void awaitTermination(long time, TimeUnit unit) throws InterruptedException
+    {
+        // Thread.join(0) means "wait forever", so truncating a sub-millisecond timeout with
+        // toMillis() would block indefinitely; timedJoin keeps the nanosecond remainder.
+        unit.timedJoin(thread, time);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
index 278b399..0e61de9 100644
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
@@ -17,16 +17,14 @@
*/
package org.apache.cassandra.concurrent;
-import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import org.apache.cassandra.metrics.ThreadPoolMetrics;
+import org.apache.cassandra.utils.MBeanWrapper;
/**
* This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation
@@ -81,17 +79,8 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
super.prestartAllCoreThreads();
metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id).register();
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id;
-
- try
- {
- mbs.registerMBean(this, new ObjectName(mbeanName));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, mbeanName);
}
public JMXEnabledThreadPoolExecutor(int corePoolSize,
@@ -114,14 +103,7 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
private void unregisterMBean()
{
- try
- {
- ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.unregisterMBean(mbeanName);
// release metrics
metrics.release();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
index 22dc769..e51e4c2 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
@@ -17,6 +17,11 @@
*/
package org.apache.cassandra.concurrent;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Centralized location for shared executors
*/
@@ -41,4 +46,14 @@ public class ScheduledExecutors
* This executor is used for tasks that do not need to be waited for on shutdown/drain.
*/
public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
+
+    /**
+     * Shuts down the shared executors listed below, then waits up to 60 seconds for each to terminate.
+     */
+    @VisibleForTesting
+    public static void shutdownAndWait() throws InterruptedException
+    {
+        ExecutorService[] all = { scheduledFastTasks, scheduledTasks, nonPeriodicTasks, optionalTasks };
+
+        // request shutdown on every executor before waiting on any of them
+        for (int i = 0; i < all.length; i++)
+            all[i].shutdown();
+
+        for (int i = 0; i < all.length; i++)
+            all[i].awaitTermination(60, TimeUnit.SECONDS);
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index 3b0600f..5352ad7 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -21,9 +21,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
+
import static org.apache.cassandra.concurrent.SEPWorker.Work;
/**
@@ -61,7 +64,7 @@ public class SharedExecutorPool
final AtomicLong workerId = new AtomicLong();
// the collection of executors serviced by this pool; periodically ordered by traffic volume
- final List<SEPExecutor> executors = new CopyOnWriteArrayList<>();
+ public final List<SEPExecutor> executors = new CopyOnWriteArrayList<>();
// the number of workers currently in a spinning state
final AtomicInteger spinningCount = new AtomicInteger();
@@ -109,4 +112,14 @@ public class SharedExecutorPool
executors.add(executor);
return executor;
}
+
+    /**
+     * Shuts down every executor registered with the {@code SHARED} pool, then waits up to
+     * 60 seconds for each of them to terminate.
+     */
+    @VisibleForTesting
+    public static void shutdownSharedPool() throws InterruptedException
+    {
+        List<SEPExecutor> registered = SHARED.executors;
+
+        // request shutdown on every executor before waiting on any of them
+        for (SEPExecutor sep : registered)
+            sep.shutdown();
+
+        for (SEPExecutor sep : registered)
+            sep.awaitTermination(60, TimeUnit.SECONDS);
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index c102042..608a005 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.concurrent;
import java.util.EnumMap;
import java.util.concurrent.*;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,6 +113,15 @@ public class StageManager
}
}
+ @VisibleForTesting
+ public static void shutdownAndWait() throws InterruptedException
+ {
+ for (Stage stage : Stage.values())
+ StageManager.stages.get(stage).shutdown();
+ for (Stage stage : Stage.values())
+ StageManager.stages.get(stage).awaitTermination(60, TimeUnit.SECONDS);
+ }
+
/**
* The executor used for tracing.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index bc1e5a2..2f5f49f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -142,6 +143,11 @@ public class DatabaseDescriptor
public static void daemonInitialization() throws ConfigurationException
{
+ daemonInitialization(DatabaseDescriptor::loadConfig);
+ }
+
+ public static void daemonInitialization(Supplier<Config> config) throws ConfigurationException
+ {
if (toolInitialized)
throw new AssertionError("toolInitialization() already called");
if (clientInitialized)
@@ -152,7 +158,7 @@ public class DatabaseDescriptor
return;
daemonInitialized = true;
- setConfig(loadConfig());
+ setConfig(config.get());
applyAll();
AuthConfig.applyAuth();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 45db947..b8ec648 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -162,7 +162,8 @@ public class QueryProcessor implements QueryHandler
SystemKeyspace.resetPreparedStatements();
}
- private static QueryState internalQueryState()
+ @VisibleForTesting
+ public static QueryState internalQueryState()
{
return new QueryState(InternalStateInstance.INSTANCE.clientState);
}
@@ -265,7 +266,8 @@ public class QueryProcessor implements QueryHandler
return null;
}
- private static QueryOptions makeInternalOptions(CQLStatement prepared, Object[] values)
+ @VisibleForTesting
+ public static QueryOptions makeInternalOptions(CQLStatement prepared, Object[] values)
{
return makeInternalOptions(prepared, values, ConsistencyLevel.ONE);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index f090013..cff9a78 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -21,18 +21,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.MBeanWrapper;
public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
{
@@ -48,17 +45,7 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
private BlacklistedDirectories()
{
// Register this instance with JMX
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- JVMStabilityInspector.inspectThrowable(e);
- logger.error("error registering MBean {}", MBEAN_NAME, e);
- //Allow the server to start even if the bean can't be registered
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME, MBeanWrapper.OnException.LOG);
}
public Set<File> getUnreadableDirectories()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 877a3c5..c5149cf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
-import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
@@ -41,7 +40,6 @@ import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.codahale.metrics.Snapshot;
import org.apache.cassandra.cache.*;
import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.*;
@@ -72,7 +70,6 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.metrics.Sampler;
import org.apache.cassandra.metrics.Sampler.Sample;
import org.apache.cassandra.metrics.Sampler.SamplerType;
@@ -220,12 +217,33 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private volatile boolean neverPurgeTombstones = false;
+ public static void shutdownFlushExecutor() throws InterruptedException
+ {
+ flushExecutor.shutdown();
+ flushExecutor.awaitTermination(60, TimeUnit.SECONDS);
+ }
+
+
public static void shutdownPostFlushExecutor() throws InterruptedException
{
postFlushExecutor.shutdown();
postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
}
+ public static void shutdownReclaimExecutor() throws InterruptedException
+ {
+ reclaimExecutor.shutdown();
+ reclaimExecutor.awaitTermination(60, TimeUnit.SECONDS);
+ }
+
+ public static void shutdownPerDiskFlushExecutors() throws InterruptedException
+ {
+ for (ExecutorService executorService : perDiskflushExecutors)
+ executorService.shutdown();
+ for (ExecutorService executorService : perDiskflushExecutors)
+ executorService.awaitTermination(60, TimeUnit.SECONDS);
+ }
+
public void reload()
{
// metadata object has been mutated directly. make all the members jibe with new settings.
@@ -425,19 +443,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
oldMBeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,columnfamily=%s",
isIndex() ? "IndexColumnFamilies" : "ColumnFamilies",
keyspace.getName(), name);
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)};
- for (ObjectName objectName : objectNames)
- {
- mbs.registerMBean(this, objectName);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+
+ String[] objectNames = {mbeanName, oldMBeanName};
+ for (String objectName : objectNames)
+ MBeanWrapper.instance.registerMBean(this, objectName);
}
else
{
@@ -548,14 +557,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
data.removeUnreadableSSTables(directory);
}
- void unregisterMBean() throws MalformedObjectNameException, InstanceNotFoundException, MBeanRegistrationException
+ void unregisterMBean() throws MalformedObjectNameException
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)};
for (ObjectName objectName : objectNames)
{
- if (mbs.isRegistered(objectName))
- mbs.unregisterMBean(objectName);
+ if (MBeanWrapper.instance.isRegistered(objectName))
+ MBeanWrapper.instance.unregisterMBean(objectName);
}
// unregister metrics
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 3279acf..e26f658 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -17,13 +17,10 @@
*/
package org.apache.cassandra.db;
-import java.lang.management.ManagementFactory;
import java.util.List;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.utils.MBeanWrapper;
/**
* A proxy class that implement the deprecated legacy HintedHandoffManagerMBean interface.
@@ -44,15 +41,7 @@ public final class HintedHandOffManager implements HintedHandOffManagerMBean
public void registerMBean()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
public void deleteHintsForEndpoint(String host)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 6537adc..9d2a369 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -18,19 +18,15 @@
package org.apache.cassandra.db.commitlog;
import java.io.*;
-import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.zip.CRC32;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.*;
@@ -49,6 +45,7 @@ import org.apache.cassandra.security.EncryptionContext;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.MBeanWrapper;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.CommitLogSegmentFileComparator;
@@ -82,15 +79,7 @@ public class CommitLog implements CommitLogMBean
{
CommitLog log = new CommitLog(CommitLogArchiver.construct());
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(log, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(log, "org.apache.cassandra.db:type=Commitlog");
return log.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e56ed60..bc5a883 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -19,14 +19,11 @@ package org.apache.cassandra.db.compaction;
import java.io.File;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
@@ -36,7 +33,6 @@ import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.Replica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,15 +115,8 @@ public class CompactionManager implements CompactionManagerMBean
static
{
instance = new CompactionManager();
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(instance, new ObjectName(MBEAN_OBJECT_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+
+ MBeanWrapper.instance.registerMBean(instance, MBEAN_OBJECT_NAME);
}
private final CompactionExecutor executor = new CompactionExecutor();
@@ -232,6 +221,7 @@ public class CompactionManager implements CompactionManagerMBean
executor.shutdown();
validationExecutor.shutdown();
viewBuildExecutor.shutdown();
+ cacheCleanupExecutor.shutdown();
// interrupt compactions and validations
for (Holder compactionHolder : CompactionMetrics.getCompactions())
@@ -242,7 +232,7 @@ public class CompactionManager implements CompactionManagerMBean
// wait for tasks to terminate
// compaction tasks are interrupted above, so it shuold be fairy quick
// until not interrupted tasks to complete.
- for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor))
+ for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor, cacheCleanupExecutor))
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
index 3f3de7c..5953a1d 100644
--- a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
+++ b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
@@ -41,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.MBeanWrapper;
/**
* Service for publishing and consuming {@link DiagnosticEvent}s.
@@ -62,17 +63,7 @@ public final class DiagnosticEventService implements DiagnosticEventServiceMBean
private DiagnosticEventService()
{
-
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- ObjectName jmxObjectName = new ObjectName("org.apache.cassandra.diag:type=DiagnosticEventService");
- mbs.registerMBean(this, jmxObjectName);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this,"org.apache.cassandra.diag:type=DiagnosticEventService");
// register broadcasters for JMX events
DiagnosticEventPersistence.start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
index 9fe5c48..8e991e6 100644
--- a/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
+++ b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
@@ -18,21 +18,19 @@
package org.apache.cassandra.diag;
-import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
-import javax.management.ObjectName;
import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
/**
@@ -61,16 +59,8 @@ final class LastEventIdBroadcaster extends NotificationBroadcasterSupport implem
super(JMXBroadcastExecutor.executor);
summary.put("last_updated_at", 0L);
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- ObjectName jmxObjectName = new ObjectName("org.apache.cassandra.diag:type=LastEventIdBroadcaster");
- mbs.registerMBean(this, jmxObjectName);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+
+ MBeanWrapper.instance.registerMBean(this, "org.apache.cassandra.diag:type=LastEventIdBroadcaster");
}
public static LastEventIdBroadcaster instance()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index d7f73ab..4a16f2a 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -21,15 +21,12 @@ import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.nio.file.Path;
import java.io.*;
-import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.*;
@@ -42,6 +39,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
/**
* This FailureDetector is an implementation of the paper titled
@@ -88,15 +86,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
public FailureDetector()
{
// Register this instance with JMX
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
private static long getInitialValue()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index aedcb04..b789fe7 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.gms;
-import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.util.*;
import java.util.Map.Entry;
@@ -28,8 +27,6 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
@@ -41,6 +38,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -248,15 +246,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
// Register this instance with JMX
if (registerJmx)
{
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 73840d3..1a352c2 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.hints;
import java.io.File;
-import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
@@ -29,9 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.db.Keyspace;
@@ -51,6 +47,7 @@ import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.MBeanWrapper;
import static com.google.common.collect.Iterables.transform;
@@ -138,15 +135,7 @@ public final class HintsService implements HintsServiceMBean
public void registerMBean()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index c9a7cc6..ec54a65 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -1485,4 +1485,14 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
false);
}
}
+
+ @VisibleForTesting
+ public static void shutdownExecutors() throws InterruptedException
+ {
+ ExecutorService[] executors = new ExecutorService[]{ asyncExecutor, blockingExecutor };
+ for (ExecutorService executor : executors)
+ executor.shutdown();
+ for (ExecutorService executor : executors)
+ executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index b8d236a..3630c2a 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -18,29 +18,32 @@
package org.apache.cassandra.io.sstable;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -65,16 +68,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
static
{
instance = new IndexSummaryManager();
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
- try
- {
- mbs.registerMBean(instance, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME);
}
private IndexSummaryManager()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
index a68dcc2..9df9861 100644
--- a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.io.util;
-import java.io.IOException;
import java.nio.ByteBuffer;
/**
@@ -57,14 +56,17 @@ public class DataInputBuffer extends RebufferingInputStream
}
@Override
- protected void reBuffer() throws IOException
+ protected void reBuffer()
{
//nope, we don't rebuffer, we are done!
}
@Override
- public int available() throws IOException
+ public int available()
{
return buffer.remaining();
}
+
+ @Override
+ public void close() {}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index d35f1fb..ddc8fba 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.locator;
-import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
@@ -28,8 +27,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import com.codahale.metrics.Snapshot;
import org.apache.cassandra.concurrent.ScheduledExecutors;
@@ -41,6 +38,7 @@ import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
/**
* A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
@@ -141,15 +139,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
private void registerMBean()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(mbeanName));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, mbeanName);
}
public void close()
@@ -157,15 +147,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
updateSchedular.cancel(false);
resetSchedular.cancel(false);
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.unregisterMBean(new ObjectName(mbeanName));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.unregisterMBean(mbeanName);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
index da90a79..d836cd1 100644
--- a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
+++ b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
@@ -17,28 +17,16 @@
*/
package org.apache.cassandra.locator;
-
-import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
public class EndpointSnitchInfo implements EndpointSnitchInfoMBean
{
public static void create()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(new EndpointSnitchInfo(), new ObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo"));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(new EndpointSnitchInfo(), "org.apache.cassandra.db:type=EndpointSnitchInfo");
}
public String getDatacenter(String host) throws UnknownHostException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 43d6609..74c3367 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.metrics;
-import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
@@ -26,14 +25,14 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-
-import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import com.codahale.metrics.*;
import com.google.common.annotations.VisibleForTesting;
+import com.codahale.metrics.*;
+import org.apache.cassandra.utils.MBeanWrapper;
+
/**
* Makes integrating 3.0 metrics API with 2.0.
* <p>
@@ -45,7 +44,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
public static final CassandraMetricsRegistry Metrics = new CassandraMetricsRegistry();
private final Map<String, ThreadPoolMetrics> threadPoolMetrics = new ConcurrentHashMap<>();
- private final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ private final MBeanWrapper mBeanServer = MBeanWrapper.instance;
private CassandraMetricsRegistry()
{
@@ -159,11 +158,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
{
boolean removed = remove(name.getMetricName());
- try
- {
- mBeanServer.unregisterMBean(name.getMBeanName());
- } catch (Exception ignore) {}
-
+ mBeanServer.unregisterMBean(name.getMBeanName(), MBeanWrapper.OnException.IGNORE);
return removed;
}
@@ -194,13 +189,8 @@ public class CassandraMetricsRegistry extends MetricRegistry
else
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
- try
- {
- mBeanServer.registerMBean(mbean, name);
- }
- catch (Exception ignored)
- {
- }
+ if (!mBeanServer.isRegistered(name))
+ mBeanServer.registerMBean(mbean, name, MBeanWrapper.OnException.LOG);
}
private void registerAlias(MetricName existingName, MetricName aliasName)
@@ -213,10 +203,8 @@ public class CassandraMetricsRegistry extends MetricRegistry
private void removeAlias(MetricName name)
{
- try
- {
- mBeanServer.unregisterMBean(name.getMBeanName());
- } catch (Exception ignore) {}
+ if (mBeanServer.isRegistered(name.getMBeanName()))
+ MBeanWrapper.instance.unregisterMBean(name.getMBeanName(), MBeanWrapper.OnException.IGNORE);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/ForwardToContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ForwardToContainer.java b/src/java/org/apache/cassandra/net/ForwardToContainer.java
index ac9e725..b22eed6 100644
--- a/src/java/org/apache/cassandra/net/ForwardToContainer.java
+++ b/src/java/org/apache/cassandra/net/ForwardToContainer.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.net;
+import java.io.Serializable;
import java.util.Collection;
import com.google.common.base.Preconditions;
@@ -28,7 +29,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
* Contains forward to information until it can be serialized as part of a message using a version
* specific serialization
*/
-public class ForwardToContainer
+public class ForwardToContainer implements Serializable
{
public final Collection<InetAddressAndPort> targets;
public final int[] messageIds;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index 1cd39f3..c8f4bfc 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -49,12 +49,12 @@ public class MessageIn<T>
public final int version;
public final long constructionTime;
- private MessageIn(InetAddressAndPort from,
- T payload,
- Map<ParameterType, Object> parameters,
- Verb verb,
- int version,
- long constructionTime)
+ public MessageIn(InetAddressAndPort from,
+ T payload,
+ Map<ParameterType, Object> parameters,
+ Verb verb,
+ int version,
+ long constructionTime)
{
this.from = from;
this.payload = payload;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c6e8496..761e210 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.net;
import java.io.IOError;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
@@ -35,8 +34,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@@ -112,6 +109,7 @@ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.BooleanSerializer;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.NativeLibrary;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.StatusLogger;
@@ -464,6 +462,20 @@ public final class MessagingService implements MessagingServiceMBean
}
}
+ /**
+  * Resolves the serializer to use for a message with the given verb.
+  * <p>
+  * The serializer registered for {@code verb} is returned as-is unless it is a
+  * {@link CallbackDeterminedSerializer}; in that case the serializer stored on the
+  * callback registered under {@code id} is returned instead.
+  *
+  * @param verb the verb of the message
+  * @param id   the message id used to look up the registered callback
+  * @return the resolved serializer, or {@code null} when none is registered for the verb
+  *         or no callback is registered under {@code id}
+  */
+ public static IVersionedSerializer<?> getVerbSerializer(Verb verb, int id)
+ {
+     IVersionedSerializer<?> registered = verbSerializers.get(verb);
+     if (!(registered instanceof MessagingService.CallbackDeterminedSerializer))
+         return registered;
+
+     // the verb defers to whatever serializer the pending callback carries
+     CallbackInfo info = MessagingService.instance().getRegisteredCallback(id);
+     return info == null ? null : info.serializer;
+ }
+
/* Lookup table for registering message handlers based on the verb. */
private final Map<Verb, IVerbHandler> verbHandlers;
@@ -618,15 +630,7 @@ public final class MessagingService implements MessagingServiceMBean
if (!testOnly)
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
index 23e532c..e0be715 100644
--- a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
@@ -22,6 +22,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import org.apache.cassandra.io.util.DataInputPlus;
+import java.io.IOException;
+
public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInputPlus
{
/**
@@ -40,4 +42,10 @@ public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInpu
{
return buf;
}
+
+ /**
+  * Reads a modified-UTF string by delegating to the static {@code readUTF} helper reachable
+  * through {@code DataInputStreamPlus}, passing this stream as the input.
+  *
+  * @return the decoded string
+  * @throws IOException declared by the signature; see the delegated helper for when it is raised
+  */
+ @Override
+ public String readUTF() throws IOException
+ {
+ return DataInputStreamPlus.readUTF(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/MessageInHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/MessageInHandler.java b/src/java/org/apache/cassandra/net/async/MessageInHandler.java
index 0a194d4..dafa993 100644
--- a/src/java/org/apache/cassandra/net/async/MessageInHandler.java
+++ b/src/java/org/apache/cassandra/net/async/MessageInHandler.java
@@ -18,15 +18,17 @@
package org.apache.cassandra.net.async;
-import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
import com.google.common.primitives.Ints;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,7 +104,7 @@ public class MessageInHandler extends BaseMessageInHandler
{
if (in.readableBytes() < messageHeader.parameterLength)
return;
- readParameters(in, inputPlus, messageHeader.parameterLength, messageHeader.parameters);
+ readParameters(in, inputPlus, messagingVersion, messageHeader.parameterLength, messageHeader.parameters);
}
state = State.READ_PAYLOAD_SIZE;
// fall-through
@@ -134,17 +136,17 @@ public class MessageInHandler extends BaseMessageInHandler
}
}
- private void readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
+ private static void readParameters(ByteBuf buf, DataInputPlus in, int messagingVersion, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
{
// makes the assumption we have all the bytes required to read the headers
- final int endIndex = in.readerIndex() + parameterLength;
- while (in.readerIndex() < endIndex)
+ final int endIndex = buf.readerIndex() + parameterLength;
+ while (buf.readerIndex() < endIndex)
{
- String key = DataInputStream.readUTF(inputPlus);
+ String key = in.readUTF();
ParameterType parameterType = ParameterType.byName.get(key);
- long valueLength = VIntCoding.readUnsignedVInt(in);
+ long valueLength = in.readUnsignedVInt();
byte[] value = new byte[Ints.checkedCast(valueLength)];
- in.readBytes(value);
+ in.readFully(value);
try (DataInputBuffer buffer = new DataInputBuffer(value))
{
parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion));
@@ -152,6 +154,55 @@ public class MessageInHandler extends BaseMessageInHandler
}
}
+ /**
+  * Reads parameter entries from {@code in} until {@code isDone} reports {@code true}, storing each
+  * deserialized value in {@code parameters}.
+  * <p>
+  * Each entry is read as a UTF key, an unsigned vint value length, and then the value itself.
+  * Entries whose key does not map to a known {@link ParameterType} are skipped using the recorded
+  * length instead of failing with a {@link NullPointerException}.
+  *
+  * @param isDone           returns {@code true} once no more entries remain to be read
+  * @param in               the input positioned at the first parameter entry
+  * @param messagingVersion the version passed to each parameter serializer
+  * @param parameters       the map that receives the deserialized parameters
+  * @throws IOException if reading from {@code in} fails
+  * @throws IllegalArgumentException if a value length does not fit in an {@code int}
+  */
+ private static void readParameters(BooleanSupplier isDone, DataInputPlus in, int messagingVersion, Map<ParameterType, Object> parameters) throws IOException
+ {
+     // makes the assumption we have all the bytes required to read the headers
+     while (!isDone.getAsBoolean())
+     {
+         String key = in.readUTF();
+         ParameterType parameterType = ParameterType.byName.get(key);
+         int valueLength = Ints.checkedCast(in.readUnsignedVInt());
+         if (parameterType == null)
+         {
+             // unknown parameter: step over its value so the following entry stays aligned
+             in.skipBytesFully(valueLength);
+             continue;
+         }
+         parameters.put(parameterType, parameterType.serializer.deserialize(in, messagingVersion));
+     }
+ }
+
+ /**
+  * Deserializes a complete message from {@code in}, choosing the wire format by messaging version:
+  * versions at or above {@code MessagingService.VERSION_40} use the 4.0 format, anything older is
+  * handed to {@link MessageInHandlerPre40}.
+  *
+  * @param in      the input positioned at the start of the message
+  * @param id      the message id, used to resolve callback-determined serializers
+  * @param version the messaging version the message was written with
+  * @param from    the peer the message was received from
+  * @return the deserialized message
+  * @throws IOException if reading from {@code in} fails
+  */
+ public static MessageIn<?> deserialize(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException
+ {
+     boolean isPre40 = version < MessagingService.VERSION_40;
+     return isPre40
+            ? MessageInHandlerPre40.deserializePre40(in, id, version, from)
+            : deserialize40(in, id, version, from);
+ }
+
+ /**
+  * Deserializes a message written in the 4.0 wire format: an int verb id, an unsigned vint
+  * parameter-section length followed by that many bytes of parameters, and an unsigned vint
+  * payload size followed by the payload.
+  * <p>
+  * Both lengths are narrowed with {@code Ints.checkedCast}, so an oversized value is rejected
+  * instead of silently wrapping into a bogus (possibly negative) {@code int}.
+  *
+  * @param in      the input positioned at the verb id
+  * @param id      the message id, used to resolve callback-determined serializers
+  * @param version the messaging version passed to the serializers
+  * @param from    the peer the message was received from
+  * @return the deserialized message; its payload is {@code null} when the payload is empty or
+  *         when no serializer could be resolved (the payload bytes are then skipped)
+  * @throws IOException if reading from {@code in} fails
+  * @throws IllegalArgumentException if a length field does not fit in an {@code int}
+  */
+ private static MessageIn<?> deserialize40(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException
+ {
+     MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
+
+     Map<ParameterType, Object> parameters = Collections.emptyMap();
+     int parameterLength = Ints.checkedCast(in.readUnsignedVInt());
+     if (parameterLength != 0)
+     {
+         parameters = new EnumMap<>(ParameterType.class);
+         // copy the whole parameter section so the end of the parameters is simply the end of the buffer
+         byte[] bytes = new byte[parameterLength];
+         in.readFully(bytes);
+         try (DataInputBuffer buffer = new DataInputBuffer(bytes))
+         {
+             readParameters(() -> buffer.available() == 0, buffer, version, parameters);
+         }
+     }
+
+     Object payload = null;
+     int payloadSize = Ints.checkedCast(in.readUnsignedVInt());
+     if (payloadSize > 0)
+     {
+         IVersionedSerializer<?> serializer = MessagingService.getVerbSerializer(verb, id);
+         if (serializer == null)
+             in.skipBytesFully(payloadSize); // nothing can decode it; keep the stream aligned
+         else
+             payload = serializer.deserialize(in, version);
+     }
+
+     return new MessageIn<>(from, payload, parameters, verb, version, System.nanoTime());
+ }
+
@Override
MessageHeader getMessageHeader()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java b/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
index f5b6fc4..6eeeea7 100644
--- a/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
+++ b/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
@@ -25,8 +25,11 @@ import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -155,10 +158,10 @@ public class MessageInHandlerPre40 extends BaseMessageInHandler
if (!canReadNextParam(in))
return false;
- String key = DataInputStream.readUTF(inputPlus);
+ String key = inputPlus.readUTF();
ParameterType parameterType = ParameterType.byName.get(key);
- byte[] value = new byte[in.readInt()];
- in.readBytes(value);
+ byte[] value = new byte[inputPlus.readInt()];
+ inputPlus.readFully(value);
try (DataInputBuffer buffer = new DataInputBuffer(value))
{
parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion));
@@ -168,6 +171,47 @@ public class MessageInHandlerPre40 extends BaseMessageInHandler
return true;
}
+ /**
+  * Reads exactly {@code parameterCount} parameter entries from {@code in}, storing each
+  * deserialized value in {@code parameters}.
+  * <p>
+  * Each entry is read as a UTF key, an int value length, and then the value itself. The loop
+  * counts entries read rather than relying on {@code parameters.size()}, so a repeated key cannot
+  * make it read past the parameter section. Entries whose key does not map to a known
+  * {@link ParameterType} are skipped using the recorded length instead of failing with a
+  * {@link NullPointerException}.
+  *
+  * @param in               the input positioned at the first parameter entry
+  * @param messagingVersion the version passed to each parameter serializer
+  * @param parameterCount   the number of entries present in the stream
+  * @param parameters       the map that receives the deserialized parameters
+  * @return always {@code true}
+  * @throws IOException if reading from {@code in} fails
+  */
+ private static boolean readParameters(DataInputPlus in, int messagingVersion, int parameterCount, Map<ParameterType, Object> parameters) throws IOException
+ {
+     for (int read = 0; read < parameterCount; read++)
+     {
+         String key = in.readUTF();
+         ParameterType parameterType = ParameterType.byName.get(key);
+         int valueLength = in.readInt();
+         if (parameterType == null)
+         {
+             // unknown parameter: step over its value so the following entry stays aligned
+             in.skipBytesFully(valueLength);
+             continue;
+         }
+         parameters.put(parameterType, parameterType.serializer.deserialize(in, messagingVersion));
+     }
+
+     return true;
+ }
+
+ /**
+  * Deserializes a message written in the pre-4.0 wire format: the sender address, an int verb id,
+  * an int parameter count followed by the parameters, and an int payload size followed by the
+  * payload.
+  *
+  * @param in      the input positioned at the serialized sender address
+  * @param id      the message id, used to resolve callback-determined serializers
+  * @param version the messaging version passed to the serializers
+  * @param from    the peer the message was received from; asserted to equal the serialized sender
+  * @return the deserialized message; its payload is {@code null} when the payload is empty or
+  *         when no serializer could be resolved (the payload bytes are then skipped)
+  * @throws IOException if reading from {@code in} fails
+  */
+ static MessageIn<?> deserializePre40(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException
+ {
+     // Always consume the sender address. Reading it inside the assert expression would leave it
+     // unread, and every later field misaligned, whenever assertions are disabled.
+     InetAddressAndPort sender = CompactEndpointSerializationHelper.instance.deserialize(in, version);
+     assert from.equals(sender);
+     MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
+
+     Map<ParameterType, Object> parameters = Collections.emptyMap();
+     int parameterCount = in.readInt();
+     if (parameterCount != 0)
+     {
+         parameters = new EnumMap<>(ParameterType.class);
+         readParameters(in, version, parameterCount, parameters);
+     }
+
+     Object payload = null;
+     int payloadSize = in.readInt();
+     if (payloadSize > 0)
+     {
+         IVersionedSerializer<?> serializer = MessagingService.getVerbSerializer(verb, id);
+         if (serializer == null)
+             in.skipBytesFully(payloadSize); // nothing can decode it; keep the stream aligned
+         else
+             payload = serializer.deserialize(in, version);
+     }
+
+     return new MessageIn<>(from, payload, parameters, verb, version, System.nanoTime());
+ }
+
+
+
/**
* Determine if we can read the next parameter from the {@link ByteBuf}. This method will *always* set the {@code in}
* readIndex back to where it was when this method was invoked.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java
index 989e33c..2366722 100644
--- a/src/java/org/apache/cassandra/net/async/NettyFactory.java
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -2,6 +2,7 @@ package org.apache.cassandra.net.async;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
import java.util.zip.Checksum;
import javax.annotation.Nullable;
@@ -384,12 +385,13 @@ public final class NettyFactory
}
}
- public void close()
+ /**
+  * Requests a graceful shutdown of the accept, outbound, inbound and streaming event loop groups,
+  * then waits for each of them in turn, up to 60 seconds per group.
+  * <p>
+  * Shutdown is requested on every group before any waiting starts. The result of each
+  * {@code awaitTermination} call is not inspected, so a group that is still running after its
+  * timeout is not reported to the caller.
+  *
+  * @throws InterruptedException declared for the waits on {@code awaitTermination}
+  */
+ public void close() throws InterruptedException
{
- acceptGroup.shutdownGracefully();
- outboundGroup.shutdownGracefully();
- inboundGroup.shutdownGracefully();
- streamingGroup.shutdownGracefully();
+ EventLoopGroup[] groups = new EventLoopGroup[] { acceptGroup, outboundGroup, inboundGroup, streamingGroup };
+ // first pass: ask every group to shut down before blocking on any of them
+ for (EventLoopGroup group : groups)
+ group.shutdownGracefully();
+ // second pass: wait on each group; the waits are sequential, 60 seconds at most per group
+ for (EventLoopGroup group : groups)
+ group.awaitTermination(60, TimeUnit.SECONDS);
}
static Lz4FrameEncoder createLz4Encoder(int protocolVersion)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/schema/SchemaEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaEvent.java b/src/java/org/apache/cassandra/schema/SchemaEvent.java
index e26cee5..00c8136 100644
--- a/src/java/org/apache/cassandra/schema/SchemaEvent.java
+++ b/src/java/org/apache/cassandra/schema/SchemaEvent.java
@@ -29,13 +29,14 @@ import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import org.apache.cassandra.diag.DiagnosticEvent;
import org.apache.cassandra.utils.Pair;
-final class SchemaEvent extends DiagnosticEvent
+public final class SchemaEvent extends DiagnosticEvent
{
private final SchemaEventType type;
@@ -62,7 +63,7 @@ final class SchemaEvent extends DiagnosticEvent
@Nullable
private final MapDifference<String,TableMetadata> indexesDiff;
- enum SchemaEventType
+ public enum SchemaEventType
{
KS_METADATA_LOADED,
KS_METADATA_RELOADED,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b32f67e..1a54e75 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -18,15 +18,11 @@
package org.apache.cassandra.service;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -78,6 +74,7 @@ import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
@@ -169,15 +166,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
.maximumSize(Long.getLong("cassandra.parent_repair_status_cache_size", 100_000))
.build();
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
public void start()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 479470c..5eeaf20 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.service;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
@@ -27,9 +26,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
@@ -53,6 +49,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
public class CacheService implements CacheServiceMBean
@@ -88,16 +85,7 @@ public class CacheService implements CacheServiceMBean
private CacheService()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
keyCache = initKeyCache();
rowCache = initRowCache();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index f0b2dc1..592419a 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -569,16 +569,7 @@ public class CassandraDaemon
{
applyConfig();
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(new StandardMBean(new NativeAccess(), NativeAccessMBean.class), new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- logger.error("error registering MBean {}", MBEAN_NAME, e);
- //Allow the server to start even if the bean can't be registered
- }
+ MBeanWrapper.instance.registerMBean(new StandardMBean(new NativeAccess(), NativeAccessMBean.class), MBEAN_NAME, MBeanWrapper.OnException.LOG);
if (FBUtilities.isWindows)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 7b6bd58..a3f6b52 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -30,6 +30,8 @@ import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
+
public class PendingRangeCalculatorService
{
public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
@@ -117,4 +119,11 @@ public class PendingRangeCalculatorService
{
StorageService.instance.getTokenMetadata().calculatePendingRanges(strategy, keyspaceName);
}
+
+ /**
+  * Shuts down this service's executor and waits up to 60 seconds for it to terminate.
+  * <p>
+  * The result of {@code awaitTermination} is not inspected, so the caller is not told whether
+  * the executor actually terminated within the timeout.
+  *
+  * @throws InterruptedException declared for the wait on {@code awaitTermination}
+  */
+ @VisibleForTesting
+ public void shutdownExecutor() throws InterruptedException
+ {
+ executor.shutdown();
+ executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org