Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/11/09 20:25:35 UTC

[cassandra] branch cassandra-3.11 updated (94f940c -> fe70155)

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 94f940c  Merge branch 'cassandra-3.0' into cassandra-3.11
     new a960d87  add schema coordinator
     new 0845008  Wait for schema agreement when bootstrapping
     new fe70155  Merge branch 'cassandra-3.0' into cassandra-3.11

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 .../cassandra/service/MigrationCoordinator.java    | 506 +++++++++++++++++++++
 .../apache/cassandra/service/MigrationManager.java | 145 +-----
 .../apache/cassandra/service/MigrationTask.java    | 115 -----
 .../apache/cassandra/service/StorageService.java   |  64 ++-
 .../cassandra/service/StorageServiceMBean.java     |   4 +
 .../service/MigrationCoordinatorTest.java          | 319 +++++++++++++
 7 files changed, 883 insertions(+), 271 deletions(-)
 create mode 100644 src/java/org/apache/cassandra/service/MigrationCoordinator.java
 delete mode 100644 src/java/org/apache/cassandra/service/MigrationTask.java
 create mode 100644 test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java
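
The new coordinator is driven through a small surface: gossip reports each
peer's schema version, the coordinator schedules pulls, and bootstrap blocks
until every reported version has been received. A minimal caller-side sketch,
assuming same-package access (reportEndpointVersion is package-private) and
using only names that appear in the diffs below; the driver class itself is
hypothetical:

    package org.apache.cassandra.service;

    import java.net.InetAddress;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;

    public class SchemaAgreementSketch
    {
        public static boolean waitForAgreement(InetAddress peer, UUID remoteVersion)
        {
            MigrationCoordinator coordinator = MigrationCoordinator.instance;

            // kick off the once-a-minute task that re-pulls unreceived schema versions
            coordinator.start();

            // report the schema version a peer advertises in gossip;
            // this queues a pull request if the version is unknown locally
            coordinator.reportEndpointVersion(peer, remoteVersion);

            // block until a response has arrived for every known version, or time out
            return coordinator.awaitSchemaRequests(TimeUnit.MINUTES.toMillis(1));
        }
    }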


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by bd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit fe701556a1d5c3955f79f957f835406319bb97ed
Merge: 94f940c 0845008
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Mon Nov 9 12:18:24 2020 -0800

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 .../cassandra/service/MigrationCoordinator.java    | 506 +++++++++++++++++++++
 .../apache/cassandra/service/MigrationManager.java | 145 +-----
 .../apache/cassandra/service/MigrationTask.java    | 115 -----
 .../apache/cassandra/service/StorageService.java   |  64 ++-
 .../cassandra/service/StorageServiceMBean.java     |   4 +
 .../service/MigrationCoordinatorTest.java          | 319 +++++++++++++
 7 files changed, 883 insertions(+), 271 deletions(-)

diff --cc CHANGES.txt
index e46731b,07ee9af..b81b0c8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -3,14 -3,7 +3,15 @@@ Merged from 3.0
   * Fix invalid cell value skipping when reading from disk (CASSANDRA-16223)
   * Prevent invoking enable/disable gossip when not in NORMAL (CASSANDRA-16146)
  
 -3.0.23:
 +3.11.9
 + * Synchronize Keyspace instance store/clear (CASSANDRA-16210)
 + * Fix ColumnFilter to avoid querying cells of unselected complex columns (CASSANDRA-15977)
 + * Fix memory leak in CompressedChunkReader (CASSANDRA-15880)
 + * Don't attempt value skipping with mixed version cluster (CASSANDRA-15833)
 + * Avoid failing compactions with very large partitions (CASSANDRA-15164)
 + * Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103)
 +Merged from 3.0:
++ * Wait for schema agreement when bootstrapping (CASSANDRA-15158)
   * Fix OOM when terminating repair session (CASSANDRA-15902)
   * Avoid marking shutting down nodes as up after receiving gossip shutdown message (CASSANDRA-16094)
   * Check SSTables for latest version before dropping compact storage (CASSANDRA-16063)
diff --cc src/java/org/apache/cassandra/service/MigrationCoordinator.java
index 0000000,18ffdbb..3730f62
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/service/MigrationCoordinator.java
+++ b/src/java/org/apache/cassandra/service/MigrationCoordinator.java
@@@ -1,0 -1,501 +1,506 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.service;
+ 
+ import java.lang.management.ManagementFactory;
+ import java.lang.management.RuntimeMXBean;
+ import java.net.InetAddress;
+ import java.util.ArrayDeque;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Deque;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.FutureTask;
+ import java.util.concurrent.TimeUnit;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.collect.ImmutableSet;
+ import com.google.common.collect.Sets;
+ import com.google.common.util.concurrent.Futures;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.concurrent.LocalAwareExecutorService;
+ import org.apache.cassandra.concurrent.ScheduledExecutors;
+ import org.apache.cassandra.concurrent.Stage;
+ import org.apache.cassandra.concurrent.StageManager;
+ import org.apache.cassandra.config.Schema;
+ import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.exceptions.RequestFailureReason;
+ import org.apache.cassandra.gms.ApplicationState;
+ import org.apache.cassandra.gms.EndpointState;
+ import org.apache.cassandra.gms.FailureDetector;
+ import org.apache.cassandra.gms.Gossiper;
++import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+ import org.apache.cassandra.gms.VersionedValue;
+ import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+ import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessageOut;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.schema.SchemaKeyspace;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.WaitQueue;
+ 
+ public class MigrationCoordinator
+ {
+     private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class);
+     private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+     private static final Future<Void> FINISHED_FUTURE = Futures.immediateFuture(null);
+ 
+     private static final int MIGRATION_DELAY_IN_MS = 60000;
+     private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;
+ 
+     public static final MigrationCoordinator instance = new MigrationCoordinator();
+ 
+     static class VersionInfo
+     {
+         final UUID version;
+ 
+         final Set<InetAddress> endpoints           = Sets.newConcurrentHashSet();
+         final Set<InetAddress> outstandingRequests = Sets.newConcurrentHashSet();
+         final Deque<InetAddress> requestQueue      = new ArrayDeque<>();
+ 
+         private final WaitQueue waitQueue = new WaitQueue();
+ 
+         volatile boolean receivedSchema;
+ 
+         VersionInfo(UUID version)
+         {
+             this.version = version;
+         }
+ 
+         WaitQueue.Signal register()
+         {
+             return waitQueue.register();
+         }
+ 
+         void markReceived()
+         {
+             if (receivedSchema)
+                 return;
+ 
+             receivedSchema = true;
+             waitQueue.signalAll();
+         }
+ 
+         boolean wasReceived()
+         {
+             return receivedSchema;
+         }
+     }
+ 
+     private final Map<UUID, VersionInfo> versionInfo = new HashMap<>();
+     private final Map<InetAddress, UUID> endpointVersions = new HashMap<>();
+ 
+     public void start()
+     {
+         ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::pullUnreceivedSchemaVersions, 1, 1, TimeUnit.MINUTES);
+     }
+ 
+     public synchronized void reset()
+     {
+         versionInfo.clear();
+     }
+ 
+     synchronized List<Future<Void>> pullUnreceivedSchemaVersions()
+     {
+         List<Future<Void>> futures = new ArrayList<>();
+         for (VersionInfo info : versionInfo.values())
+         {
+             if (info.wasReceived() || info.outstandingRequests.size() > 0)
+                 continue;
+ 
+             Future<Void> future = maybePullSchema(info);
+             if (future != null && future != FINISHED_FUTURE)
+                 futures.add(future);
+         }
+ 
+         return futures;
+     }
+ 
+     synchronized Future<Void> maybePullSchema(VersionInfo info)
+     {
+         if (info.endpoints.isEmpty() || info.wasReceived() || !shouldPullSchema(info.version))
+             return FINISHED_FUTURE;
+ 
+         if (info.outstandingRequests.size() >= getMaxOutstandingVersionRequests())
+             return FINISHED_FUTURE;
+ 
+         for (int i=0, isize=info.requestQueue.size(); i<isize; i++)
+         {
+             InetAddress endpoint = info.requestQueue.remove();
+             if (!info.endpoints.contains(endpoint))
+                 continue;
+ 
+             if (shouldPullFromEndpoint(endpoint) && info.outstandingRequests.add(endpoint))
+             {
+                 return scheduleSchemaPull(endpoint, info);
+             }
+             else
+             {
+                 // return to queue
+                 info.requestQueue.offer(endpoint);
+             }
+         }
+ 
+         // no suitable endpoints were found; the periodic task will try again in a minute
+         return null;
+     }
+ 
+     public synchronized Map<UUID, Set<InetAddress>> outstandingVersions()
+     {
+         HashMap<UUID, Set<InetAddress>> map = new HashMap<>();
+         for (VersionInfo info : versionInfo.values())
+             if (!info.wasReceived())
+                 map.put(info.version, ImmutableSet.copyOf(info.endpoints));
+         return map;
+     }
+ 
+     @VisibleForTesting
+     protected VersionInfo getVersionInfoUnsafe(UUID version)
+     {
+         return versionInfo.get(version);
+     }
+ 
+     @VisibleForTesting
+     protected int getMaxOutstandingVersionRequests()
+     {
+         return MAX_OUTSTANDING_VERSION_REQUESTS;
+     }
+ 
+     @VisibleForTesting
+     protected boolean isAlive(InetAddress endpoint)
+     {
+         return FailureDetector.instance.isAlive(endpoint);
+     }
+ 
+     @VisibleForTesting
+     protected boolean shouldPullSchema(UUID version)
+     {
+         if (Schema.instance.getVersion() == null)
+         {
+             logger.debug("Not pulling schema for version {}, because local schama version is not known yet", version);
+             return false;
+         }
+ 
 -        if (Schema.instance.getVersion().equals(version))
++        if (Schema.instance.isSameVersion(version))
+         {
+             logger.debug("Not pulling schema for version {}, because schema versions match: " +
 -                         "local={}, remote={}",
 -                         version, Schema.instance.getVersion(),
 -                         version);
++                         "local/real={}, local/compatible={}, remote={}",
++                         version,
++                         Schema.schemaVersionToString(Schema.instance.getRealVersion()),
++                         Schema.schemaVersionToString(Schema.instance.getAltVersion()),
++                         Schema.schemaVersionToString(version));
+             return false;
+         }
+         return true;
+     }
+ 
+     // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes
+     // from both 3.0 and 3.0.14.
+     private static boolean is30Compatible(int version)
+     {
+         return version == MessagingService.current_version || version == MessagingService.VERSION_3014;
+     }
+ 
+     @VisibleForTesting
+     protected boolean shouldPullFromEndpoint(InetAddress endpoint)
+     {
+         if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+             return false;
+ 
+         EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+         if (state == null)
+             return false;
+ 
+         final String releaseVersion = state.getApplicationState(ApplicationState.RELEASE_VERSION).value;
+         final String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
+ 
+         if (!releaseVersion.startsWith(ourMajorVersion))
+         {
+             logger.debug("Not pulling schema from {} because release version in Gossip is not major version {}, it is {}",
+                          endpoint, ourMajorVersion, releaseVersion);
+             return false;
+         }
+ 
+         if (!MessagingService.instance().knowsVersion(endpoint))
+         {
+             logger.debug("Not pulling schema from {} because their messaging version is unknown", endpoint);
+             return false;
+         }
+ 
+         if (!is30Compatible(MessagingService.instance().getRawVersion(endpoint)))
+         {
+             logger.debug("Not pulling schema from {} because their schema format is incompatible", endpoint);
+             return false;
+         }
+ 
+         if (Gossiper.instance.isGossipOnlyMember(endpoint))
+         {
+             logger.debug("Not pulling schema from {} because it's a gossip only member", endpoint);
+             return false;
+         }
+         return true;
+     }
+ 
+     @VisibleForTesting
+     protected boolean shouldPullImmediately(InetAddress endpoint, UUID version)
+     {
+         if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
+         {
+             // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
+             logger.debug("Immediately submitting migration task for {}, " +
 -                         "schema versions: local={}, remote={}",
++                         "schema versions: local/real={}, local/compatible={}, remote={}",
+                          endpoint,
 -                         Schema.instance.getVersion(),
 -                         version);
++                         Schema.schemaVersionToString(Schema.instance.getRealVersion()),
++                         Schema.schemaVersionToString(Schema.instance.getAltVersion()),
++                         Schema.schemaVersionToString(version));
+             return true;
+         }
+         return false;
+     }
+ 
+     @VisibleForTesting
+     protected boolean isLocalVersion(UUID version)
+     {
 -        return Schema.instance.getVersion().equals(version);
++        return Schema.instance.isSameVersion(version);
+     }
+ 
+     /**
+      * If a previous schema update brought our version the same as the incoming schema, don't apply it
+      */
+     synchronized boolean shouldApplySchemaFor(VersionInfo info)
+     {
+         if (info.wasReceived())
+             return false;
+         return !isLocalVersion(info.version);
+     }
+ 
+     synchronized Future<Void> reportEndpointVersion(InetAddress endpoint, UUID version)
+     {
+         UUID current = endpointVersions.put(endpoint, version);
+         if (current != null && current.equals(version))
+             return FINISHED_FUTURE;
+ 
+         VersionInfo info = versionInfo.computeIfAbsent(version, VersionInfo::new);
+         if (isLocalVersion(version))
+             info.markReceived();
+         info.endpoints.add(endpoint);
+         info.requestQueue.addFirst(endpoint);
+ 
+         // disassociate this endpoint from its (now) previous schema version
+         removeEndpointFromVersion(endpoint, current);
+ 
+         return maybePullSchema(info);
+     }
+ 
+     Future<Void> reportEndpointVersion(InetAddress endpoint, EndpointState state)
+     {
+         if (state == null)
+             return FINISHED_FUTURE;
+ 
 -        VersionedValue version = state.getApplicationState(ApplicationState.SCHEMA);
++        UUID version = state.getSchemaVersion();
+ 
+         if (version == null)
+             return FINISHED_FUTURE;
+ 
 -        return reportEndpointVersion(endpoint, UUID.fromString(version.value));
++        return reportEndpointVersion(endpoint, version);
+     }
+ 
+     private synchronized void removeEndpointFromVersion(InetAddress endpoint, UUID version)
+     {
+         if (version == null)
+             return;
+ 
+         VersionInfo info = versionInfo.get(version);
+ 
+         if (info == null)
+             return;
+ 
+         info.endpoints.remove(endpoint);
+         if (info.endpoints.isEmpty())
+         {
+             info.waitQueue.signalAll();
+             versionInfo.remove(version);
+         }
+     }
+ 
+     Future<Void> scheduleSchemaPull(InetAddress endpoint, VersionInfo info)
+     {
+         LocalAwareExecutorService stage = StageManager.getStage(Stage.MIGRATION);
+         FutureTask<Void> task = new FutureTask<>(() -> pullSchema(new Callback(endpoint, info)), null);
+         if (shouldPullImmediately(endpoint, info.version))
+         {
+             stage.submit(task);
+         }
+         else
+         {
+             ScheduledExecutors.nonPeriodicTasks.schedule(() -> stage.submit(task), MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
+         }
+         return task;
+     }
+ 
+     @VisibleForTesting
+     protected void mergeSchemaFrom(InetAddress endpoint, Collection<Mutation> mutations)
+     {
+         SchemaKeyspace.mergeSchemaAndAnnounceVersion(mutations);
+     }
+ 
+     class Callback implements IAsyncCallbackWithFailure<Collection<Mutation>>
+     {
+         final InetAddress endpoint;
+         final VersionInfo info;
+ 
+         public Callback(InetAddress endpoint, VersionInfo info)
+         {
+             this.endpoint = endpoint;
+             this.info = info;
+         }
+ 
 -        public void onFailure(InetAddress from)
++        public void onFailure(InetAddress from, RequestFailureReason failureReason)
+         {
+             fail();
+         }
+ 
+         Future<Void> fail()
+         {
+             return pullComplete(endpoint, info, false);
+         }
+ 
+         public void response(MessageIn<Collection<Mutation>> message)
+         {
+             response(message.payload);
+         }
+ 
+         Future<Void> response(Collection<Mutation> mutations)
+         {
+             synchronized (info)
+             {
+                 if (shouldApplySchemaFor(info))
+                 {
+                     try
+                     {
+                         mergeSchemaFrom(endpoint, mutations);
+                     }
+                     catch (Exception e)
+                     {
+                         logger.error(String.format("Unable to merge schema from %s", endpoint), e);
+                         return fail();
+                     }
+                 }
+                 return pullComplete(endpoint, info, true);
+             }
+         }
+ 
+         public boolean isLatencyForSnitch()
+         {
+             return false;
+         }
+     }
+ 
+     private void pullSchema(Callback callback)
+     {
+         if (!isAlive(callback.endpoint))
+         {
+             logger.warn("Can't send schema pull request: node {} is down.", callback.endpoint);
+             callback.fail();
+             return;
+         }
+ 
+         // Quite some time may have passed since MM#maybeScheduleSchemaPull() scheduled this task,
+         // potentially enough for the endpoint node to restart - which is an issue if it restarts
+         // upgraded, with a higher major version.
+         if (!shouldPullFromEndpoint(callback.endpoint))
+         {
+             logger.info("Skipped sending a migration request: node {} has a higher major version now.", callback.endpoint);
+             callback.fail();
+             return;
+         }
+ 
+         logger.debug("Requesting schema from {}", callback.endpoint);
+         sendMigrationMessage(callback);
+     }
+ 
+     protected void sendMigrationMessage(Callback callback)
+     {
+         MessageOut<?> message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
+         logger.info("Sending schema pull request to {} at {} with timeout {}", callback.endpoint, System.currentTimeMillis(), message.getTimeout());
+         MessagingService.instance().sendRR(message, callback.endpoint, callback, message.getTimeout(), true);
+     }
+ 
+     private synchronized Future<Void> pullComplete(InetAddress endpoint, VersionInfo info, boolean wasSuccessful)
+     {
+         if (wasSuccessful)
+             info.markReceived();
+ 
+         info.outstandingRequests.remove(endpoint);
+         info.requestQueue.add(endpoint);
+         return maybePullSchema(info);
+     }
+ 
+     /**
+      * Wait until we've received schema responses for all versions we're aware of
+      * @param waitMillis maximum time to wait, in milliseconds
+      * @return true if responses for all schemas were received, false if we timed out waiting
+      */
+     public boolean awaitSchemaRequests(long waitMillis)
+     {
+         if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
 -            CassandraDaemon.waitForGossipToSettle();
++            Gossiper.waitToSettle();
+ 
+         WaitQueue.Signal signal = null;
+         try
+         {
+             synchronized (this)
+             {
+                 List<WaitQueue.Signal> signalList = new ArrayList<>(versionInfo.size());
+                 for (VersionInfo version : versionInfo.values())
+                 {
+                     if (version.wasReceived())
+                         continue;
+ 
+                     signalList.add(version.register());
+                 }
+ 
+                 if (signalList.isEmpty())
+                     return true;
+ 
+                 WaitQueue.Signal[] signals = new WaitQueue.Signal[signalList.size()];
+                 signalList.toArray(signals);
+                 signal = WaitQueue.all(signals);
+             }
+ 
+             return signal.awaitUntil(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis));
+         }
+         catch (InterruptedException e)
+         {
+             throw new RuntimeException(e);
+         }
+         finally
+         {
+             if (signal != null)
+                 signal.cancel();
+         }
+     }
+ }
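
The blocking in awaitSchemaRequests above is built on Cassandra's WaitQueue:
waiters register a Signal, and producers wake them with signalAll() when a
version is received (markReceived) or when its last endpoint goes away
(removeEndpointFromVersion). A stripped-down sketch of that register-then-await
pattern, using only the WaitQueue calls that appear in the class; the wrapper
class is illustrative:

    import java.util.concurrent.TimeUnit;

    import org.apache.cassandra.utils.concurrent.WaitQueue;

    final class WaitQueueSketch
    {
        private final WaitQueue waiters = new WaitQueue();
        private volatile boolean received;

        void markReceived()
        {
            received = true;
            waiters.signalAll();    // wake everything blocked in await()
        }

        boolean await(long waitMillis) throws InterruptedException
        {
            // register before checking state, so a concurrent signalAll() isn't missed
            WaitQueue.Signal signal = waiters.register();
            if (received)
            {
                signal.cancel();    // nothing to wait for; release the registration
                return true;
            }
            return signal.awaitUntil(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis));
        }
    }
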
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 7dea7a0,3718e8c..734f176
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -26,10 -27,7 +26,11 @@@ import java.util.Map.Entry
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.regex.MatchResult;
 +import java.util.regex.Pattern;
+ import java.util.stream.Collectors;
 +import java.util.stream.StreamSupport;
 +
  import javax.annotation.Nullable;
  import javax.management.*;
  import javax.management.openmbean.TabularData;
@@@ -899,6 -859,6 +918,7 @@@ public class StorageService extends Not
          }
      }
  
++
      public void waitForSchema(int delay)
      {
          // first sleep the delay to make sure we see all our peers
@@@ -923,8 -888,8 +948,8 @@@
      }
  
      @VisibleForTesting
--    public void joinTokenRing(int delay) throws ConfigurationException
- {
++    private void joinTokenRing(int delay) throws ConfigurationException
+     {
          joined = true;
  
          // We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed.
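
The StorageService hunks above show only the edges of the new
waitForSchema(int delay). A plausible shape for it, assuming it sleeps out the
ring delay and then delegates to the coordinator; the loop, the one-minute
timeout, and the wrapper class are assumptions, since the hunk elides the body:

    package org.apache.cassandra.service;

    import java.util.concurrent.TimeUnit;

    import com.google.common.util.concurrent.Uninterruptibles;

    import org.apache.cassandra.config.Schema;

    final class WaitForSchemaSketch
    {
        /** @return true if every schema version seen in gossip was received in time */
        static boolean waitForSchema(int delayMillis)
        {
            // first sleep the delay to make sure we see all our peers
            for (int waited = 0; waited < delayMillis && Schema.instance.isEmpty(); waited += 1000)
                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);

            // then block until a response has arrived for every known schema version
            return MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.MINUTES.toMillis(1));
        }
    }
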
diff --cc test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java
index 0000000,070537d..373f6d6
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java
+++ b/test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java
@@@ -1,0 -1,319 +1,319 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.service;
+ 
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.Queue;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.Future;
+ 
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Sets;
+ import com.google.common.util.concurrent.Futures;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.utils.concurrent.WaitQueue;
+ 
+ import static com.google.common.util.concurrent.Futures.getUnchecked;
+ 
+ public class MigrationCoordinatorTest
+ {
+     private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinatorTest.class);
+ 
+     private static final InetAddress EP1;
+     private static final InetAddress EP2;
+     private static final InetAddress EP3;
+ 
+     private static final UUID LOCAL_VERSION = UUID.randomUUID();
+     private static final UUID V1 = UUID.randomUUID();
+     private static final UUID V2 = UUID.randomUUID();
+     private static final UUID V3 = UUID.randomUUID();
+ 
+     static
+     {
+         try
+         {
+             EP1 = InetAddress.getByName("10.0.0.1");
+             EP2 = InetAddress.getByName("10.0.0.2");
+             EP3 = InetAddress.getByName("10.0.0.3");
+         }
+         catch (UnknownHostException e)
+         {
+             throw new AssertionError(e);
+         }
+ 
 -        DatabaseDescriptor.forceStaticInitialization();
++        DatabaseDescriptor.daemonInitialization();
+     }
+ 
+     private static class InstrumentedCoordinator extends MigrationCoordinator
+     {
+ 
+         Queue<Callback> requests = new LinkedList<>();
+         @Override
+         protected void sendMigrationMessage(MigrationCoordinator.Callback callback)
+         {
+             requests.add(callback);
+         }
+ 
+         boolean shouldPullSchema = true;
+         @Override
+         protected boolean shouldPullSchema(UUID version)
+         {
+             return shouldPullSchema;
+         }
+ 
+         boolean shouldPullFromEndpoint = true;
+         @Override
+         protected boolean shouldPullFromEndpoint(InetAddress endpoint)
+         {
+             return shouldPullFromEndpoint;
+         }
+ 
+         boolean shouldPullImmediately = true;
+         @Override
+         protected boolean shouldPullImmediately(InetAddress endpoint, UUID version)
+         {
+             return shouldPullImmediately;
+         }
+ 
+         Set<InetAddress> deadNodes = new HashSet<>();
+         protected boolean isAlive(InetAddress endpoint)
+         {
+             return !deadNodes.contains(endpoint);
+         }
+ 
+         UUID localVersion = LOCAL_VERSION;
+         @Override
+         protected boolean isLocalVersion(UUID version)
+         {
+             return localVersion.equals(version);
+         }
+ 
+         int maxOutstandingRequests = 3;
+         @Override
+         protected int getMaxOutstandingVersionRequests()
+         {
+             return maxOutstandingRequests;
+         }
+ 
+         Set<InetAddress> mergedSchemasFrom = new HashSet<>();
+         @Override
+         protected void mergeSchemaFrom(InetAddress endpoint, Collection<Mutation> mutations)
+         {
+             mergedSchemasFrom.add(endpoint);
+         }
+     }
+ 
+     @Test
+     public void requestResponseCycle() throws InterruptedException
+     {
+         InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+         coordinator.maxOutstandingRequests = 1;
+ 
+         Assert.assertTrue(coordinator.requests.isEmpty());
+ 
+         // first schema report should send a migration request
+         getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
+         Assert.assertEquals(1, coordinator.requests.size());
+         Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+ 
+         // second should not
+         getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
+         Assert.assertEquals(1, coordinator.requests.size());
+         Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+ 
+         // until the first request fails, then the second endpoint should be contacted
+         MigrationCoordinator.Callback request1 = coordinator.requests.poll();
+         Assert.assertEquals(EP1, request1.endpoint);
+         getUnchecked(request1.fail());
+         Assert.assertTrue(coordinator.mergedSchemasFrom.isEmpty());
+         Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+ 
+         // ... then the second endpoint should be contacted
+         Assert.assertEquals(1, coordinator.requests.size());
+         MigrationCoordinator.Callback request2 = coordinator.requests.poll();
+         Assert.assertEquals(EP2, request2.endpoint);
+         Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+         getUnchecked(request2.response(Collections.emptyList()));
+         Assert.assertEquals(EP2, Iterables.getOnlyElement(coordinator.mergedSchemasFrom));
+         Assert.assertTrue(coordinator.awaitSchemaRequests(1));
+ 
+         // and migration tasks should not be sent out for subsequent version reports
+         getUnchecked(coordinator.reportEndpointVersion(EP3, V1));
+         Assert.assertTrue(coordinator.requests.isEmpty());
+ 
+     }
+ 
+     /**
+      * If we don't send a request for a version, and endpoints associated with
+      * it all change versions, we should signal anyone waiting on that version
+      */
+     @Test
+     public void versionsAreSignaledWhenDeleted()
+     {
+         InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+ 
+         coordinator.reportEndpointVersion(EP1, V1);
+         WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
+         Assert.assertFalse(signal.isSignalled());
+ 
+         coordinator.reportEndpointVersion(EP1, V2);
+         Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));
+ 
+         Assert.assertTrue(signal.isSignalled());
+     }
+ 
+     private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddress endpoint, UUID version, boolean startupShouldBeUnblocked)
+     {
+         Assert.assertTrue(coordinator.requests.isEmpty());
+         Future<Void> future = coordinator.reportEndpointVersion(EP1, V1);
+         if (future != null)
+             getUnchecked(future);
+         Assert.assertTrue(coordinator.requests.isEmpty());
+ 
+         Assert.assertEquals(startupShouldBeUnblocked, coordinator.awaitSchemaRequests(1));
+     }
+ 
+     private static void assertNoContact(InstrumentedCoordinator coordinator, boolean startupShouldBeUnblocked)
+     {
+         assertNoContact(coordinator, EP1, V1, startupShouldBeUnblocked);
+     }
+ 
+     @Test
+     public void dontContactNodesWithSameSchema()
+     {
+         InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+ 
+         coordinator.localVersion = V1;
+         assertNoContact(coordinator, true);
+     }
+ 
+     @Test
+     public void dontContactIncompatibleNodes()
+     {
+         InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+ 
+         coordinator.shouldPullFromEndpoint = false;
+         assertNoContact(coordinator, false);
+     }
+ 
+     @Test
+     public void dontContactDeadNodes()
+     {
+         InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+ 
+         coordinator.deadNodes.add(EP1);
+         assertNoContact(coordinator, EP1, V1, false);
+     }
+ 
+     /**
+      * If a node has become incompatible between when the task was scheduled and when it
+      * was run, we should detect that and fail the task
+      */
+     @Test
+     public void testGossipRace()
+     {
+         InstrumentedCoordinator coordinator = new InstrumentedCoordinator() {
+             protected boolean shouldPullImmediately(InetAddress endpoint, UUID version)
+             {
+                 // this is the last thing that gets called before scheduling the pull, so set this flag here
+                 shouldPullFromEndpoint = false;
+                 return super.shouldPullImmediately(endpoint, version);
+             }
+         };
+ 
+         Assert.assertTrue(coordinator.shouldPullFromEndpoint(EP1));
+         assertNoContact(coordinator, EP1, V1, false);
+     }
+ 
+     @Test
+     public void testWeKeepSendingRequests()
+     {
+         InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+ 
+         getUnchecked(coordinator.reportEndpointVersion(EP3, V2));
+         coordinator.requests.remove().response(Collections.emptyList());
+ 
+         getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
+         getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
+ 
+         MigrationCoordinator.Callback prev = null;
+         Set<InetAddress> EPs = Sets.newHashSet(EP1, EP2);
+         int ep1requests = 0;
+         int ep2requests = 0;
+ 
+         for (int i=0; i<10; i++)
+         {
+             Assert.assertEquals(String.format("%s", i), 2, coordinator.requests.size());
+ 
+             MigrationCoordinator.Callback next = coordinator.requests.remove();
+ 
+             // we should be contacting endpoints in a round robin fashion
+             Assert.assertTrue(EPs.contains(next.endpoint));
+             if (prev != null && prev.endpoint.equals(next.endpoint))
+                 Assert.fail(String.format("Not expecting prev %s to be equal to next %s", prev.endpoint, next.endpoint));
+ 
+             // record which endpoint was asked, for the per-endpoint totals logged below
+             if (next.endpoint.equals(EP1)) ep1requests++; else ep2requests++;
+ 
+             // failing the request should cause a new one to be sent
+             next.fail();
+             prev = next;
+             Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+ 
+             Assert.assertEquals(2, coordinator.requests.size());
+         }
+         logger.info("{} -> {}", EP1, ep1requests);
+         logger.info("{} -> {}", EP2, ep2requests);
+ 
+         // a single success should unblock startup though
+         coordinator.requests.remove().response(Collections.emptyList());
+         Assert.assertTrue(coordinator.awaitSchemaRequests(1));
+ 
+     }
+ 
+     /**
+      * Pull unreceived schemas should detect and send requests out for any
+      * schemas that are marked unreceived and have no outstanding requests
+      */
+     @Test
+     public void pullUnreceived()
+     {
+         InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+ 
+         coordinator.shouldPullFromEndpoint = false;
+         assertNoContact(coordinator, false);
+ 
+         coordinator.shouldPullFromEndpoint = true;
+         Assert.assertEquals(0, coordinator.requests.size());
+         List<Future<Void>> futures = coordinator.pullUnreceivedSchemaVersions();
+         futures.forEach(Futures::getUnchecked);
+         Assert.assertEquals(1, coordinator.requests.size());
+     }
+ }

