Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/11/09 20:25:33 UTC

[cassandra] 01/02: add schema coordinator

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit a960d87e05f01000a758d8e7e5a58be57d13eb33
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Mon May 18 23:02:51 2020 +0200

    add schema coordinator
    
    Co-authored-by: Stefan Miklosovic <st...@instaclustr.com>
---
 src/java/org/apache/cassandra/config/Schema.java   |   8 +
 .../cassandra/service/MigrationCoordinator.java    | 501 +++++++++++++++++++++
 .../apache/cassandra/service/MigrationManager.java | 124 +----
 .../apache/cassandra/service/MigrationTask.java    | 116 -----
 .../apache/cassandra/service/StorageService.java   |  69 ++-
 .../cassandra/service/StorageServiceMBean.java     |   4 +
 .../service/MigrationCoordinatorTest.java          | 319 +++++++++++++
 7 files changed, 895 insertions(+), 246 deletions(-)
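
In short: gossip handlers feed each endpoint's schema version into the new MigrationCoordinator, which queues and rate-limits pull requests per version, and startup blocks until a schema has been merged for every known version (or the wait times out). A minimal sketch of that call flow, using only methods added in this commit (the endpoint, value, timeoutMillis and logger variables are illustrative):

    // started once, before joining the ring
    MigrationCoordinator.instance.start();

    // from gossip change/join handlers, whenever an endpoint's SCHEMA state is observed
    MigrationCoordinator.instance.reportEndpointVersion(endpoint, UUID.fromString(value.value));

    // during startup, before bootstrapping
    if (!MigrationCoordinator.instance.awaitSchemaRequests(timeoutMillis))
        logger.warn("Outstanding schema versions: {}", MigrationCoordinator.instance.outstandingVersions());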

diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 6d91d8d..2d50b32 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -574,6 +574,14 @@ public class Schema
     }
 
     /**
+     * Checks whether the current schema is empty.
+     */
+    public boolean isEmpty()
+    {
+        return emptyVersion.equals(version);
+    }
+
+    /**
      * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
      * will be converted into UUID which would act as content-based version of the schema.
      */
diff --git a/src/java/org/apache/cassandra/service/MigrationCoordinator.java b/src/java/org/apache/cassandra/service/MigrationCoordinator.java
new file mode 100644
index 0000000..18ffdbb
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/MigrationCoordinator.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.InetAddress;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.LocalAwareExecutorService;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
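+/**
+ * Tracks the schema versions advertised by other nodes via gossip and coordinates schema pulls.
+ * Endpoints reporting a version we haven't merged yet are kept in a per-version round-robin request
+ * queue, at most getMaxOutstandingVersionRequests() pulls are in flight per version at a time, a
+ * periodic task retries versions that still have no outstanding requests, and awaitSchemaRequests()
+ * lets startup block until a response has been received for every known version.
+ */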
+public class MigrationCoordinator
+{
+    private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class);
+    private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+    private static final Future<Void> FINISHED_FUTURE = Futures.immediateFuture(null);
+
+    private static final int MIGRATION_DELAY_IN_MS = 60000;
+    private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;
+
+    public static final MigrationCoordinator instance = new MigrationCoordinator();
+
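+    /**
+     * Pull state for a single schema version: the endpoints reporting it, requests currently in
+     * flight, a queue of endpoints to try next, and a wait queue signalled once the version has
+     * been received (or is no longer reported by any endpoint).
+     */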
+    static class VersionInfo
+    {
+        final UUID version;
+
+        final Set<InetAddress> endpoints           = Sets.newConcurrentHashSet();
+        final Set<InetAddress> outstandingRequests = Sets.newConcurrentHashSet();
+        final Deque<InetAddress> requestQueue      = new ArrayDeque<>();
+
+        private final WaitQueue waitQueue = new WaitQueue();
+
+        volatile boolean receivedSchema;
+
+        VersionInfo(UUID version)
+        {
+            this.version = version;
+        }
+
+        WaitQueue.Signal register()
+        {
+            return waitQueue.register();
+        }
+
+        void markReceived()
+        {
+            if (receivedSchema)
+                return;
+
+            receivedSchema = true;
+            waitQueue.signalAll();
+        }
+
+        boolean wasReceived()
+        {
+            return receivedSchema;
+        }
+    }
+
+    private final Map<UUID, VersionInfo> versionInfo = new HashMap<>();
+    private final Map<InetAddress, UUID> endpointVersions = new HashMap<>();
+
+    public void start()
+    {
+        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::pullUnreceivedSchemaVersions, 1, 1, TimeUnit.MINUTES);
+    }
+
+    public synchronized void reset()
+    {
+        versionInfo.clear();
+    }
+
+    synchronized List<Future<Void>> pullUnreceivedSchemaVersions()
+    {
+        List<Future<Void>> futures = new ArrayList<>();
+        for (VersionInfo info : versionInfo.values())
+        {
+            if (info.wasReceived() || info.outstandingRequests.size() > 0)
+                continue;
+
+            Future<Void> future = maybePullSchema(info);
+            if (future != null && future != FINISHED_FUTURE)
+                futures.add(future);
+        }
+
+        return futures;
+    }
+
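+    /**
+     * Attempt to schedule a pull of this version from the next suitable endpoint in its request
+     * queue, provided fewer than the maximum number of requests are already outstanding. Returns
+     * null when no suitable endpoint is currently available; the periodic task will retry later.
+     */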
+    synchronized Future<Void> maybePullSchema(VersionInfo info)
+    {
+        if (info.endpoints.isEmpty() || info.wasReceived() || !shouldPullSchema(info.version))
+            return FINISHED_FUTURE;
+
+        if (info.outstandingRequests.size() >= getMaxOutstandingVersionRequests())
+            return FINISHED_FUTURE;
+
+        for (int i=0, isize=info.requestQueue.size(); i<isize; i++)
+        {
+            InetAddress endpoint = info.requestQueue.remove();
+            if (!info.endpoints.contains(endpoint))
+                continue;
+
+            if (shouldPullFromEndpoint(endpoint) && info.outstandingRequests.add(endpoint))
+            {
+                return scheduleSchemaPull(endpoint, info);
+            }
+            else
+            {
+                // return to queue
+                info.requestQueue.offer(endpoint);
+            }
+        }
+
+        // no suitable endpoints were found; the periodic task will check again in a minute
+        return null;
+    }
+
+    public synchronized Map<UUID, Set<InetAddress>> outstandingVersions()
+    {
+        HashMap<UUID, Set<InetAddress>> map = new HashMap<>();
+        for (VersionInfo info : versionInfo.values())
+            if (!info.wasReceived())
+                map.put(info.version, ImmutableSet.copyOf(info.endpoints));
+        return map;
+    }
+
+    @VisibleForTesting
+    protected VersionInfo getVersionInfoUnsafe(UUID version)
+    {
+        return versionInfo.get(version);
+    }
+
+    @VisibleForTesting
+    protected int getMaxOutstandingVersionRequests()
+    {
+        return MAX_OUTSTANDING_VERSION_REQUESTS;
+    }
+
+    @VisibleForTesting
+    protected boolean isAlive(InetAddress endpoint)
+    {
+        return FailureDetector.instance.isAlive(endpoint);
+    }
+
+    @VisibleForTesting
+    protected boolean shouldPullSchema(UUID version)
+    {
+        if (Schema.instance.getVersion() == null)
+        {
+            logger.debug("Not pulling schema for version {}, because local schema version is not known yet", version);
+            return false;
+        }
+
+        if (Schema.instance.getVersion().equals(version))
+        {
+            logger.debug("Not pulling schema for version {}, because schema versions match: " +
+                         "local={}, remote={}",
+                         version, Schema.instance.getVersion(),
+                         version);
+            return false;
+        }
+        return true;
+    }
+
+    // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes
+    // from both 3.0 and 3.0.14.
+    private static boolean is30Compatible(int version)
+    {
+        return version == MessagingService.current_version || version == MessagingService.VERSION_3014;
+    }
+
+    @VisibleForTesting
+    protected boolean shouldPullFromEndpoint(InetAddress endpoint)
+    {
+        if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+            return false;
+
+        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+        if (state == null)
+            return false;
+
+        final String releaseVersion = state.getApplicationState(ApplicationState.RELEASE_VERSION).value;
+        final String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
+
+        if (!releaseVersion.startsWith(ourMajorVersion))
+        {
+            logger.debug("Not pulling schema from {} because release version in Gossip is not major version {}, it is {}",
+                         endpoint, ourMajorVersion, releaseVersion);
+            return false;
+        }
+
+        if (!MessagingService.instance().knowsVersion(endpoint))
+        {
+            logger.debug("Not pulling schema from {} because their messaging version is unknown", endpoint);
+            return false;
+        }
+
+        if (!is30Compatible(MessagingService.instance().getRawVersion(endpoint)))
+        {
+            logger.debug("Not pulling schema from {} because their schema format is incompatible", endpoint);
+            return false;
+        }
+
+        if (Gossiper.instance.isGossipOnlyMember(endpoint))
+        {
+            logger.debug("Not pulling schema from {} because it's a gossip only member", endpoint);
+            return false;
+        }
+        return true;
+    }
+
+    @VisibleForTesting
+    protected boolean shouldPullImmediately(InetAddress endpoint, UUID version)
+    {
+        if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
+        {
+            // If we think we may be bootstrapping or have recently started, submit the migration task immediately
+            logger.debug("Immediately submitting migration task for {}, " +
+                         "schema versions: local={}, remote={}",
+                         endpoint,
+                         Schema.instance.getVersion(),
+                         version);
+            return true;
+        }
+        return false;
+    }
+
+    @VisibleForTesting
+    protected boolean isLocalVersion(UUID version)
+    {
+        return Schema.instance.getVersion().equals(version);
+    }
+
+    /**
+     * If a previous schema update has already brought our version in line with the incoming schema, don't apply it
+     */
+    synchronized boolean shouldApplySchemaFor(VersionInfo info)
+    {
+        if (info.wasReceived())
+            return false;
+        return !isLocalVersion(info.version);
+    }
+
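+    /**
+     * Record the schema version reported by an endpoint: associate the endpoint with the new
+     * version (and drop it from its previous one), mark the version received if it matches our
+     * local schema, and possibly schedule a pull. A no-op if the reported version hasn't changed.
+     */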
+    synchronized Future<Void> reportEndpointVersion(InetAddress endpoint, UUID version)
+    {
+        UUID current = endpointVersions.put(endpoint, version);
+        if (current != null && current.equals(version))
+            return FINISHED_FUTURE;
+
+        VersionInfo info = versionInfo.computeIfAbsent(version, VersionInfo::new);
+        if (isLocalVersion(version))
+            info.markReceived();
+        info.endpoints.add(endpoint);
+        info.requestQueue.addFirst(endpoint);
+
+        // disassociate this endpoint from its (now) previous schema version
+        removeEndpointFromVersion(endpoint, current);
+
+        return maybePullSchema(info);
+    }
+
+    Future<Void> reportEndpointVersion(InetAddress endpoint, EndpointState state)
+    {
+        if (state == null)
+            return FINISHED_FUTURE;
+
+        VersionedValue version = state.getApplicationState(ApplicationState.SCHEMA);
+
+        if (version == null)
+            return FINISHED_FUTURE;
+
+        return reportEndpointVersion(endpoint, UUID.fromString(version.value));
+    }
+
+    private synchronized void removeEndpointFromVersion(InetAddress endpoint, UUID version)
+    {
+        if (version == null)
+            return;
+
+        VersionInfo info = versionInfo.get(version);
+
+        if (info == null)
+            return;
+
+        info.endpoints.remove(endpoint);
+        if (info.endpoints.isEmpty())
+        {
+            info.waitQueue.signalAll();
+            versionInfo.remove(version);
+        }
+    }
+
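+    /**
+     * Submit the pull to the MIGRATION stage immediately if we appear to be bootstrapping or have
+     * only just started; otherwise delay it by MIGRATION_DELAY_IN_MS so that any schema changes
+     * being pushed to us concurrently have a chance to be applied first (see CASSANDRA-5025).
+     */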
+    Future<Void> scheduleSchemaPull(InetAddress endpoint, VersionInfo info)
+    {
+        LocalAwareExecutorService stage = StageManager.getStage(Stage.MIGRATION);
+        FutureTask<Void> task = new FutureTask<>(() -> pullSchema(new Callback(endpoint, info)), null);
+        if (shouldPullImmediately(endpoint, info.version))
+        {
+            stage.submit(task);
+        }
+        else
+        {
+            ScheduledExecutors.nonPeriodicTasks.schedule(() -> stage.submit(task), MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
+        }
+        return task;
+    }
+
+    @VisibleForTesting
+    protected void mergeSchemaFrom(InetAddress endpoint, Collection<Mutation> mutations)
+    {
+        SchemaKeyspace.mergeSchemaAndAnnounceVersion(mutations);
+    }
+
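+    /**
+     * Response handler for a single schema pull request: merges the received schema mutations
+     * (unless the version has already been received in the meantime) and re-queues the endpoint
+     * via pullComplete on success or failure.
+     */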
+    class Callback implements IAsyncCallbackWithFailure<Collection<Mutation>>
+    {
+        final InetAddress endpoint;
+        final VersionInfo info;
+
+        public Callback(InetAddress endpoint, VersionInfo info)
+        {
+            this.endpoint = endpoint;
+            this.info = info;
+        }
+
+        public void onFailure(InetAddress from)
+        {
+            fail();
+        }
+
+        Future<Void> fail()
+        {
+            return pullComplete(endpoint, info, false);
+        }
+
+        public void response(MessageIn<Collection<Mutation>> message)
+        {
+            response(message.payload);
+        }
+
+        Future<Void> response(Collection<Mutation> mutations)
+        {
+            synchronized (info)
+            {
+                if (shouldApplySchemaFor(info))
+                {
+                    try
+                    {
+                        mergeSchemaFrom(endpoint, mutations);
+                    }
+                    catch (Exception e)
+                    {
+                        logger.error(String.format("Unable to merge schema from %s", endpoint), e);
+                        return fail();
+                    }
+                }
+                return pullComplete(endpoint, info, true);
+            }
+        }
+
+        public boolean isLatencyForSnitch()
+        {
+            return false;
+        }
+    }
+
+    private void pullSchema(Callback callback)
+    {
+        if (!isAlive(callback.endpoint))
+        {
+            logger.warn("Can't send schema pull request: node {} is down.", callback.endpoint);
+            callback.fail();
+            return;
+        }
+
+        // There is a chance that quite some time could have passed since the pull was originally scheduled,
+        // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
+        // a higher major.
+        if (!shouldPullFromEndpoint(callback.endpoint))
+        {
+            logger.info("Skipped sending a migration request: node {} has a higher major version now.", callback.endpoint);
+            callback.fail();
+            return;
+        }
+
+        logger.debug("Requesting schema from {}", callback.endpoint);
+        sendMigrationMessage(callback);
+    }
+
+    protected void sendMigrationMessage(Callback callback)
+    {
+        MessageOut<?> message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
+        logger.info("Sending schema pull request to {} at {} with timeout {}", callback.endpoint, System.currentTimeMillis(), message.getTimeout());
+        MessagingService.instance().sendRR(message, callback.endpoint, callback, message.getTimeout(), true);
+    }
+
+    private synchronized Future<Void> pullComplete(InetAddress endpoint, VersionInfo info, boolean wasSuccessful)
+    {
+        if (wasSuccessful)
+            info.markReceived();
+
+        info.outstandingRequests.remove(endpoint);
+        info.requestQueue.add(endpoint);
+        return maybePullSchema(info);
+    }
+
+    /**
+     * Wait until we've received schema responses for all versions we're aware of
+     * @param waitMillis maximum time to wait, in milliseconds
+     * @return true if responses were received for all known schema versions, false if we timed out waiting
+     */
+    public boolean awaitSchemaRequests(long waitMillis)
+    {
+        if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
+            CassandraDaemon.waitForGossipToSettle();
+
+        WaitQueue.Signal signal = null;
+        try
+        {
+            synchronized (this)
+            {
+                List<WaitQueue.Signal> signalList = new ArrayList<>(versionInfo.size());
+                for (VersionInfo version : versionInfo.values())
+                {
+                    if (version.wasReceived())
+                        continue;
+
+                    signalList.add(version.register());
+                }
+
+                if (signalList.isEmpty())
+                    return true;
+
+                WaitQueue.Signal[] signals = new WaitQueue.Signal[signalList.size()];
+                signalList.toArray(signals);
+                signal = WaitQueue.all(signals);
+            }
+
+            return signal.awaitUntil(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis));
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            if (signal != null)
+                signal.cancel();
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 26b1aed..2b0c834 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -21,12 +21,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
@@ -56,12 +53,6 @@ public class MigrationManager
 
     public static final MigrationManager instance = new MigrationManager();
 
-    private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
-
-    public static final int MIGRATION_DELAY_IN_MS = 60000;
-
-    private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1"));
-
     private final List<MigrationListener> listeners = new CopyOnWriteArrayList<>();
 
     private MigrationManager() {}
@@ -76,86 +67,6 @@ public class MigrationManager
         listeners.remove(listener);
     }
 
-    public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
-    {
-        VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
-
-        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
-            maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint, state.getApplicationState(ApplicationState.RELEASE_VERSION).value);
-    }
-
-    /**
-     * If versions differ this node sends request with local migration list to the endpoint
-     * and expecting to receive a list of migrations to apply locally.
-     */
-    private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint, String  releaseVersion)
-    {
-        String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
-        if (!releaseVersion.startsWith(ourMajorVersion))
-        {
-            logger.debug("Not pulling schema because release version in Gossip is not major version {}, it is {}", ourMajorVersion, releaseVersion);
-            return;
-        }
-
-        if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
-        {
-            logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
-            return;
-        }
-
-        if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
-        {
-            // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
-            logger.debug("Submitting migration task for {}", endpoint);
-            submitMigrationTask(endpoint);
-        }
-        else
-        {
-            // Include a delay to make sure we have a chance to apply any changes being
-            // pushed out simultaneously. See CASSANDRA-5025
-            Runnable runnable = () ->
-            {
-                // grab the latest version of the schema since it may have changed again since the initial scheduling
-                EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-                if (epState == null)
-                {
-                    logger.debug("epState vanished for {}, not submitting migration task", endpoint);
-                    return;
-                }
-                VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
-                UUID currentVersion = UUID.fromString(value.value);
-                if (Schema.instance.getVersion().equals(currentVersion))
-                {
-                    logger.debug("not submitting migration task for {} because our versions match", endpoint);
-                    return;
-                }
-                logger.debug("submitting migration task for {}", endpoint);
-                submitMigrationTask(endpoint);
-            };
-            ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
-        }
-    }
-
-    private static Future<?> submitMigrationTask(InetAddress endpoint)
-    {
-        /*
-         * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
-         * running in the gossip stage.
-         */
-        return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
-    }
-
-    public static boolean shouldPullSchemaFrom(InetAddress endpoint)
-    {
-        /*
-         * Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema)
-         * Don't request schema from fat clients
-         */
-        return MessagingService.instance().knowsVersion(endpoint)
-                && is30Compatible(MessagingService.instance().getRawVersion(endpoint))
-                && !Gossiper.instance.isGossipOnlyMember(endpoint);
-    }
-
     // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes
     // from both 3.0 and 3.0.14.
     private static boolean is30Compatible(int version)
@@ -163,29 +74,6 @@ public class MigrationManager
         return version == MessagingService.current_version || version == MessagingService.VERSION_3014;
     }
 
-    public static boolean isReadyForBootstrap()
-    {
-        return MigrationTask.getInflightTasks().isEmpty();
-    }
-
-    public static void waitUntilReadyForBootstrap()
-    {
-        CountDownLatch completionLatch;
-        while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null)
-        {
-            try
-            {
-                if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS))
-                    logger.error("Migration task failed to complete");
-            }
-            catch (InterruptedException e)
-            {
-                Thread.currentThread().interrupt();
-                logger.error("Migration task was interrupted");
-            }
-        }
-    }
-
     public void notifyCreateKeyspace(KeyspaceMetadata ksm)
     {
         for (MigrationListener listener : listeners)
@@ -610,15 +498,15 @@ public class MigrationManager
         Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
         liveEndpoints.remove(FBUtilities.getBroadcastAddress());
 
+        MigrationCoordinator.instance.reset();
+
         // force migration if there are nodes around
         for (InetAddress node : liveEndpoints)
         {
-            if (shouldPullSchemaFrom(node))
-            {
-                logger.debug("Requesting schema from {}", node);
-                FBUtilities.waitOnFuture(submitMigrationTask(node));
-                break;
-            }
+            EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(node);
+            Future<Void> pull = MigrationCoordinator.instance.reportEndpointVersion(node, state);
+            if (pull != null)
+                FBUtilities.waitOnFuture(pull);
         }
 
         logger.info("Local schema reset is complete.");
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
deleted file mode 100644
index 6b04756..0000000
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-
-class MigrationTask extends WrappedRunnable
-{
-    private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class);
-
-    private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>();
-
-    private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);
-
-    private final InetAddress endpoint;
-
-    MigrationTask(InetAddress endpoint)
-    {
-        this.endpoint = endpoint;
-    }
-
-    public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
-    {
-        return inflightTasks;
-    }
-
-    public void runMayThrow() throws Exception
-    {
-        if (!FailureDetector.instance.isAlive(endpoint))
-        {
-            logger.warn("Can't send schema pull request: node {} is down.", endpoint);
-            return;
-        }
-
-        // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
-        // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
-        // a higher major.
-        if (!MigrationManager.shouldPullSchemaFrom(endpoint))
-        {
-            logger.info("Skipped sending a migration request: node {} has a higher major version now.", endpoint);
-            return;
-        }
-
-        MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
-
-        final CountDownLatch completionLatch = new CountDownLatch(1);
-
-        IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
-        {
-            @Override
-            public void response(MessageIn<Collection<Mutation>> message)
-            {
-                try
-                {
-                    SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload);
-                }
-                catch (ConfigurationException e)
-                {
-                    logger.error("Configuration exception merging remote schema", e);
-                }
-                finally
-                {
-                    completionLatch.countDown();
-                }
-            }
-
-            public boolean isLatencyForSnitch()
-            {
-                return false;
-            }
-        };
-
-        // Only save the latches if we need bootstrap or are bootstrapping
-        if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()))
-            inflightTasks.offer(completionLatch);
-
-        MessagingService.instance().sendRR(message, endpoint, cb);
-    }
-}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c4309f8..3718e8c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -27,6 +27,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.management.*;
 import javax.management.openmbean.TabularData;
@@ -112,6 +113,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
 
     public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized
+    public static final int SCHEMA_DELAY = getSchemaDelay(); // delay after which we assume the schema has propagated
+
+    private static final boolean REQUIRE_SCHEMAS = !Boolean.getBoolean("cassandra.skip_schema_check");
 
     private final JMXProgressSupport progressSupport = new JMXProgressSupport(this);
 
@@ -134,6 +138,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             return 30 * 1000;
     }
 
+    private static int getSchemaDelay()
+    {
+        String newdelay = System.getProperty("cassandra.schema_delay_ms");
+        if (newdelay != null)
+        {
+            logger.info("Overriding SCHEMA_DELAY to {}ms", newdelay);
+            return Integer.parseInt(newdelay);
+        }
+        else
+        {
+            return 30 * 1000;
+        }
+    }
+
     /* This abstraction maintains the token/endpoint metadata information */
     private TokenMetadata tokenMetadata = new TokenMetadata();
 
@@ -764,6 +782,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     @VisibleForTesting
     public void prepareToJoin() throws ConfigurationException
     {
+        MigrationCoordinator.instance.start();
         if (!joined)
         {
             Map<ApplicationState, VersionedValue> appStates = new EnumMap<>(ApplicationState.class);
@@ -840,6 +859,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    /**
+     * Wait for the initial schema: sleep for up to {@code delay} ms until the local schema is
+     * non-empty, then wait for the MigrationCoordinator to receive responses for every schema
+     * version it knows about. If versions are still outstanding after the timeout and the schema
+     * check has not been disabled (cassandra.skip_schema_check), startup fails with a RuntimeException.
+     */
+    public void waitForSchema(int delay)
+    {
+        // first sleep the delay to make sure we see all our peers
+        for (long i = 0; i < delay; i += 1000)
+        {
+            // if we see schema, we can proceed to the next check directly
+            if (!Schema.instance.isEmpty())
+            {
+                logger.debug("current schema version: {}", Schema.instance.getVersion());
+                break;
+            }
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+
+        boolean schemasReceived = MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(SCHEMA_DELAY));
+
+        if (schemasReceived)
+            return;
+
+        logger.warn(String.format("There are nodes in the cluster with a different schema version than ours, from which we did not merge schemas, " +
+                                  "our version: (%s), outstanding versions -> endpoints: %s",
+                                  Schema.instance.getVersion(),
+                                  MigrationCoordinator.instance.outstandingVersions()));
+
+        if (REQUIRE_SCHEMAS)
+            throw new RuntimeException("Didn't receive schemas for all known versions within the timeout");
+    }
+
     @VisibleForTesting
     public void joinTokenRing(int delay) throws ConfigurationException
     {
@@ -887,14 +934,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 }
                 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
             }
-            // if our schema hasn't matched yet, wait until it has
-            // we do this by waiting for all in-flight migration requests and responses to complete
-            // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
-            if (!MigrationManager.isReadyForBootstrap())
-            {
-                setMode(Mode.JOINING, "waiting for schema information to complete", true);
-                MigrationManager.waitUntilReadyForBootstrap();
-            }
+            waitForSchema(delay);
             setMode(Mode.JOINING, "schema complete, ready to bootstrap", true);
             setMode(Mode.JOINING, "waiting for pending range calculation", true);
             PendingRangeCalculatorService.instance.blockUntilFinished();
@@ -1828,7 +1868,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                         break;
                     case SCHEMA:
                         SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value), executor);
-                        MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
+                        MigrationCoordinator.instance.reportEndpointVersion(endpoint, UUID.fromString(value.value));
                         break;
                     case HOST_ID:
                         SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value), executor);
@@ -2589,13 +2629,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             onChange(endpoint, entry.getKey(), entry.getValue());
         }
-        MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
+        MigrationCoordinator.instance.reportEndpointVersion(endpoint, epState);
     }
 
     public void onAlive(InetAddress endpoint, EndpointState state)
     {
-        MigrationManager.instance.scheduleSchemaPull(endpoint, state);
-
         if (tokenMetadata.isMember(endpoint))
             notifyUp(endpoint);
     }
@@ -4901,4 +4939,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             Runtime.getRuntime().removeShutdownHook(drainOnShutdown);
         }
     }
+
+    @Override
+    public Map<String, Set<InetAddress>> getOutstandingSchemaVersions()
+    {
+        Map<UUID, Set<InetAddress>> outstanding = MigrationCoordinator.instance.outstandingVersions();
+        return outstanding.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), Entry::getValue));
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 1afa48e..dd534cb 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
@@ -634,4 +635,7 @@ public interface StorageServiceMBean extends NotificationEmitter
 
     /** Returns the max version that this node will negotiate for native protocol connections */
     public int getMaxNativeProtocolVersion();
+
+    /** Returns a map of schema version -> set of endpoints reporting that version, for versions we still need schema updates from */
+    public Map<String, Set<InetAddress>> getOutstandingSchemaVersions();
 }
diff --git a/test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java b/test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java
new file mode 100644
index 0000000..070537d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import static com.google.common.util.concurrent.Futures.getUnchecked;
+
+public class MigrationCoordinatorTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinatorTest.class);
+
+    private static final InetAddress EP1;
+    private static final InetAddress EP2;
+    private static final InetAddress EP3;
+
+    private static final UUID LOCAL_VERSION = UUID.randomUUID();
+    private static final UUID V1 = UUID.randomUUID();
+    private static final UUID V2 = UUID.randomUUID();
+    private static final UUID V3 = UUID.randomUUID();
+
+    static
+    {
+        try
+        {
+            EP1 = InetAddress.getByName("10.0.0.1");
+            EP2 = InetAddress.getByName("10.0.0.2");
+            EP3 = InetAddress.getByName("10.0.0.3");
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+
+        DatabaseDescriptor.forceStaticInitialization();
+    }
+
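+    /**
+     * Test double that records outgoing migration requests instead of sending them, and exposes the
+     * various gates (shouldPullSchema, shouldPullFromEndpoint, shouldPullImmediately, liveness,
+     * local version, max outstanding requests) as plain fields the tests can toggle directly.
+     */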
+    private static class InstrumentedCoordinator extends MigrationCoordinator
+    {
+
+        Queue<Callback> requests = new LinkedList<>();
+        @Override
+        protected void sendMigrationMessage(MigrationCoordinator.Callback callback)
+        {
+            requests.add(callback);
+        }
+
+        boolean shouldPullSchema = true;
+        @Override
+        protected boolean shouldPullSchema(UUID version)
+        {
+            return shouldPullSchema;
+        }
+
+        boolean shouldPullFromEndpoint = true;
+        @Override
+        protected boolean shouldPullFromEndpoint(InetAddress endpoint)
+        {
+            return shouldPullFromEndpoint;
+        }
+
+        boolean shouldPullImmediately = true;
+        @Override
+        protected boolean shouldPullImmediately(InetAddress endpoint, UUID version)
+        {
+            return shouldPullImmediately;
+        }
+
+        Set<InetAddress> deadNodes = new HashSet<>();
+        protected boolean isAlive(InetAddress endpoint)
+        {
+            return !deadNodes.contains(endpoint);
+        }
+
+        UUID localVersion = LOCAL_VERSION;
+        @Override
+        protected boolean isLocalVersion(UUID version)
+        {
+            return localVersion.equals(version);
+        }
+
+        int maxOutstandingRequests = 3;
+        @Override
+        protected int getMaxOutstandingVersionRequests()
+        {
+            return maxOutstandingRequests;
+        }
+
+        Set<InetAddress> mergedSchemasFrom = new HashSet<>();
+        @Override
+        protected void mergeSchemaFrom(InetAddress endpoint, Collection<Mutation> mutations)
+        {
+            mergedSchemasFrom.add(endpoint);
+        }
+    }
+
+    @Test
+    public void requestResponseCycle() throws InterruptedException
+    {
+        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+        coordinator.maxOutstandingRequests = 1;
+
+        Assert.assertTrue(coordinator.requests.isEmpty());
+
+        // first schema report should send a migration request
+        getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
+        Assert.assertEquals(1, coordinator.requests.size());
+        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+
+        // second should not
+        getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
+        Assert.assertEquals(1, coordinator.requests.size());
+        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+
+        // until the first request fails, then the second endpoint should be contacted
+        MigrationCoordinator.Callback request1 = coordinator.requests.poll();
+        Assert.assertEquals(EP1, request1.endpoint);
+        getUnchecked(request1.fail());
+        Assert.assertTrue(coordinator.mergedSchemasFrom.isEmpty());
+        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+
+        // ... then the second endpoint should be contacted
+        Assert.assertEquals(1, coordinator.requests.size());
+        MigrationCoordinator.Callback request2 = coordinator.requests.poll();
+        Assert.assertEquals(EP2, request2.endpoint);
+        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+        getUnchecked(request2.response(Collections.emptyList()));
+        Assert.assertEquals(EP2, Iterables.getOnlyElement(coordinator.mergedSchemasFrom));
+        Assert.assertTrue(coordinator.awaitSchemaRequests(1));
+
+        // and migration tasks should not be sent out for subsequent version reports
+        getUnchecked(coordinator.reportEndpointVersion(EP3, V1));
+        Assert.assertTrue(coordinator.requests.isEmpty());
+
+    }
+
+    /**
+     * If we don't send a request for a version, and endpoints associated with
+     * it all change versions, we should signal anyone waiting on that version
+     */
+    @Test
+    public void versionsAreSignaledWhenDeleted()
+    {
+        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+        coordinator.reportEndpointVersion(EP1, V1);
+        WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
+        Assert.assertFalse(signal.isSignalled());
+
+        coordinator.reportEndpointVersion(EP1, V2);
+        Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));
+
+        Assert.assertTrue(signal.isSignalled());
+    }
+
+    private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddress endpoint, UUID version, boolean startupShouldBeUnblocked)
+    {
+        Assert.assertTrue(coordinator.requests.isEmpty());
+        Future<Void> future = coordinator.reportEndpointVersion(endpoint, version);
+        if (future != null)
+            getUnchecked(future);
+        Assert.assertTrue(coordinator.requests.isEmpty());
+
+        Assert.assertEquals(startupShouldBeUnblocked, coordinator.awaitSchemaRequests(1));
+    }
+
+    private static void assertNoContact(InstrumentedCoordinator coordinator, boolean startupShouldBeUnblocked)
+    {
+        assertNoContact(coordinator, EP1, V1, startupShouldBeUnblocked);
+    }
+
+    @Test
+    public void dontContactNodesWithSameSchema()
+    {
+        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+        coordinator.localVersion = V1;
+        assertNoContact(coordinator, true);
+    }
+
+    @Test
+    public void dontContactIncompatibleNodes()
+    {
+        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+        coordinator.shouldPullFromEndpoint = false;
+        assertNoContact(coordinator, false);
+    }
+
+    @Test
+    public void dontContactDeadNodes()
+    {
+        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+        coordinator.deadNodes.add(EP1);
+        assertNoContact(coordinator, EP1, V1, false);
+    }
+
+    /**
+     * If a node has become incompatible between when the task was scheduled and when it
+     * was run, we should detect that and fail the task
+     */
+    @Test
+    public void testGossipRace()
+    {
+        InstrumentedCoordinator coordinator = new InstrumentedCoordinator() {
+            protected boolean shouldPullImmediately(InetAddress endpoint, UUID version)
+            {
+                // this is the last thing that gets called before scheduling the pull, so set this flag here
+                shouldPullFromEndpoint = false;
+                return super.shouldPullImmediately(endpoint, version);
+            }
+        };
+
+        Assert.assertTrue(coordinator.shouldPullFromEndpoint(EP1));
+        assertNoContact(coordinator, EP1, V1, false);
+    }
+
+    @Test
+    public void testWeKeepSendingRequests()
+    {
+        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+        getUnchecked(coordinator.reportEndpointVersion(EP3, V2));
+        coordinator.requests.remove().response(Collections.emptyList());
+
+        getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
+        getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
+
+        MigrationCoordinator.Callback prev = null;
+        Set<InetAddress> EPs = Sets.newHashSet(EP1, EP2);
+        int ep1requests = 0;
+        int ep2requests = 0;
+
+        for (int i=0; i<10; i++)
+        {
+            Assert.assertEquals(String.format("%s", i), 2, coordinator.requests.size());
+
+            MigrationCoordinator.Callback next = coordinator.requests.remove();
+
+            // we should be contacting endpoints in a round robin fashion
+            Assert.assertTrue(EPs.contains(next.endpoint));
+            if (prev != null && prev.endpoint.equals(next.endpoint))
+                Assert.fail(String.format("Not expecting prev %s to be equal to next %s", prev.endpoint, next.endpoint));
+
+            // should send a new request
+            next.fail();
+            prev = next;
+            Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+
+            Assert.assertEquals(2, coordinator.requests.size());
+        }
+        logger.info("{} -> {}", EP1, ep1requests);
+        logger.info("{} -> {}", EP2, ep2requests);
+
+        // a single success should unblock startup though
+        coordinator.requests.remove().response(Collections.emptyList());
+        Assert.assertTrue(coordinator.awaitSchemaRequests(1));
+
+    }
+
+    /**
+     * Pull unreceived schemas should detect and send requests out for any
+     * schemas that are marked unreceived and have no outstanding requests
+     */
+    @Test
+    public void pullUnreceived()
+    {
+        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+        coordinator.shouldPullFromEndpoint = false;
+        assertNoContact(coordinator, false);
+
+        coordinator.shouldPullFromEndpoint = true;
+        Assert.assertEquals(0, coordinator.requests.size());
+        List<Future<Void>> futures = coordinator.pullUnreceivedSchemaVersions();
+        futures.forEach(Futures::getUnchecked);
+        Assert.assertEquals(1, coordinator.requests.size());
+    }
+}

