Posted to commits@cassandra.apache.org by br...@apache.org on 2021/04/13 13:02:49 UTC

[cassandra] branch trunk updated (44f5b8a -> edffdf6)

This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 44f5b8a  Flaky StorageServiceServerTest
     new f7365cb  Don't wait for migrations from removed nodes, mention flag to skip
     new db6e704  Don't wait for migrations from removed nodes, mention flag to skip
     new edffdf6  Merge branch 'cassandra-3.11' into trunk

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                         |  1 +
 .../cassandra/schema/MigrationCoordinator.java      |  9 +++++++++
 .../apache/cassandra/service/StorageService.java    |  8 +++++---
 .../cassandra/schema/MigrationCoordinatorTest.java  | 21 +++++++++++++++++++++
 4 files changed, 36 insertions(+), 3 deletions(-)



[cassandra] 01/02: Don't wait for migrations from removed nodes, mention flag to skip

Posted by br...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f7365cb0d988ff8aecd20a18d70831d39954b4d4
Author: Brandon Williams <br...@apache.org>
AuthorDate: Fri Apr 9 16:19:17 2021 -0500

    Don't wait for migrations from removed nodes, mention flag to skip
    
    Patch by brandonwilliams; reviewed by Adam Holmberg, adelapena and
    bdeggleston for CASSANDRA-16577
---
 CHANGES.txt                                        |  1 +
 .../cassandra/schema/MigrationCoordinator.java     | 11 ++++++++++-
 .../apache/cassandra/service/StorageService.java   |  8 +++++---
 .../cassandra/schema/MigrationCoordinatorTest.java | 23 +++++++++++++++++++++-
 4 files changed, 38 insertions(+), 5 deletions(-)
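
For readers skimming the patch, the flag named in the new warning and exception text is an existing startup system property, not something introduced here. The sketch below is illustrative only: it assumes REQUIRE_SCHEMAS (seen as an unchanged context line in the StorageService hunk further down) is derived from the -Dcassandra.skip_schema_check property mentioned in the messages, and the class and method names are hypothetical.

    // Sketch only: the startup gate the new messages point operators at.
    // Assumes REQUIRE_SCHEMAS comes from -Dcassandra.skip_schema_check;
    // the real check lives in StorageService (see the diff below).
    public class SkipSchemaCheckSketch
    {
        static final boolean REQUIRE_SCHEMAS =
                !Boolean.getBoolean("cassandra.skip_schema_check");

        static void failIfSchemasMissing(boolean receivedAllSchemas)
        {
            if (receivedAllSchemas)
                return;

            if (REQUIRE_SCHEMAS)
                throw new RuntimeException("Didn't receive schemas for all known versions within the timeout. " +
                                           "Use -Dcassandra.skip_schema_check=true to skip this check.");
        }
    }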

diff --git a/CHANGES.txt b/CHANGES.txt
index 15b8dfb..c22d6a6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,7 @@ Merged from 3.11:
  * Reduce amount of allocations during batch statement execution (CASSANDRA-16201)
  * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393)
 Merged from 3.0:
+ * Don't wait for schema migrations from removed nodes (CASSANDRA-16577)
  * Ignore trailing zeros in hint files (CASSANDRA-16523)
  * Refuse DROP COMPACT STORAGE if some 2.x sstables are in use (CASSANDRA-15897)
  * Fix ColumnFilter::toString not returning a valid CQL fragment (CASSANDRA-16483)
diff --git a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
index 9046479..554f545 100644
--- a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
+++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
@@ -349,7 +349,16 @@ public class MigrationCoordinator
         }
     }
 
-    Future<Void> scheduleSchemaPull(InetAddressAndPort endpoint, VersionInfo info)
+    public synchronized void removeVersionInfoForEndpoint(InetAddress endpoint)
+    {
+        Set<UUID> versions = ImmutableSet.copyOf(versionInfo.keySet());
+        for (UUID version : versions)
+        {
+            removeEndpointFromVersion(endpoint, version);
+        }
+    } 
+
+    Future<Void> scheduleSchemaPull(InetAddress endpoint, VersionInfo info)
     {
         FutureTask<Void> task = new FutureTask<>(() -> pullSchema(new Callback(endpoint, info)), null);
         if (shouldPullImmediately(endpoint, info.version))
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 91007a7..c277f9d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1000,12 +1000,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             return;
 
         logger.warn(String.format("There are nodes in the cluster with a different schema version than us we did not merged schemas from, " +
-                                  "our version : (%s), outstanding versions -> endpoints : %s",
+                                  "our version : (%s), outstanding versions -> endpoints : %s. Use -Dcassandra.skip_schema_check=true " +
+                                  "to ignore this.",
                                   Schema.instance.getVersion(),
                                   MigrationCoordinator.instance.outstandingVersions()));
 
         if (REQUIRE_SCHEMAS)
-            throw new RuntimeException("Didn't receive schemas for all known versions within the timeout");
+            throw new RuntimeException("Didn't receive schemas for all known versions within the timeout. " +
+                                       "Use -Dcassandra.skip_schema_check=true to skip this check.");
     }
 
     private void joinTokenRing(long schemaTimeoutMillis) throws ConfigurationException
@@ -2951,6 +2953,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private void removeEndpoint(InetAddressAndPort endpoint)
     {
         Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.removeEndpoint(endpoint));
+        MigrationCoordinator.instance.removeVersionInfoForEndpoint(endpoint);
         SystemKeyspace.removeEndpoint(endpoint);
     }
 
@@ -3254,7 +3257,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             onChange(endpoint, entry.getKey(), entry.getValue());
         }
-        MigrationCoordinator.instance.reportEndpointVersion(endpoint, epState);
     }
 
     public void onAlive(InetAddressAndPort endpoint, EndpointState state)
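
Taken together, the hunks above mean that removing a node now also clears its entries from the MigrationCoordinator, so startup no longer blocks waiting on a schema version that only the departed node was advertising. For reference, this is how StorageService.removeEndpoint reads with the patch applied (reconstructed from the context and added lines above; note that the follow-up merge in 02/02 adjusts the new coordinator method's parameter from InetAddress to InetAddressAndPort for trunk):

    private void removeEndpoint(InetAddressAndPort endpoint)
    {
        Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.removeEndpoint(endpoint));
        // New in CASSANDRA-16577: drop every schema version this endpoint was
        // reporting and signal anyone waiting on those versions.
        MigrationCoordinator.instance.removeVersionInfoForEndpoint(endpoint);
        SystemKeyspace.removeEndpoint(endpoint);
    }
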
diff --git a/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java b/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
index 1695563..9cc8c94 100644
--- a/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
+++ b/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
@@ -191,7 +191,28 @@ public class MigrationCoordinatorTest
         Assert.assertTrue(signal.isSignalled());
     }
 
-    private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddressAndPort endpoint, UUID version, boolean startupShouldBeUnblocked)
+	/**
+	 * If an endpoint is removed and no other endpoints are reporting its
+	 * schema version, the version should be removed and we should signal
+	 * anyone waiting on that version
+	 */
+	@Test
+	public void versionsAreSignaledWhenEndpointsRemoved()
+	{
+		InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+		coordinator.reportEndpointVersion(EP1, V1);
+		WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
+		Assert.assertFalse(signal.isSignalled());
+
+		coordinator.removeVersionInfoForEndpoint(EP1);
+		Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));
+
+		Assert.assertTrue(signal.isSignalled());
+	}
+
+
+    private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddress endpoint, UUID version, boolean startupShouldBeUnblocked)
     {
         Assert.assertTrue(coordinator.requests.isEmpty());
         Future<Void> future = coordinator.reportEndpointVersion(EP1, V1);



[cassandra] 02/02: Merge branch 'cassandra-3.11' into trunk

Posted by br...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit edffdf6dcc092780757e278d5df789653f20bf37
Merge: f7365cb db6e704
Author: Brandon Williams <br...@apache.org>
AuthorDate: Tue Apr 13 07:47:50 2021 -0500

    Merge branch 'cassandra-3.11' into trunk

 src/java/org/apache/cassandra/schema/MigrationCoordinator.java      | 4 ++--
 test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --cc src/java/org/apache/cassandra/schema/MigrationCoordinator.java
index 554f545,0000000..6b6640e
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
+++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
@@@ -1,535 -1,0 +1,535 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.schema;
 +
 +import java.lang.management.ManagementFactory;
 +import java.util.ArrayDeque;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Deque;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.FutureTask;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.LongSupplier;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.collect.Sets;
 +import com.google.common.util.concurrent.Futures;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.exceptions.RequestFailureReason;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.NoPayload;
 +import org.apache.cassandra.net.RequestCallback;
 +import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.concurrent.WaitQueue;
 +
 +public class MigrationCoordinator
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class);
 +    private static final Future<Void> FINISHED_FUTURE = Futures.immediateFuture(null);
 +
 +    private static LongSupplier getUptimeFn = () -> ManagementFactory.getRuntimeMXBean().getUptime();
 +
 +    @VisibleForTesting
 +    public static void setUptimeFn(LongSupplier supplier)
 +    {
 +        getUptimeFn = supplier;
 +    }
 +
 +
 +    private static final int MIGRATION_DELAY_IN_MS = 60000;
 +    private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;
 +
 +    public static final MigrationCoordinator instance = new MigrationCoordinator();
 +
 +    static class VersionInfo
 +    {
 +        final UUID version;
 +
 +        final Set<InetAddressAndPort> endpoints           = Sets.newConcurrentHashSet();
 +        final Set<InetAddressAndPort> outstandingRequests = Sets.newConcurrentHashSet();
 +        final Deque<InetAddressAndPort> requestQueue      = new ArrayDeque<>();
 +
 +        private final WaitQueue waitQueue = new WaitQueue();
 +
 +        volatile boolean receivedSchema;
 +
 +        VersionInfo(UUID version)
 +        {
 +            this.version = version;
 +        }
 +
 +        WaitQueue.Signal register()
 +        {
 +            return waitQueue.register();
 +        }
 +
 +        void markReceived()
 +        {
 +            if (receivedSchema)
 +                return;
 +
 +            receivedSchema = true;
 +            waitQueue.signalAll();
 +        }
 +
 +        boolean wasReceived()
 +        {
 +            return receivedSchema;
 +        }
 +    }
 +
 +    private final Map<UUID, VersionInfo> versionInfo = new HashMap<>();
 +    private final Map<InetAddressAndPort, UUID> endpointVersions = new HashMap<>();
 +    private final AtomicInteger inflightTasks = new AtomicInteger();
 +
 +    public void start()
 +    {
 +        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::pullUnreceivedSchemaVersions, 1, 1, TimeUnit.MINUTES);
 +    }
 +
 +    public synchronized void reset()
 +    {
 +        versionInfo.clear();
 +    }
 +
 +    synchronized List<Future<Void>> pullUnreceivedSchemaVersions()
 +    {
 +        List<Future<Void>> futures = new ArrayList<>();
 +        for (VersionInfo info : versionInfo.values())
 +        {
 +            if (info.wasReceived() || info.outstandingRequests.size() > 0)
 +                continue;
 +
 +            Future<Void> future = maybePullSchema(info);
 +            if (future != null && future != FINISHED_FUTURE)
 +                futures.add(future);
 +        }
 +
 +        return futures;
 +    }
 +
 +    synchronized Future<Void> maybePullSchema(VersionInfo info)
 +    {
 +        if (info.endpoints.isEmpty() || info.wasReceived() || !shouldPullSchema(info.version))
 +            return FINISHED_FUTURE;
 +
 +        if (info.outstandingRequests.size() >= getMaxOutstandingVersionRequests())
 +            return FINISHED_FUTURE;
 +
 +        for (int i=0, isize=info.requestQueue.size(); i<isize; i++)
 +        {
 +            InetAddressAndPort endpoint = info.requestQueue.remove();
 +            if (!info.endpoints.contains(endpoint))
 +                continue;
 +
 +            if (shouldPullFromEndpoint(endpoint) && info.outstandingRequests.add(endpoint))
 +            {
 +                return scheduleSchemaPull(endpoint, info);
 +            }
 +            else
 +            {
 +                // return to queue
 +                info.requestQueue.offer(endpoint);
 +            }
 +        }
 +
 +        // no suitable endpoints were found, check again in a minute, the periodic task will pick it up
 +        return null;
 +    }
 +
 +    public synchronized Map<UUID, Set<InetAddressAndPort>> outstandingVersions()
 +    {
 +        HashMap<UUID, Set<InetAddressAndPort>> map = new HashMap<>();
 +        for (VersionInfo info : versionInfo.values())
 +            if (!info.wasReceived())
 +                map.put(info.version, ImmutableSet.copyOf(info.endpoints));
 +        return map;
 +    }
 +
 +    @VisibleForTesting
 +    protected VersionInfo getVersionInfoUnsafe(UUID version)
 +    {
 +        return versionInfo.get(version);
 +    }
 +
 +    @VisibleForTesting
 +    protected int getMaxOutstandingVersionRequests()
 +    {
 +        return MAX_OUTSTANDING_VERSION_REQUESTS;
 +    }
 +
 +    @VisibleForTesting
 +    protected boolean isAlive(InetAddressAndPort endpoint)
 +    {
 +        return FailureDetector.instance.isAlive(endpoint);
 +    }
 +
 +    @VisibleForTesting
 +    protected boolean shouldPullSchema(UUID version)
 +    {
 +        if (Schema.instance.getVersion() == null)
 +        {
 +            logger.debug("Not pulling schema for version {}, because local schama version is not known yet", version);
 +            return false;
 +        }
 +
 +        if (Schema.instance.isSameVersion(version))
 +        {
 +            logger.debug("Not pulling schema for version {}, because schema versions match: " +
 +                         "local={}, remote={}",
 +                         version,
 +                         Schema.schemaVersionToString(Schema.instance.getVersion()),
 +                         Schema.schemaVersionToString(version));
 +            return false;
 +        }
 +        return true;
 +    }
 +
 +    // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes
 +    // from both 3.0 and 3.0.14.
 +    private static boolean is30Compatible(int version)
 +    {
 +        return version == MessagingService.current_version || version == MessagingService.VERSION_3014;
 +    }
 +
 +    @VisibleForTesting
 +    protected boolean shouldPullFromEndpoint(InetAddressAndPort endpoint)
 +    {
 +        if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
 +            return false;
 +
 +        EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
 +        if (state == null)
 +            return false;
 +
 +        final String releaseVersion = state.getApplicationState(ApplicationState.RELEASE_VERSION).value;
 +        final String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
 +
 +        if (!releaseVersion.startsWith(ourMajorVersion))
 +        {
 +            logger.debug("Not pulling schema from {} because release version in Gossip is not major version {}, it is {}",
 +                         endpoint, ourMajorVersion, releaseVersion);
 +            return false;
 +        }
 +
 +        if (!MessagingService.instance().versions.knows(endpoint))
 +        {
 +            logger.debug("Not pulling schema from {} because their messaging version is unknown", endpoint);
 +            return false;
 +        }
 +
 +        if (MessagingService.instance().versions.getRaw(endpoint) != MessagingService.current_version)
 +        {
 +            logger.debug("Not pulling schema from {} because their schema format is incompatible", endpoint);
 +            return false;
 +        }
 +
 +        if (Gossiper.instance.isGossipOnlyMember(endpoint))
 +        {
 +            logger.debug("Not pulling schema from {} because it's a gossip only member", endpoint);
 +            return false;
 +        }
 +        return true;
 +    }
 +
 +    @VisibleForTesting
 +    protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version)
 +    {
 +        if (Schema.instance.isEmpty() || getUptimeFn.getAsLong() < MIGRATION_DELAY_IN_MS)
 +        {
 +            // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
 +            logger.debug("Immediately submitting migration task for {}, " +
 +                         "schema versions: local={}, remote={}",
 +                         endpoint,
 +                         Schema.schemaVersionToString(Schema.instance.getVersion()),
 +                         Schema.schemaVersionToString(version));
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    @VisibleForTesting
 +    protected boolean isLocalVersion(UUID version)
 +    {
 +        return Schema.instance.isSameVersion(version);
 +    }
 +
 +    /**
 +     * If a previous schema update brought our version the same as the incoming schema, don't apply it
 +     */
 +    synchronized boolean shouldApplySchemaFor(VersionInfo info)
 +    {
 +        if (info.wasReceived())
 +            return false;
 +        return !isLocalVersion(info.version);
 +    }
 +
 +    public synchronized Future<Void> reportEndpointVersion(InetAddressAndPort endpoint, UUID version)
 +    {
 +        UUID current = endpointVersions.put(endpoint, version);
 +        if (current != null && current.equals(version))
 +            return FINISHED_FUTURE;
 +
 +        VersionInfo info = versionInfo.computeIfAbsent(version, VersionInfo::new);
 +        if (isLocalVersion(version))
 +            info.markReceived();
 +        info.endpoints.add(endpoint);
 +        info.requestQueue.addFirst(endpoint);
 +
 +        // disassociate this endpoint from its (now) previous schema version
 +        removeEndpointFromVersion(endpoint, current);
 +
 +        return maybePullSchema(info);
 +    }
 +
 +    public Future<Void> reportEndpointVersion(InetAddressAndPort endpoint, EndpointState state)
 +    {
 +        if (state == null)
 +            return FINISHED_FUTURE;
 +
 +        UUID version = state.getSchemaVersion();
 +
 +        if (version == null)
 +            return FINISHED_FUTURE;
 +
 +        return reportEndpointVersion(endpoint, version);
 +    }
 +
 +    private synchronized void removeEndpointFromVersion(InetAddressAndPort endpoint, UUID version)
 +    {
 +        if (version == null)
 +            return;
 +
 +        VersionInfo info = versionInfo.get(version);
 +
 +        if (info == null)
 +            return;
 +
 +        info.endpoints.remove(endpoint);
 +        if (info.endpoints.isEmpty())
 +        {
 +            info.waitQueue.signalAll();
 +            versionInfo.remove(version);
 +        }
 +    }
 +
-     public synchronized void removeVersionInfoForEndpoint(InetAddress endpoint)
++    public synchronized void removeVersionInfoForEndpoint(InetAddressAndPort endpoint)
 +    {
 +        Set<UUID> versions = ImmutableSet.copyOf(versionInfo.keySet());
 +        for (UUID version : versions)
 +        {
 +            removeEndpointFromVersion(endpoint, version);
 +        }
 +    } 
 +
-     Future<Void> scheduleSchemaPull(InetAddress endpoint, VersionInfo info)
++    Future<Void> scheduleSchemaPull(InetAddressAndPort endpoint, VersionInfo info)
 +    {
 +        FutureTask<Void> task = new FutureTask<>(() -> pullSchema(new Callback(endpoint, info)), null);
 +        if (shouldPullImmediately(endpoint, info.version))
 +        {
 +            submitToMigrationIfNotShutdown(task);
 +        }
 +        else
 +        {
 +            ScheduledExecutors.nonPeriodicTasks.schedule(()->submitToMigrationIfNotShutdown(task), MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
 +        }
 +        return task;
 +    }
 +
 +    private static Future<?> submitToMigrationIfNotShutdown(Runnable task)
 +    {
 +        if (Stage.MIGRATION.executor().isShutdown() || Stage.MIGRATION.executor().isTerminated())
 +        {
 +            logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown.");
 +            return null;
 +        }
 +        else
 +            return Stage.MIGRATION.submit(task);
 +    }
 +
 +    @VisibleForTesting
 +    protected void mergeSchemaFrom(InetAddressAndPort endpoint, Collection<Mutation> mutations)
 +    {
 +        Schema.instance.mergeAndAnnounceVersion(mutations);
 +    }
 +
 +    class Callback implements RequestCallback<Collection<Mutation>>
 +    {
 +        final InetAddressAndPort endpoint;
 +        final VersionInfo info;
 +
 +        public Callback(InetAddressAndPort endpoint, VersionInfo info)
 +        {
 +            this.endpoint = endpoint;
 +            this.info = info;
 +        }
 +
 +        public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
 +        {
 +            fail();
 +        }
 +
 +        Future<Void> fail()
 +        {
 +            return pullComplete(endpoint, info, false);
 +        }
 +
 +        public void onResponse(Message<Collection<Mutation>> message)
 +        {
 +            response(message.payload);
 +        }
 +
 +        Future<Void> response(Collection<Mutation> mutations)
 +        {
 +            synchronized (info)
 +            {
 +                if (shouldApplySchemaFor(info))
 +                {
 +                    try
 +                    {
 +                        mergeSchemaFrom(endpoint, mutations);
 +                    }
 +                    catch (Exception e)
 +                    {
 +                        logger.error(String.format("Unable to merge schema from %s", endpoint), e);
 +                        return fail();
 +                    }
 +                }
 +                return pullComplete(endpoint, info, true);
 +            }
 +        }
 +
 +        public boolean isLatencyForSnitch()
 +        {
 +            return false;
 +        }
 +    }
 +
 +    private void pullSchema(Callback callback)
 +    {
 +        if (!isAlive(callback.endpoint))
 +        {
 +            logger.warn("Can't send schema pull request: node {} is down.", callback.endpoint);
 +            callback.fail();
 +            return;
 +        }
 +
 +        // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
 +        // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
 +        // a higher major.
 +        if (!shouldPullFromEndpoint(callback.endpoint))
 +        {
 +            logger.info("Skipped sending a migration request: node {} has a higher major version now.", callback.endpoint);
 +            callback.fail();
 +            return;
 +        }
 +
 +        logger.debug("Requesting schema from {}", callback.endpoint);
 +        sendMigrationMessage(callback);
 +    }
 +
 +    protected void sendMigrationMessage(Callback callback)
 +    {
 +        inflightTasks.getAndIncrement();
 +        Message message = Message.out(Verb.SCHEMA_PULL_REQ, NoPayload.noPayload);
 +        logger.info("Sending schema pull request to {}", callback.endpoint);
 +        MessagingService.instance().sendWithCallback(message, callback.endpoint, callback);
 +    }
 +
 +    private synchronized Future<Void> pullComplete(InetAddressAndPort endpoint, VersionInfo info, boolean wasSuccessful)
 +    {
 +        inflightTasks.decrementAndGet();
 +        if (wasSuccessful)
 +            info.markReceived();
 +
 +        info.outstandingRequests.remove(endpoint);
 +        info.requestQueue.add(endpoint);
 +        return maybePullSchema(info);
 +    }
 +
 +    public int getInflightTasks()
 +    {
 +        return inflightTasks.get();
 +    }
 +
 +    /**
 +     * Wait until we've received schema responses for all versions we're aware of
 +     * @param waitMillis
 +     * @return true if response for all schemas were received, false if we timed out waiting
 +     */
 +    public boolean awaitSchemaRequests(long waitMillis)
 +    {
 +        if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
 +            Gossiper.waitToSettle();
 +
 +        WaitQueue.Signal signal = null;
 +        try
 +        {
 +            synchronized (this)
 +            {
 +                List<WaitQueue.Signal> signalList = new ArrayList<>(versionInfo.size());
 +                for (VersionInfo version : versionInfo.values())
 +                {
 +                    if (version.wasReceived())
 +                        continue;
 +
 +                    signalList.add(version.register());
 +                }
 +
 +                if (signalList.isEmpty())
 +                    return true;
 +
 +                WaitQueue.Signal[] signals = new WaitQueue.Signal[signalList.size()];
 +                signalList.toArray(signals);
 +                signal = WaitQueue.all(signals);
 +            }
 +
 +            return signal.awaitUntil(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis));
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +        finally
 +        {
 +            if (signal != null)
 +                signal.cancel();
 +        }
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
index 9cc8c94,0000000..acd45b9
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
+++ b/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
@@@ -1,339 -1,0 +1,339 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.schema;
 +
 +import java.net.UnknownHostException;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Queue;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.Future;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Sets;
 +import com.google.common.util.concurrent.Futures;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.utils.concurrent.WaitQueue;
 +
 +import static com.google.common.util.concurrent.Futures.getUnchecked;
 +
 +public class MigrationCoordinatorTest
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinatorTest.class);
 +
 +    private static final InetAddressAndPort EP1;
 +    private static final InetAddressAndPort EP2;
 +    private static final InetAddressAndPort EP3;
 +
 +    private static final UUID LOCAL_VERSION = UUID.randomUUID();
 +    private static final UUID V1 = UUID.randomUUID();
 +    private static final UUID V2 = UUID.randomUUID();
 +
 +    static
 +    {
 +        try
 +        {
 +            EP1 = InetAddressAndPort.getByName("10.0.0.1");
 +            EP2 = InetAddressAndPort.getByName("10.0.0.2");
 +            EP3 = InetAddressAndPort.getByName("10.0.0.3");
 +        }
 +        catch (UnknownHostException e)
 +        {
 +            throw new AssertionError(e);
 +        }
 +
 +        DatabaseDescriptor.daemonInitialization();
 +    }
 +
 +    private static class InstrumentedCoordinator extends MigrationCoordinator
 +    {
 +
 +        Queue<Callback> requests = new LinkedList<>();
 +        @Override
 +        protected void sendMigrationMessage(MigrationCoordinator.Callback callback)
 +        {
 +            requests.add(callback);
 +        }
 +
 +        boolean shouldPullSchema = true;
 +        @Override
 +        protected boolean shouldPullSchema(UUID version)
 +        {
 +            return shouldPullSchema;
 +        }
 +
 +        boolean shouldPullFromEndpoint = true;
 +        @Override
 +        protected boolean shouldPullFromEndpoint(InetAddressAndPort endpoint)
 +        {
 +            return shouldPullFromEndpoint;
 +        }
 +
 +        boolean shouldPullImmediately = true;
 +        @Override
 +        protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version)
 +        {
 +            return shouldPullImmediately;
 +        }
 +
 +        Set<InetAddressAndPort> deadNodes = new HashSet<>();
 +        protected boolean isAlive(InetAddressAndPort endpoint)
 +        {
 +            return !deadNodes.contains(endpoint);
 +        }
 +
 +        UUID localVersion = LOCAL_VERSION;
 +        @Override
 +        protected boolean isLocalVersion(UUID version)
 +        {
 +            return localVersion.equals(version);
 +        }
 +
 +        int maxOutstandingRequests = 3;
 +        @Override
 +        protected int getMaxOutstandingVersionRequests()
 +        {
 +            return maxOutstandingRequests;
 +        }
 +
 +        Set<InetAddressAndPort> mergedSchemasFrom = new HashSet<>();
 +        @Override
 +        protected void mergeSchemaFrom(InetAddressAndPort endpoint, Collection<Mutation> mutations)
 +        {
 +            mergedSchemasFrom.add(endpoint);
 +        }
 +    }
 +
 +    @Test
 +    public void requestResponseCycle() throws InterruptedException
 +    {
 +        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
 +        coordinator.maxOutstandingRequests = 1;
 +
 +        Assert.assertTrue(coordinator.requests.isEmpty());
 +
 +        // first schema report should send a migration request
 +        getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
 +        Assert.assertEquals(1, coordinator.requests.size());
 +        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
 +
 +        // second should not
 +        getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
 +        Assert.assertEquals(1, coordinator.requests.size());
 +        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
 +
 +        // until the first request fails, then the second endpoint should be contacted
 +        MigrationCoordinator.Callback request1 = coordinator.requests.poll();
 +        Assert.assertEquals(EP1, request1.endpoint);
 +        getUnchecked(request1.fail());
 +        Assert.assertTrue(coordinator.mergedSchemasFrom.isEmpty());
 +        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
 +
 +        // ... then the second endpoint should be contacted
 +        Assert.assertEquals(1, coordinator.requests.size());
 +        MigrationCoordinator.Callback request2 = coordinator.requests.poll();
 +        Assert.assertEquals(EP2, request2.endpoint);
 +        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
 +        getUnchecked(request2.response(Collections.emptyList()));
 +        Assert.assertEquals(EP2, Iterables.getOnlyElement(coordinator.mergedSchemasFrom));
 +        Assert.assertTrue(coordinator.awaitSchemaRequests(1));
 +
 +        // and migration tasks should not be sent out for subsequent version reports
 +        getUnchecked(coordinator.reportEndpointVersion(EP3, V1));
 +        Assert.assertTrue(coordinator.requests.isEmpty());
 +
 +    }
 +
 +    /**
 +     * If we don't send a request for a version, and endpoints associated with
 +     * it all change versions, we should signal anyone waiting on that version
 +     */
 +    @Test
 +    public void versionsAreSignaledWhenDeleted()
 +    {
 +        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
 +
 +        coordinator.reportEndpointVersion(EP1, V1);
 +        WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
 +        Assert.assertFalse(signal.isSignalled());
 +
 +        coordinator.reportEndpointVersion(EP1, V2);
 +        Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));
 +
 +        Assert.assertTrue(signal.isSignalled());
 +    }
 +
 +	/**
 +	 * If an endpoint is removed and no other endpoints are reporting its
 +	 * schema version, the version should be removed and we should signal
 +	 * anyone waiting on that version
 +	 */
 +	@Test
 +	public void versionsAreSignaledWhenEndpointsRemoved()
 +	{
 +		InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
 +
 +		coordinator.reportEndpointVersion(EP1, V1);
 +		WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
 +		Assert.assertFalse(signal.isSignalled());
 +
 +		coordinator.removeVersionInfoForEndpoint(EP1);
 +		Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));
 +
 +		Assert.assertTrue(signal.isSignalled());
 +	}
 +
 +
-     private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddress endpoint, UUID version, boolean startupShouldBeUnblocked)
++    private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddressAndPort endpoint, UUID version, boolean startupShouldBeUnblocked)
 +    {
 +        Assert.assertTrue(coordinator.requests.isEmpty());
 +        Future<Void> future = coordinator.reportEndpointVersion(EP1, V1);
 +        if (future != null)
 +            getUnchecked(future);
 +        Assert.assertTrue(coordinator.requests.isEmpty());
 +
 +        Assert.assertEquals(startupShouldBeUnblocked, coordinator.awaitSchemaRequests(1));
 +    }
 +
 +    private static void assertNoContact(InstrumentedCoordinator coordinator, boolean startupShouldBeUnblocked)
 +    {
 +        assertNoContact(coordinator, EP1, V1, startupShouldBeUnblocked);
 +    }
 +
 +    @Test
 +    public void dontContactNodesWithSameSchema()
 +    {
 +        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
 +
 +        coordinator.localVersion = V1;
 +        assertNoContact(coordinator, true);
 +    }
 +
 +    @Test
 +    public void dontContactIncompatibleNodes()
 +    {
 +        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
 +
 +        coordinator.shouldPullFromEndpoint = false;
 +        assertNoContact(coordinator, false);
 +    }
 +
 +    @Test
 +    public void dontContactDeadNodes()
 +    {
 +        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
 +
 +        coordinator.deadNodes.add(EP1);
 +        assertNoContact(coordinator, EP1, V1, false);
 +    }
 +
 +    /**
 +     * If a node has become incompatible between when the task was scheduled and when it
 +     * was run, we should detect that and fail the task
 +     */
 +    @Test
 +    public void testGossipRace()
 +    {
 +        InstrumentedCoordinator coordinator = new InstrumentedCoordinator() {
 +            protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version)
 +            {
 +                // this is the last thing that gets called before scheduling the pull, so set this flag here
 +                shouldPullFromEndpoint = false;
 +                return super.shouldPullImmediately(endpoint, version);
 +            }
 +        };
 +
 +        Assert.assertTrue(coordinator.shouldPullFromEndpoint(EP1));
 +        assertNoContact(coordinator, EP1, V1, false);
 +    }
 +
 +    @Test
 +    public void testWeKeepSendingRequests() throws Exception
 +    {
 +        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
 +
 +        getUnchecked(coordinator.reportEndpointVersion(EP3, V2));
 +        coordinator.requests.remove().response(Collections.emptyList());
 +
 +        getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
 +        getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
 +
 +        MigrationCoordinator.Callback prev = null;
 +        Set<InetAddressAndPort> EPs = Sets.newHashSet(EP1, EP2);
 +        int ep1requests = 0;
 +        int ep2requests = 0;
 +
 +        for (int i=0; i<10; i++)
 +        {
 +            Assert.assertEquals(String.format("%s", i), 2, coordinator.requests.size());
 +
 +            MigrationCoordinator.Callback next = coordinator.requests.remove();
 +
 +            // we should be contacting endpoints in a round robin fashion
 +            Assert.assertTrue(EPs.contains(next.endpoint));
 +            if (prev != null && prev.endpoint.equals(next.endpoint))
 +                Assert.fail(String.format("Not expecting prev %s to be equal to next %s", prev.endpoint, next.endpoint));
 +
 +            // should send a new request
 +            next.fail().get();
 +            prev = next;
 +            Assert.assertFalse(coordinator.awaitSchemaRequests(1));
 +
 +            Assert.assertEquals(2, coordinator.requests.size());
 +        }
 +        logger.info("{} -> {}", EP1, ep1requests);
 +        logger.info("{} -> {}", EP2, ep2requests);
 +
 +        // a single success should unblock startup though
 +        coordinator.requests.remove().response(Collections.emptyList());
 +        Assert.assertTrue(coordinator.awaitSchemaRequests(1));
 +
 +    }
 +
 +    /**
 +     * Pull unreceived schemas should detect and send requests out for any
 +     * schemas that are marked unreceived and have no outstanding requests
 +     */
 +    @Test
 +    public void pullUnreceived()
 +    {
 +        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
 +
 +        coordinator.shouldPullFromEndpoint = false;
 +        assertNoContact(coordinator, false);
 +
 +        coordinator.shouldPullFromEndpoint = true;
 +        Assert.assertEquals(0, coordinator.requests.size());
 +        List<Future<Void>> futures = coordinator.pullUnreceivedSchemaVersions();
 +        futures.forEach(Futures::getUnchecked);
 +        Assert.assertEquals(1, coordinator.requests.size());
 +    }
 +}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org