You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2019/06/03 15:05:03 UTC
[cassandra] branch cassandra-3.0 updated: Update token metadata for
non-normal state changes
This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new e4b5d98 Update token metadata for non-normal state changes
e4b5d98 is described below
commit e4b5d9818f003be2b9091c48f8435d29202ffe2d
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Thu May 2 17:24:43 2019 +0100
Update token metadata for non-normal state changes
Patch by Benedict Elliott Smith; reviewed by Sam Tunnicliffe for CASSANDRA-15120
---
CHANGES.txt | 1 +
.../apache/cassandra/concurrent/SEPExecutor.java | 10 +-
.../org/apache/cassandra/concurrent/SEPWorker.java | 33 ++---
.../cassandra/concurrent/SharedExecutorPool.java | 8 +-
.../apache/cassandra/concurrent/StageManager.java | 2 +-
.../org/apache/cassandra/net/MessagingService.java | 10 ++
.../apache/cassandra/service/StorageService.java | 163 ++++++++++++---------
.../org/apache/cassandra/utils/ExpiringMap.java | 5 +
.../org/apache/cassandra/distributed/Cluster.java | 18 ++-
.../cassandra/distributed/UpgradeableCluster.java | 12 +-
.../apache/cassandra/distributed/api/Feature.java | 24 +++
.../cassandra/distributed/api/IInstance.java | 5 +-
.../distributed/impl/AbstractCluster.java | 58 ++++++--
.../impl/DelegatingInvokableInstance.java | 6 +-
.../cassandra/distributed/impl/Instance.java | 46 ++++--
.../distributed/impl/InstanceClassLoader.java | 2 +-
.../distributed/test/DistributedTestBase.java | 1 +
.../cassandra/distributed/test/GossipTest.java | 113 ++++++++++++++
.../cassandra/concurrent/SEPExecutorTest.java | 2 +-
.../org/apache/cassandra/service/MoveTest.java | 11 +-
20 files changed, 393 insertions(+), 137 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f6bd70..36eb9c2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.19
+ * Update token metadata when handling MOVING/REMOVING_TOKEN events (CASSANDRA-15120)
* Add ability to customize cassandra log directory using $CASSANDRA_LOG_DIR (CASSANDRA-15090)
* Skip cells with illegal column names when reading legacy sstables (CASSANDRA-15086)
* Fix assorted gossip races and add related runtime checks (CASSANDRA-15059)
diff --git a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
index 8b12b82..d5c7b14 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
@@ -174,7 +174,11 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService
long current = permits.get();
int workPermits = workPermits(current);
if (permits.compareAndSet(current, updateWorkPermits(current, workPermits + 1)))
- return;
+ {
+ if (shuttingDown && workPermits + 1 == maxWorkers)
+ shutdown.signalAll();
+ break;
+ }
}
}
@@ -206,7 +210,7 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService
{
shuttingDown = true;
pool.executors.remove(this);
- if (getActiveCount() == 0)
+ if (getActiveCount() == 0 && getPendingTasks() == 0)
shutdown.signalAll();
// release metrics
@@ -219,6 +223,8 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService
List<Runnable> aborted = new ArrayList<>();
while (takeTaskPermit())
aborted.add(tasks.poll());
+ if (getActiveCount() == 0)
+ shutdown.signalAll();
return aborted;
}
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
index edc31da..f7eb47a 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -98,7 +98,6 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
// if we do have tasks assigned, nobody will change our state so we can simply set it to WORKING
// (which is also a state that will never be interrupted externally)
set(Work.WORKING);
- boolean shutdown;
while (true)
{
// before we process any task, we maybe schedule a new worker _to our executor only_; this
@@ -111,19 +110,13 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
task = null;
// if we're shutting down, or we fail to take a permit, we don't perform any more work
- if ((shutdown = assigned.shuttingDown) || !assigned.takeTaskPermit())
+ if (!assigned.takeTaskPermit())
break;
task = assigned.tasks.poll();
}
// return our work permit, and maybe signal shutdown
assigned.returnWorkPermit();
- if (shutdown)
- {
- if (assigned.getActiveCount() == 0)
- assigned.shutdown.signalAll();
- return;
- }
assigned = null;
// try to immediately reassign ourselves some work; if we fail, start spinning
@@ -134,22 +127,24 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
- while (true)
+ if (task != null)
+ logger.error("Failed to execute task, unexpected exception killed worker: {}", t);
+ else
+ logger.error("Unexpected exception killed worker: {}", t);
+ }
+ finally
+ {
+ if (assigned != null)
+ assigned.returnWorkPermit();
+
+ do
{
if (get().assigned != null)
{
- assigned = get().assigned;
+ get().assigned.returnWorkPermit();
set(Work.WORKING);
}
- if (assign(Work.STOPPED, true))
- break;
- }
- if (assigned != null)
- assigned.returnWorkPermit();
- if (task != null)
- logger.error("Failed to execute task, unexpected exception killed worker: {}", t);
- else
- logger.error("Unexpected exception killed worker: {}", t);
+ } while (!assign(Work.STOPPED, true));
}
}
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index d355d77..3997c1a 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.concurrent;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -107,16 +108,17 @@ public class SharedExecutorPool
schedule(Work.SPINNING);
}
- public LocalAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String jmxPath, String name)
+ public synchronized LocalAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String jmxPath, String name)
{
SEPExecutor executor = new SEPExecutor(this, maxConcurrency, maxQueuedTasks, jmxPath, name);
executors.add(executor);
return executor;
}
- public void shutdown() throws InterruptedException
+ public synchronized void shutdownAndWait() throws InterruptedException
{
shuttingDown = true;
+ List<SEPExecutor> executors = new ArrayList<>(this.executors);
for (SEPExecutor executor : executors)
executor.shutdownNow();
@@ -127,7 +129,7 @@ public class SharedExecutorPool
executor.shutdown.await(until - System.nanoTime(), TimeUnit.NANOSECONDS);
}
- void terminateWorkers()
+ private void terminateWorkers()
{
assert shuttingDown;
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 8603778..2f90a29 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -125,7 +125,7 @@ public class StageManager
public static void shutdownAndWait() throws InterruptedException
{
for (Stage stage : Stage.values())
- StageManager.stages.get(stage).shutdown();
+ StageManager.stages.get(stage).shutdownNow();
for (Stage stage : Stage.values())
StageManager.stages.get(stage).awaitTermination(60, TimeUnit.SECONDS);
}
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index a76df0d..82b26ea 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -816,11 +816,18 @@ public final class MessagingService implements MessagingServiceMBean
*/
public void shutdown()
{
+ shutdown(true);
+ }
+ public void shutdown(boolean gracefully)
+ {
logger.info("Waiting for messaging service to quiesce");
// We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
assert !StageManager.getStage(Stage.MUTATION).isShutdown();
// the important part
+ if (!gracefully)
+ callbacks.reset();
+
if (!callbacks.shutdownBlocking())
logger.warn("Failed to wait for messaging service callbacks shutdown");
@@ -829,6 +836,7 @@ public final class MessagingService implements MessagingServiceMBean
{
clearMessageSinks();
for (SocketThread th : socketThreads)
+ {
try
{
th.close();
@@ -838,6 +846,8 @@ public final class MessagingService implements MessagingServiceMBean
// see https://issues.apache.org/jira/browse/CASSANDRA-10545
handleIOExceptionOnClose(e);
}
+ }
+ connectionManagers.values().forEach(OutboundTcpConnectionPool::close);
}
catch (IOException e)
{
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a1f361d..4769b22 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -718,7 +718,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && !DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress());
}
- private void prepareToJoin() throws ConfigurationException
+ @VisibleForTesting
+ public void prepareToJoin() throws ConfigurationException
{
if (!joined)
{
@@ -787,7 +788,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
- private void joinTokenRing(int delay) throws ConfigurationException
+ @VisibleForTesting
+ public void joinTokenRing(int delay) throws ConfigurationException
{
joined = true;
@@ -2049,6 +2051,85 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode);
}
+ private void ensureUpToDateTokenMetadata(String status, InetAddress endpoint)
+ {
+ Set<Token> tokens = new TreeSet<>(getTokensFor(endpoint));
+
+ if (logger.isDebugEnabled())
+ logger.debug("Node {} state {}, tokens {}", endpoint, status, tokens);
+
+ // If the node is previously unknown or tokens do not match, update tokenmetadata to
+ // have this node as 'normal' (it must have been using this token before the
+ // leave). This way we'll get pending ranges right.
+ if (!tokenMetadata.isMember(endpoint))
+ {
+ logger.info("Node {} state jump to {}", endpoint, status);
+ updateTokenMetadata(endpoint, tokens);
+ }
+ else if (!tokens.equals(new TreeSet<>(tokenMetadata.getTokens(endpoint))))
+ {
+ logger.warn("Node {} '{}' token mismatch. Long network partition?", endpoint, status);
+ updateTokenMetadata(endpoint, tokens);
+ }
+ }
+
+ private void updateTokenMetadata(InetAddress endpoint, Iterable<Token> tokens)
+ {
+ updateTokenMetadata(endpoint, tokens, new HashSet<>());
+ }
+
+ private void updateTokenMetadata(InetAddress endpoint, Iterable<Token> tokens, Set<InetAddress> endpointsToRemove)
+ {
+ Set<Token> tokensToUpdateInMetadata = new HashSet<>();
+ Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
+
+ for (final Token token : tokens)
+ {
+ // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
+ InetAddress currentOwner = tokenMetadata.getEndpoint(token);
+ if (currentOwner == null)
+ {
+ logger.debug("New node {} at token {}", endpoint, token);
+ tokensToUpdateInMetadata.add(token);
+ tokensToUpdateInSystemKeyspace.add(token);
+ }
+ else if (endpoint.equals(currentOwner))
+ {
+ // set state back to normal, since the node may have tried to leave, but failed and is now back up
+ tokensToUpdateInMetadata.add(token);
+ tokensToUpdateInSystemKeyspace.add(token);
+ }
+ else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
+ {
+ tokensToUpdateInMetadata.add(token);
+ tokensToUpdateInSystemKeyspace.add(token);
+
+ // currentOwner is no longer current, endpoint is. Keep track of these moves, because when
+ // a host no longer has any tokens, we'll want to remove it.
+ Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
+ epToTokenCopy.get(currentOwner).remove(token);
+ if (epToTokenCopy.get(currentOwner).isEmpty())
+ endpointsToRemove.add(currentOwner);
+
+ logger.info("Nodes {} and {} have the same token {}. {} is the new owner", endpoint, currentOwner, token, endpoint);
+ }
+ else
+ {
+ logger.info("Nodes {} and {} have the same token {}. Ignoring {}", endpoint, currentOwner, token, endpoint);
+ }
+ }
+
+ tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
+ for (InetAddress ep : endpointsToRemove)
+ {
+ removeEndpoint(ep);
+ if (replacing && ep.equals(DatabaseDescriptor.getReplaceAddress()))
+ Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
+ }
+ if (!tokensToUpdateInSystemKeyspace.isEmpty())
+ SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace, StageManager.getStage(Stage.MUTATION));
+ }
+
/**
* Handle node move to normal state. That is, node is entering token ring and participating
* in reads.
@@ -2058,8 +2139,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void handleStateNormal(final InetAddress endpoint, final String status)
{
Collection<Token> tokens = getTokensFor(endpoint);
- Set<Token> tokensToUpdateInMetadata = new HashSet<>();
- Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
Set<InetAddress> endpointsToRemove = new HashSet<>();
if (logger.isDebugEnabled())
@@ -2127,62 +2206,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
tokenMetadata.updateHostId(hostId, endpoint);
}
- for (final Token token : tokens)
- {
- // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
- InetAddress currentOwner = tokenMetadata.getEndpoint(token);
- if (currentOwner == null)
- {
- logger.debug("New node {} at token {}", endpoint, token);
- tokensToUpdateInMetadata.add(token);
- tokensToUpdateInSystemKeyspace.add(token);
- }
- else if (endpoint.equals(currentOwner))
- {
- // set state back to normal, since the node may have tried to leave, but failed and is now back up
- tokensToUpdateInMetadata.add(token);
- tokensToUpdateInSystemKeyspace.add(token);
- }
- else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
- {
- tokensToUpdateInMetadata.add(token);
- tokensToUpdateInSystemKeyspace.add(token);
-
- // currentOwner is no longer current, endpoint is. Keep track of these moves, because when
- // a host no longer has any tokens, we'll want to remove it.
- Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
- epToTokenCopy.get(currentOwner).remove(token);
- if (epToTokenCopy.get(currentOwner).size() < 1)
- endpointsToRemove.add(currentOwner);
-
- logger.info(String.format("Nodes %s and %s have the same token %s. %s is the new owner",
- endpoint,
- currentOwner,
- token,
- endpoint));
- }
- else
- {
- logger.info(String.format("Nodes %s and %s have the same token %s. Ignoring %s",
- endpoint,
- currentOwner,
- token,
- endpoint));
- }
- }
-
// capture because updateNormalTokens clears moving and member status
boolean isMember = tokenMetadata.isMember(endpoint);
boolean isMoving = tokenMetadata.isMoving(endpoint);
- tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
- for (InetAddress ep : endpointsToRemove)
- {
- removeEndpoint(ep);
- if (replacing && DatabaseDescriptor.getReplaceAddress().equals(ep))
- Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
- }
- if (!tokensToUpdateInSystemKeyspace.isEmpty())
- SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace, StageManager.getStage(Stage.MUTATION));
+
+ updateTokenMetadata(endpoint, tokens, endpointsToRemove);
if (isMoving || operationMode == Mode.MOVING)
{
@@ -2204,24 +2232,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
private void handleStateLeaving(InetAddress endpoint)
{
- Collection<Token> tokens = getTokensFor(endpoint);
-
- if (logger.isDebugEnabled())
- logger.debug("Node {} state leaving, tokens {}", endpoint, tokens);
-
// If the node is previously unknown or tokens do not match, update tokenmetadata to
// have this node as 'normal' (it must have been using this token before the
// leave). This way we'll get pending ranges right.
- if (!tokenMetadata.isMember(endpoint))
- {
- logger.info("Node {} state jump to leaving", endpoint);
- tokenMetadata.updateNormalTokens(tokens, endpoint);
- }
- else if (!tokenMetadata.getTokens(endpoint).containsAll(tokens))
- {
- logger.warn("Node {} 'leaving' token mismatch. Long network partition?", endpoint);
- tokenMetadata.updateNormalTokens(tokens, endpoint);
- }
+
+ ensureUpToDateTokenMetadata(VersionedValue.STATUS_LEAVING, endpoint);
// at this point the endpoint is certainly a member with this token, so let's proceed
// normally
@@ -2254,6 +2269,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
private void handleStateMoving(InetAddress endpoint, String[] pieces)
{
+ ensureUpToDateTokenMetadata(VersionedValue.STATUS_MOVING, endpoint);
+
assert pieces.length >= 2;
Token token = getTokenFactory().fromString(pieces[1]);
@@ -2299,6 +2316,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
else if (VersionedValue.REMOVING_TOKEN.equals(state))
{
+ ensureUpToDateTokenMetadata(state, endpoint);
+
if (logger.isDebugEnabled())
logger.debug("Tokens {} removed manually (endpoint was {})", removeTokens, endpoint);
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index ef013f5..a6895c5 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -121,6 +121,11 @@ public class ExpiringMap<K, V>
public void reset()
{
shutdown = false;
+ clear();
+ }
+
+ public void clear()
+ {
cache.clear();
}
diff --git a/test/distributed/org/apache/cassandra/distributed/Cluster.java b/test/distributed/org/apache/cassandra/distributed/Cluster.java
index c7f7675..4ae4e5d 100644
--- a/test/distributed/org/apache/cassandra/distributed/Cluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/Cluster.java
@@ -19,10 +19,10 @@
package org.apache.cassandra.distributed;
import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
import java.util.List;
+import java.util.Set;
+import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.impl.AbstractCluster;
import org.apache.cassandra.distributed.impl.IInvokableInstance;
@@ -35,9 +35,9 @@ import org.apache.cassandra.distributed.impl.Versions;
*/
public class Cluster extends AbstractCluster<IInvokableInstance> implements ICluster, AutoCloseable
{
- private Cluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader)
+ private Cluster(File root, Versions.Version version, List<InstanceConfig> configs, Set<Feature> features, ClassLoader sharedClassLoader)
{
- super(root, version, configs, sharedClassLoader);
+ super(root, version, configs, features, sharedClassLoader);
}
protected IInvokableInstance newInstanceWrapper(Versions.Version version, InstanceConfig config)
@@ -49,9 +49,17 @@ public class Cluster extends AbstractCluster<IInvokableInstance> implements IClu
{
return create(nodeCount, Cluster::new);
}
+ public static Cluster create(int nodeCount, Set<Feature> with) throws Throwable
+ {
+ return create(nodeCount, with, Cluster::new);
+ }
public static Cluster create(int nodeCount, File root)
{
- return create(nodeCount, Versions.CURRENT, root, Cluster::new);
+ return create(nodeCount, root, Cluster::new);
+ }
+ public static Cluster create(int nodeCount, File root, Set<Feature> with)
+ {
+ return create(nodeCount, Versions.CURRENT, root, with, Cluster::new);
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 0c8e63a..d0613b1 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -22,7 +22,9 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
+import java.util.Set;
+import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.impl.AbstractCluster;
import org.apache.cassandra.distributed.impl.IUpgradeableInstance;
@@ -38,9 +40,9 @@ import org.apache.cassandra.distributed.impl.Versions;
*/
public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> implements ICluster, AutoCloseable
{
- private UpgradeableCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader)
+ private UpgradeableCluster(File root, Versions.Version version, List<InstanceConfig> configs, Set<Feature> features, ClassLoader sharedClassLoader)
{
- super(root, version, configs, sharedClassLoader);
+ super(root, version, configs, features, sharedClassLoader);
}
protected IUpgradeableInstance newInstanceWrapper(Versions.Version version, InstanceConfig config)
@@ -52,15 +54,17 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im
{
return create(nodeCount, UpgradeableCluster::new);
}
+
public static UpgradeableCluster create(int nodeCount, File root)
{
- return create(nodeCount, Versions.CURRENT, root, UpgradeableCluster::new);
+ return create(nodeCount, root, UpgradeableCluster::new);
}
public static UpgradeableCluster create(int nodeCount, Versions.Version version) throws IOException
{
- return create(nodeCount, version, Files.createTempDirectory("dtests").toFile(), UpgradeableCluster::new);
+ return create(nodeCount, version, UpgradeableCluster::new);
}
+
public static UpgradeableCluster create(int nodeCount, Versions.Version version, File root)
{
return create(nodeCount, version, root, UpgradeableCluster::new);
diff --git a/test/distributed/org/apache/cassandra/distributed/api/Feature.java b/test/distributed/org/apache/cassandra/distributed/api/Feature.java
new file mode 100644
index 0000000..a5c9316
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/api/Feature.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public enum Feature
+{
+ NETWORK, GOSSIP
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
index 3834093..25e2c94 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.api;
import org.apache.cassandra.locator.InetAddressAndPort;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
@@ -37,10 +38,12 @@ public interface IInstance extends IIsolatedExecutor
UUID schemaVersion();
void startup();
+ boolean isShutdown();
Future<Void> shutdown();
+ Future<Void> shutdown(boolean graceful);
// these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
- void startup(ICluster cluster);
+ void startup(ICluster cluster, Set<Feature> with);
void receiveMessage(IMessage message);
int getMessagingVersion();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index c27d9bf..67c844f 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -30,9 +30,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -47,6 +44,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -94,6 +92,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
private final File root;
private final ClassLoader sharedClassLoader;
+ private final Set<Feature> features;
// mutated by starting/stopping a node
private final List<I> instances;
@@ -136,23 +135,28 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
return config;
}
+ public boolean isShutdown()
+ {
+ return isShutdown;
+ }
+
@Override
public synchronized void startup()
{
if (!isShutdown)
throw new IllegalStateException();
- delegate().startup(AbstractCluster.this);
+ delegate().startup(AbstractCluster.this, features);
isShutdown = false;
updateMessagingVersions();
}
@Override
- public synchronized Future<Void> shutdown()
+ public synchronized Future<Void> shutdown(boolean graceful)
{
if (isShutdown)
throw new IllegalStateException();
isShutdown = true;
- Future<Void> future = delegate.shutdown();
+ Future<Void> future = delegate.shutdown(graceful);
delegate = null;
return future;
}
@@ -181,9 +185,10 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
}
}
- protected AbstractCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader)
+ protected AbstractCluster(File root, Versions.Version version, List<InstanceConfig> configs, Set<Feature> features, ClassLoader sharedClassLoader)
{
this.root = root;
+ this.features = features;
this.sharedClassLoader = sharedClassLoader;
this.instances = new ArrayList<>();
this.instanceMap = new HashMap<>();
@@ -325,37 +330,60 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
get(instance).schemaChangeInternal(statement);
}
- void startup()
+ public void startup()
{
- parallelForEach(I::startup, 0, null);
+ forEach(I::startup);
}
protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>
{
- C newCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader);
+ C newCluster(File root, Versions.Version version, List<InstanceConfig> configs, Set<Feature> features, ClassLoader sharedClassLoader);
}
protected static <I extends IInstance, C extends AbstractCluster<I>> C
create(int nodeCount, Factory<I, C> factory) throws Throwable
{
- return create(nodeCount, Files.createTempDirectory("dtests").toFile(), factory);
+ return create(nodeCount, Collections.emptySet(), factory);
+ }
+
+ protected static <I extends IInstance, C extends AbstractCluster<I>> C
+ create(int nodeCount, Set<Feature> features, Factory<I, C> factory) throws Throwable
+ {
+ return create(nodeCount, Files.createTempDirectory("dtests").toFile(), features, factory);
}
protected static <I extends IInstance, C extends AbstractCluster<I>> C
create(int nodeCount, File root, Factory<I, C> factory)
{
- return create(nodeCount, Versions.CURRENT, root, factory);
+ return create(nodeCount, root, Collections.emptySet(), factory);
+ }
+
+ protected static <I extends IInstance, C extends AbstractCluster<I>> C
+ create(int nodeCount, File root, Set<Feature> features, Factory<I, C> factory)
+ {
+ return create(nodeCount, Versions.CURRENT, root, features, factory);
}
protected static <I extends IInstance, C extends AbstractCluster<I>> C
create(int nodeCount, Versions.Version version, Factory<I, C> factory) throws IOException
{
- return create(nodeCount, version, Files.createTempDirectory("dtests").toFile(), factory);
+ return create(nodeCount, version, Collections.emptySet(), factory);
+ }
+
+ protected static <I extends IInstance, C extends AbstractCluster<I>> C
+ create(int nodeCount, Versions.Version version, Set<Feature> features, Factory<I, C> factory) throws IOException
+ {
+ return create(nodeCount, version, Files.createTempDirectory("dtests").toFile(), features, factory);
}
protected static <I extends IInstance, C extends AbstractCluster<I>> C
create(int nodeCount, Versions.Version version, File root, Factory<I, C> factory)
{
+ return create(nodeCount, version, root, Collections.emptySet(), factory);
+ }
+ protected static <I extends IInstance, C extends AbstractCluster<I>> C
+ create(int nodeCount, Versions.Version version, File root, Set<Feature> features, Factory<I, C> factory)
+ {
root.mkdirs();
setupLogging(root);
@@ -370,8 +398,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
token += increment;
}
- C cluster = factory.newCluster(root, version, configs, sharedClassLoader);
- cluster.startup();
+ C cluster = factory.newCluster(root, version, configs, features, sharedClassLoader);
return cluster;
}
@@ -398,6 +425,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
public void close()
{
FBUtilities.waitOnFutures(instances.stream()
+ .filter(i -> !i.isShutdown())
.map(IInstance::shutdown)
.collect(Collectors.toList()),
1L, TimeUnit.MINUTES);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
index e9e6844..94df6cd 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.distributed.impl;
import java.io.Serializable;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
@@ -26,6 +27,7 @@ import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
+import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -110,9 +112,9 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
}
@Override
- public void startup(ICluster cluster)
+ public void startup(ICluster cluster, Set<Feature> with)
{
- delegate().startup(cluster);
+ delegate().startup(cluster, with);
}
@Override
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 94dbc96..1b385fb 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -36,7 +37,6 @@ import java.util.function.Function;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.LoggerContext;
-import com.codahale.metrics.MetricFilter;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.SharedExecutorPool;
@@ -56,6 +56,7 @@ import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -98,6 +99,10 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
this.config = config;
InstanceIDDefiner.setInstanceId(config.num());
FBUtilities.setBroadcastInetAddress(config.broadcastAddressAndPort().address);
+ acceptsOnInstance((IInstanceConfig override) -> {
+ Config.setOverrideLoadConfig(() -> loadConfig(override));
+ DatabaseDescriptor.setDaemonInitialized();
+ }).accept(config);
}
public IInstanceConfig config()
@@ -145,6 +150,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
throw new UnsupportedOperationException();
}
+ public boolean isShutdown()
+ {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void schemaChangeInternal(String query)
{
@@ -249,18 +259,15 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}
@Override
- public void startup(ICluster cluster)
+ public void startup(ICluster cluster, Set<Feature> with)
{
sync(() -> {
try
{
mkdirs();
-
- Config.setOverrideLoadConfig(() -> loadConfig(config));
- DatabaseDescriptor.setDaemonInitialized();
DatabaseDescriptor.createAllDirectories();
- // We need to persist this as soon as possible after startup checks.
+ // We need to persist this as soon as possible after startup checks.
// This should be the first write to SystemKeyspace (CASSANDRA-11742)
SystemKeyspace.persistLocalMetadata();
LegacySchemaMigrator.migrate();
@@ -287,8 +294,17 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
throw new RuntimeException(e);
}
- initializeRing(cluster);
- registerMockMessaging(cluster);
+ // TODO: support each separately
+ if (with.contains(Feature.GOSSIP) || with.contains(Feature.NETWORK))
+ {
+ StorageService.instance.prepareToJoin();
+ StorageService.instance.joinTokenRing(1000);
+ }
+ else
+ {
+ initializeRing(cluster);
+ registerMockMessaging(cluster);
+ }
SystemKeyspace.finishStartup();
@@ -376,6 +392,14 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
public Future<Void> shutdown()
{
+ return shutdown(true);
+ }
+
+ public Future<Void> shutdown(boolean graceful)
+ {
+ if (!graceful)
+ MessagingService.instance().shutdown(false);
+
Future<?> future = async((ExecutorService executor) -> {
Throwable error = null;
error = parallelRun(error, executor,
@@ -383,7 +407,6 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
CompactionManager.instance::forceShutdown,
BatchlogManager.instance::shutdown,
HintsService.instance::shutdownBlocking,
- CommitLog.instance::shutdownBlocking,
SecondaryIndexManager::shutdownExecutors,
ColumnFamilyStore::shutdownFlushExecutor,
ColumnFamilyStore::shutdownPostFlushExecutor,
@@ -401,7 +424,10 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
);
error = parallelRun(error, executor,
StageManager::shutdownAndWait,
- SharedExecutorPool.SHARED::shutdown
+ SharedExecutorPool.SHARED::shutdownAndWait
+ );
+ error = parallelRun(error, executor,
+ CommitLog.instance::shutdownBlocking
);
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
index 56c8074..57530e0 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
@@ -47,7 +47,7 @@ public class InstanceClassLoader extends URLClassLoader
|| name.startsWith("sun.")
|| name.startsWith("oracle.")
|| name.startsWith("com.sun.")
- || name.startsWith("com.oracle.")
+ || name.startsWith("com.sun.")
|| name.startsWith("java.")
|| name.startsWith("javax.")
|| name.startsWith("jdk.")
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 18ca17f..3945ec5 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@ -49,6 +49,7 @@ public class DistributedTestBase
protected static <C extends AbstractCluster<?>> C init(C cluster)
{
+ cluster.startup();
cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
return cluster;
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
new file mode 100644
index 0000000..11e9985
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class GossipTest extends DistributedTestBase
+{
+    /** Checks that a node started after a peer died while MOVING still learns that peer's tokens. */
+    @Test
+    public void nodeDownDuringMove() throws Throwable
+    {
+        int liveCount = 1;
+        System.setProperty("cassandra.consistent.rangemovement", "false");
+        try (Cluster cluster = Cluster.create(2 + liveCount, EnumSet.of(Feature.GOSSIP)))
+        {
+            int fail = liveCount + 1;
+            int late = fail + 1;
+            for (int i = 1 ; i <= liveCount ; ++i)
+                cluster.get(i).startup();
+            cluster.get(fail).startup();
+            Collection<String> expectTokens = cluster.get(fail).callsOnInstance(() ->
+                StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress())
+                                       .stream().map(Object::toString).collect(Collectors.toList())
+            ).call();
+
+            InetAddress failAddress = cluster.get(fail).broadcastAddressAndPort().address;
+            // wait for NORMAL state
+            for (int i = 1 ; i <= liveCount ; ++i)
+            {
+                cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> {
+                    EndpointState ep;
+                    while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
+                           || ep.getApplicationState(ApplicationState.STATUS) == null
+                           || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("NORMAL"))
+                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
+                }).accept(failAddress);
+            }
+
+            // set ourselves to MOVING, and wait for it to propagate
+            cluster.get(fail).runOnInstance(() -> {
+                Token token = Iterables.getFirst(StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress()), null);
+                Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.moving(token));
+            });
+
+            for (int i = 1 ; i <= liveCount ; ++i)
+            {
+                cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> {
+                    EndpointState ep;
+                    while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
+                           || (ep.getApplicationState(ApplicationState.STATUS) == null
+                               || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING")))
+                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
+                }).accept(failAddress);
+            }
+
+            cluster.get(fail).shutdown(false).get();
+            cluster.get(late).startup();
+            // STATUS may not have reached the late node yet: guard against a null state as the waits above do
+            cluster.get(late).acceptsOnInstance((InetAddress endpoint) -> {
+                EndpointState ep;
+                while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
+                       || ep.getApplicationState(ApplicationState.STATUS) == null
+                       || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING"))
+                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
+            }).accept(failAddress);
+
+            Collection<String> tokens = cluster.get(late).appliesOnInstance((InetAddress endpoint) ->
+                StorageService.instance.getTokenMetadata().getTokens(endpoint)
+                                       .stream().map(Object::toString).collect(Collectors.toList())
+            ).apply(failAddress);
+
+            Assert.assertEquals(expectTokens, tokens);
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
index 011a8ba..e4c695c 100644
--- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
@@ -56,7 +56,7 @@ public class SEPExecutorTest
}
// shutdown does not guarantee that threads are actually dead once it exits, only that they will stop promptly afterwards
- sharedPool.shutdown();
+ sharedPool.shutdownAndWait();
for (Thread thread : Thread.getAllStackTraces().keySet())
{
if (thread.getName().contains(MAGIC))
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index 53365aa..bc6c6d2 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -479,7 +479,16 @@ public class MoveTest
{
tmd.removeFromMoving(host);
assertTrue(!tmd.isMoving(host));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(token)), host);
+ Token newToken = new BigIntegerToken(String.valueOf(token));
+ tmd.updateNormalToken(newToken, host);
+ // As well as updating TMD, update the host's tokens in gossip. Since CASSANDRA-15120, status changing to MOVING
+ // ensures that TMD is up to date with token assignments according to gossip. So we need to make sure gossip has
+ // the correct new token, as the moving node itself would do upon successful completion of the move operation.
+ // Without this, the next movement for that host will set the token in TMD back to the old value from gossip
+ // and incorrect range movements will follow
+ Gossiper.instance.injectApplicationState(host,
+ ApplicationState.TOKENS,
+ new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(newToken)));
}
private Map.Entry<Range<Token>, Collection<InetAddress>> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org