You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/11/09 20:25:33 UTC
[cassandra] 01/02: add schema coordinator
This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit a960d87e05f01000a758d8e7e5a58be57d13eb33
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Mon May 18 23:02:51 2020 +0200
add schema coordinator
Co-authored-by: Stefan Miklosovic <st...@instaclustr.com>
---
src/java/org/apache/cassandra/config/Schema.java | 8 +
.../cassandra/service/MigrationCoordinator.java | 501 +++++++++++++++++++++
.../apache/cassandra/service/MigrationManager.java | 124 +----
.../apache/cassandra/service/MigrationTask.java | 116 -----
.../apache/cassandra/service/StorageService.java | 69 ++-
.../cassandra/service/StorageServiceMBean.java | 4 +
.../service/MigrationCoordinatorTest.java | 319 +++++++++++++
7 files changed, 895 insertions(+), 246 deletions(-)
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 6d91d8d..2d50b32 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -574,6 +574,14 @@ public class Schema
}
/**
+ * Checks whether the current schema is empty, i.e. whether the schema
+ * version held by this instance is equal to {@code emptyVersion}.
+ *
+ * @return true if the current version equals {@code emptyVersion}
+ */
+ public boolean isEmpty()
+ {
+ return emptyVersion.equals(version);
+ }
+
+ /**
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
* will be converted into UUID which would act as content-based version of the schema.
*/
diff --git a/src/java/org/apache/cassandra/service/MigrationCoordinator.java b/src/java/org/apache/cassandra/service/MigrationCoordinator.java
new file mode 100644
index 0000000..18ffdbb
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/MigrationCoordinator.java
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.InetAddress;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.LocalAwareExecutorService;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+public class MigrationCoordinator
+{
+ private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class);
+ private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+ private static final Future<Void> FINISHED_FUTURE = Futures.immediateFuture(null);
+
+ // Used both as the JVM uptime threshold (ms) below which pulls are submitted immediately
+ // (see shouldPullImmediately) and as the delay (ms) applied to pulls that are not (see scheduleSchemaPull).
+ private static final int MIGRATION_DELAY_IN_MS = 60000;
+ // Default value returned by getMaxOutstandingVersionRequests(): the number of in-flight
+ // pull requests per schema version at which maybePullSchema stops issuing new ones.
+ private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;
+
+ public static final MigrationCoordinator instance = new MigrationCoordinator();
+
+ /**
+  * Bookkeeping for a single schema version: the endpoints reporting it, the endpoints
+  * with a pull request in flight for it, the queue of endpoints to try next, and
+  * whether a schema for this version has been received.
+  */
+ static class VersionInfo
+ {
+     final UUID version;
+
+     // endpoints currently associated with this version
+     final Set<InetAddress> endpoints = Sets.newConcurrentHashSet();
+     // endpoints that have a pull request in flight for this version
+     final Set<InetAddress> outstandingRequests = Sets.newConcurrentHashSet();
+     // endpoints to consider, in order, for the next pull request
+     final Deque<InetAddress> requestQueue = new ArrayDeque<>();
+
+     private final WaitQueue waitQueue = new WaitQueue();
+
+     volatile boolean receivedSchema;
+
+     VersionInfo(UUID schemaVersion)
+     {
+         version = schemaVersion;
+     }
+
+     /** Registers a new signal on this version's wait queue. */
+     WaitQueue.Signal register()
+     {
+         return waitQueue.register();
+     }
+
+     /** Flags this version as received and signals all waiters; a no-op if already flagged. */
+     void markReceived()
+     {
+         if (!receivedSchema)
+         {
+             receivedSchema = true;
+             waitQueue.signalAll();
+         }
+     }
+
+     /** @return true once {@link #markReceived()} has been called */
+     boolean wasReceived()
+     {
+         return receivedSchema;
+     }
+ }
+
+ private final Map<UUID, VersionInfo> versionInfo = new HashMap<>();
+ private final Map<InetAddress, UUID> endpointVersions = new HashMap<>();
+
+ /**
+  * Schedules {@link #pullUnreceivedSchemaVersions()} on {@code ScheduledExecutors.scheduledTasks}
+  * with an initial delay of one minute and a fixed delay of one minute between runs.
+  */
+ public void start()
+ {
+     Runnable periodicPull = this::pullUnreceivedSchemaVersions;
+     ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(periodicPull, 1, 1, TimeUnit.MINUTES);
+ }
+
+ /**
+  * Forgets every tracked schema version and the version last reported by each endpoint.
+  *
+  * The per-endpoint map must be cleared as well: {@link #reportEndpointVersion(InetAddress, UUID)}
+  * returns early when an endpoint reports the same version that is already recorded for it, so
+  * leaving that map populated would turn a re-report after a reset into a no-op and no schema
+  * pull would be scheduled for it.
+  */
+ public synchronized void reset()
+ {
+     versionInfo.clear();
+     endpointVersions.clear();
+ }
+
+ /**
+  * Attempts a pull for every tracked version that has neither been received nor has a
+  * request in flight.
+  *
+  * @return the futures of the pulls that were actually scheduled by this call
+  */
+ synchronized List<Future<Void>> pullUnreceivedSchemaVersions()
+ {
+     List<Future<Void>> scheduled = new ArrayList<>();
+     for (VersionInfo candidate : versionInfo.values())
+     {
+         boolean needsPull = !candidate.wasReceived() && candidate.outstandingRequests.isEmpty();
+         if (!needsPull)
+             continue;
+
+         Future<Void> pull = maybePullSchema(candidate);
+         // null means no suitable endpoint was found, FINISHED_FUTURE means nothing had to be done
+         if (pull == null || pull == FINISHED_FUTURE)
+             continue;
+
+         scheduled.add(pull);
+     }
+
+     return scheduled;
+ }
+
+ /**
+  * Schedules a schema pull for the given version from the first suitable endpoint in its
+  * request queue, if a pull is still needed and the outstanding-request limit is not reached.
+  *
+  * @return {@code FINISHED_FUTURE} if no pull is needed or the limit is reached, the future of
+  *         the scheduled pull if one was scheduled, or null if no suitable endpoint was found
+  */
+ synchronized Future<Void> maybePullSchema(VersionInfo info)
+ {
+     boolean nothingToPull = info.endpoints.isEmpty() || info.wasReceived() || !shouldPullSchema(info.version);
+     if (nothingToPull)
+         return FINISHED_FUTURE;
+
+     boolean atRequestLimit = info.outstandingRequests.size() >= getMaxOutstandingVersionRequests();
+     if (atRequestLimit)
+         return FINISHED_FUTURE;
+
+     // visit each queued endpoint at most once; the size is captured up front because
+     // rejected endpoints are appended back onto the same queue
+     int remaining = info.requestQueue.size();
+     while (remaining-- > 0)
+     {
+         InetAddress candidate = info.requestQueue.remove();
+
+         // endpoints no longer associated with this version are dropped from the queue
+         if (!info.endpoints.contains(candidate))
+             continue;
+
+         boolean claimed = shouldPullFromEndpoint(candidate) && info.outstandingRequests.add(candidate);
+         if (claimed)
+             return scheduleSchemaPull(candidate, info);
+
+         // return to queue
+         info.requestQueue.offer(candidate);
+     }
+
+     // no suitable endpoints were found, check again in a minute, the periodic task will pick it up
+     return null;
+ }
+
+ /**
+  * @return a new map from each tracked schema version that has not been received yet to an
+  *         immutable copy of the endpoints associated with it
+  */
+ public synchronized Map<UUID, Set<InetAddress>> outstandingVersions()
+ {
+     HashMap<UUID, Set<InetAddress>> pending = new HashMap<>();
+     for (VersionInfo info : versionInfo.values())
+     {
+         if (info.wasReceived())
+             continue;
+
+         Set<InetAddress> snapshot = ImmutableSet.copyOf(info.endpoints);
+         pending.put(info.version, snapshot);
+     }
+     return pending;
+ }
+
+ /**
+  * Returns the tracked info for the given schema version, or null if there is none.
+  * Unlike the other accessors of {@code versionInfo}, this method is not synchronized
+  * (presumably the reason for the "Unsafe" suffix).
+  */
+ @VisibleForTesting
+ protected VersionInfo getVersionInfoUnsafe(UUID version)
+ {
+ return versionInfo.get(version);
+ }
+
+ /**
+  * @return the number of outstanding requests per schema version at which
+  *         {@link #maybePullSchema(VersionInfo)} stops issuing further pulls
+  */
+ @VisibleForTesting
+ protected int getMaxOutstandingVersionRequests()
+ {
+ return MAX_OUTSTANDING_VERSION_REQUESTS;
+ }
+
+ /**
+  * @return whether the failure detector currently considers the endpoint alive
+  */
+ @VisibleForTesting
+ protected boolean isAlive(InetAddress endpoint)
+ {
+ return FailureDetector.instance.isAlive(endpoint);
+ }
+
+ /**
+  * Decides whether the given remote schema version needs to be pulled at all.
+  *
+  * @return false if the local schema version is not known yet or equals the given version,
+  *         true otherwise
+  */
+ @VisibleForTesting
+ protected boolean shouldPullSchema(UUID version)
+ {
+     if (Schema.instance.getVersion() == null)
+     {
+         logger.debug("Not pulling schema for version {}, because local schama version is not known yet", version);
+         return false;
+     }
+
+     boolean matchesLocal = Schema.instance.getVersion().equals(version);
+     if (!matchesLocal)
+         return true;
+
+     logger.debug("Not pulling schema for version {}, because schema versions match: local={}, remote={}",
+                  version, Schema.instance.getVersion(), version);
+     return false;
+ }
+
+ // The 3.0.14 protocol differs from 3.0 only by the CASSANDRA-13004 bugfix, so schema changes
+ // from peers speaking either messaging version are accepted.
+ private static boolean is30Compatible(int version)
+ {
+     if (version == MessagingService.current_version)
+         return true;
+     return version == MessagingService.VERSION_3014;
+ }
+
+ /**
+  * Decides whether schema may be requested from the given endpoint.
+  *
+  * @return false for the local broadcast address, for endpoints without gossip state or without
+  *         a gossiped release version, for endpoints whose release version does not start with
+  *         our major version, whose messaging version is unknown or not 3.0 compatible, or that
+  *         are gossip-only members; true otherwise
+  */
+ @VisibleForTesting
+ protected boolean shouldPullFromEndpoint(InetAddress endpoint)
+ {
+     if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+         return false;
+
+     EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+     if (state == null)
+         return false;
+
+     // The release version may be absent from the gossip state. This method is also called from
+     // the pull task itself, where an exception would leave the endpoint recorded as an
+     // outstanding request, so treat a missing value as "don't pull" instead of dereferencing it.
+     VersionedValue releaseVersionValue = state.getApplicationState(ApplicationState.RELEASE_VERSION);
+     if (releaseVersionValue == null)
+     {
+         logger.debug("Not pulling schema from {} because its release version is not known yet", endpoint);
+         return false;
+     }
+
+     final String releaseVersion = releaseVersionValue.value;
+     final String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
+
+     if (!releaseVersion.startsWith(ourMajorVersion))
+     {
+         logger.debug("Not pulling schema from {} because release version in Gossip is not major version {}, it is {}",
+                      endpoint, ourMajorVersion, releaseVersion);
+         return false;
+     }
+
+     if (!MessagingService.instance().knowsVersion(endpoint))
+     {
+         logger.debug("Not pulling schema from {} because their messaging version is unknown", endpoint);
+         return false;
+     }
+
+     if (!is30Compatible(MessagingService.instance().getRawVersion(endpoint)))
+     {
+         logger.debug("Not pulling schema from {} because their schema format is incompatible", endpoint);
+         return false;
+     }
+
+     if (Gossiper.instance.isGossipOnlyMember(endpoint))
+     {
+         logger.debug("Not pulling schema from {} because it's a gossip only member", endpoint);
+         return false;
+     }
+     return true;
+ }
+
+ /**
+  * @return true if the local schema is empty or the JVM uptime is below
+  *         {@code MIGRATION_DELAY_IN_MS}, in which case the pull is submitted without delay
+  */
+ @VisibleForTesting
+ protected boolean shouldPullImmediately(InetAddress endpoint, UUID version)
+ {
+     // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
+     boolean emptyOrRecentlyStarted = Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS;
+     if (!emptyOrRecentlyStarted)
+         return false;
+
+     logger.debug("Immediately submitting migration task for {}, schema versions: local={}, remote={}",
+                  endpoint, Schema.instance.getVersion(), version);
+     return true;
+ }
+
+ /**
+  * @return true if the given version equals the current local schema version
+  */
+ // NOTE(review): unlike shouldPullSchema, this does not guard against a null local version - confirm callers
+ @VisibleForTesting
+ protected boolean isLocalVersion(UUID version)
+ {
+ return Schema.instance.getVersion().equals(version);
+ }
+
+ /**
+  * Decides whether a schema response for the given version should still be merged: it is
+  * skipped when the version was already received or when an earlier update has already
+  * brought the local schema to that version.
+  */
+ synchronized boolean shouldApplySchemaFor(VersionInfo info)
+ {
+     return !info.wasReceived() && !isLocalVersion(info.version);
+ }
+
+ /**
+  * Records that the endpoint now reports the given schema version, moves the endpoint from its
+  * previous version (if any) to the new one, and tries to pull the new version.
+  *
+  * @return {@code FINISHED_FUTURE} if the endpoint already reported this version, otherwise
+  *         whatever {@link #maybePullSchema(VersionInfo)} returns (which may be null)
+  */
+ synchronized Future<Void> reportEndpointVersion(InetAddress endpoint, UUID version)
+ {
+     UUID previous = endpointVersions.put(endpoint, version);
+     boolean unchanged = previous != null && previous.equals(version);
+     if (unchanged)
+         return FINISHED_FUTURE;
+
+     VersionInfo tracked = versionInfo.computeIfAbsent(version, VersionInfo::new);
+     // a version equal to our own needs no pull, so flag it as received straight away
+     if (isLocalVersion(version))
+         tracked.markReceived();
+     tracked.endpoints.add(endpoint);
+     // the endpoint that reported most recently goes to the front of the queue
+     tracked.requestQueue.addFirst(endpoint);
+
+     // disassociate this endpoint from its (now) previous schema version
+     removeEndpointFromVersion(endpoint, previous);
+
+     return maybePullSchema(tracked);
+ }
+
+ /**
+  * Extracts the SCHEMA application state from the endpoint state and reports it via
+  * {@link #reportEndpointVersion(InetAddress, UUID)}.
+  *
+  * @return {@code FINISHED_FUTURE} if the state or its schema value is null, otherwise the
+  *         result of the UUID overload
+  */
+ Future<Void> reportEndpointVersion(InetAddress endpoint, EndpointState state)
+ {
+     VersionedValue schemaValue = (state == null) ? null : state.getApplicationState(ApplicationState.SCHEMA);
+     if (schemaValue == null)
+         return FINISHED_FUTURE;
+
+     UUID reported = UUID.fromString(schemaValue.value);
+     return reportEndpointVersion(endpoint, reported);
+ }
+
+ /**
+  * Removes the endpoint from the given version's endpoint set. When that leaves the version
+  * without endpoints, its waiters are signalled and the version is no longer tracked.
+  * Does nothing if the version is null or not tracked.
+  */
+ private synchronized void removeEndpointFromVersion(InetAddress endpoint, UUID version)
+ {
+     VersionInfo tracked = (version == null) ? null : versionInfo.get(version);
+     if (tracked == null)
+         return;
+
+     tracked.endpoints.remove(endpoint);
+     if (!tracked.endpoints.isEmpty())
+         return;
+
+     // nobody reports this version any more: release waiters and stop tracking it
+     tracked.waitQueue.signalAll();
+     versionInfo.remove(version);
+ }
+
+ /**
+  * Wraps a schema pull from the endpoint in a task and hands it to the MIGRATION stage, either
+  * right away or after {@code MIGRATION_DELAY_IN_MS}, depending on
+  * {@link #shouldPullImmediately(InetAddress, UUID)}.
+  *
+  * @return the task, which completes once the pull request has been processed by the stage
+  */
+ Future<Void> scheduleSchemaPull(InetAddress endpoint, VersionInfo info)
+ {
+     LocalAwareExecutorService migrationStage = StageManager.getStage(Stage.MIGRATION);
+     FutureTask<Void> pullTask = new FutureTask<>(() -> pullSchema(new Callback(endpoint, info)), null);
+
+     if (!shouldPullImmediately(endpoint, info.version))
+     {
+         ScheduledExecutors.nonPeriodicTasks.schedule(() -> migrationStage.submit(pullTask), MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
+     }
+     else
+     {
+         migrationStage.submit(pullTask);
+     }
+     return pullTask;
+ }
+
+ /**
+  * Merges the given schema mutations into the local schema and announces the resulting version,
+  * by delegating to {@code SchemaKeyspace.mergeSchemaAndAnnounceVersion}. The endpoint argument
+  * is not used by this implementation.
+  */
+ @VisibleForTesting
+ protected void mergeSchemaFrom(InetAddress endpoint, Collection<Mutation> mutations)
+ {
+ SchemaKeyspace.mergeSchemaAndAnnounceVersion(mutations);
+ }
+
+ /**
+  * Callback for a schema pull request sent to one endpoint on behalf of one schema version.
+  * Both success and failure end in {@code pullComplete}, which removes the endpoint from the
+  * version's outstanding requests and may start the next pull.
+  */
+ class Callback implements IAsyncCallbackWithFailure<Collection<Mutation>>
+ {
+ final InetAddress endpoint;
+ final VersionInfo info;
+
+ public Callback(InetAddress endpoint, VersionInfo info)
+ {
+ this.endpoint = endpoint;
+ this.info = info;
+ }
+
+ // Failure notification from messaging; the "from" address is ignored in favour of this.endpoint.
+ public void onFailure(InetAddress from)
+ {
+ fail();
+ }
+
+ // Completes the pull as unsuccessful and returns whatever follow-up pull was scheduled, if any.
+ Future<Void> fail()
+ {
+ return pullComplete(endpoint, info, false);
+ }
+
+ public void response(MessageIn<Collection<Mutation>> message)
+ {
+ response(message.payload);
+ }
+
+ // Merges the received mutations unless shouldApplySchemaFor says the version is no longer
+ // needed, then completes the pull. A merge exception is logged and treated as a failed pull.
+ Future<Void> response(Collection<Mutation> mutations)
+ {
+ // serialises responses for the same version; the coordinator's own lock is taken inside
+ synchronized (info)
+ {
+ if (shouldApplySchemaFor(info))
+ {
+ try
+ {
+ mergeSchemaFrom(endpoint, mutations);
+ }
+ catch (Exception e)
+ {
+ logger.error(String.format("Unable to merge schema from %s", endpoint), e);
+ return fail();
+ }
+ }
+ return pullComplete(endpoint, info, true);
+ }
+ }
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
+ }
+
+ /**
+  * Sends the schema pull request for the callback's endpoint, or fails the callback without
+  * sending anything if the endpoint is down or no longer a valid pull source.
+  */
+ private void pullSchema(Callback callback)
+ {
+     InetAddress target = callback.endpoint;
+
+     if (!isAlive(target))
+     {
+         logger.warn("Can't send schema pull request: node {} is down.", target);
+         callback.fail();
+         return;
+     }
+
+     // Quite some time may have passed since this pull was scheduled, potentially enough for the
+     // endpoint to have restarted - which is an issue if it restarted upgraded, with a higher
+     // major - so the endpoint is re-validated right before sending.
+     if (!shouldPullFromEndpoint(target))
+     {
+         logger.info("Skipped sending a migration request: node {} has a higher major version now.", target);
+         callback.fail();
+         return;
+     }
+
+     logger.debug("Requesting schema from {}", target);
+     sendMigrationMessage(callback);
+ }
+
+ /**
+  * Builds a MIGRATION_REQUEST message with a null payload and sends it to the callback's
+  * endpoint, registering the callback for the response using the message's own timeout.
+  */
+ protected void sendMigrationMessage(Callback callback)
+ {
+ MessageOut<?> message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
+ logger.info("Sending schema pull request to {} at {} with timeout {}", callback.endpoint, System.currentTimeMillis(), message.getTimeout());
+ // NOTE(review): the trailing "true" presumably enables failure callbacks (onFailure) - confirm against sendRR
+ MessagingService.instance().sendRR(message, callback.endpoint, callback, message.getTimeout(), true);
+ }
+
+ /**
+  * Finishes a pull attempt against one endpoint: flags the version as received on success,
+  * removes the endpoint from the outstanding requests, appends it to the back of the request
+  * queue, and tries to pull again.
+  *
+  * @return the result of {@link #maybePullSchema(VersionInfo)} (may be null)
+  */
+ private synchronized Future<Void> pullComplete(InetAddress endpoint, VersionInfo info, boolean wasSuccessful)
+ {
+ if (wasSuccessful)
+ info.markReceived();
+
+ info.outstandingRequests.remove(endpoint);
+ info.requestQueue.add(endpoint);
+ return maybePullSchema(info);
+ }
+
+ /**
+  * Wait until we've received schema responses for all versions we're aware of
+  * @param waitMillis maximum time to wait for the outstanding versions, in milliseconds
+  * @return true if response for all schemas were received, false if we timed out waiting
+  * @throws RuntimeException wrapping the InterruptedException if the wait is interrupted; the
+  *         thread's interrupt flag is restored before throwing
+  */
+ public boolean awaitSchemaRequests(long waitMillis)
+ {
+     if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
+         CassandraDaemon.waitForGossipToSettle();
+
+     WaitQueue.Signal signal = null;
+     try
+     {
+         // register on every unreceived version under the lock, but wait outside of it so that
+         // responses (which need the lock) can make progress
+         synchronized (this)
+         {
+             List<WaitQueue.Signal> signalList = new ArrayList<>(versionInfo.size());
+             for (VersionInfo version : versionInfo.values())
+             {
+                 if (!version.wasReceived())
+                     signalList.add(version.register());
+             }
+
+             if (signalList.isEmpty())
+                 return true;
+
+             signal = WaitQueue.all(signalList.toArray(new WaitQueue.Signal[0]));
+         }
+
+         return signal.awaitUntil(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis));
+     }
+     catch (InterruptedException e)
+     {
+         // keep the interruption visible to callers further up the stack
+         Thread.currentThread().interrupt();
+         throw new RuntimeException(e);
+     }
+     finally
+     {
+         if (signal != null)
+             signal.cancel();
+     }
+ }
+}
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 26b1aed..2b0c834 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -21,12 +21,9 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.*;
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
@@ -56,12 +53,6 @@ public class MigrationManager
public static final MigrationManager instance = new MigrationManager();
- private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
-
- public static final int MIGRATION_DELAY_IN_MS = 60000;
-
- private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1"));
-
private final List<MigrationListener> listeners = new CopyOnWriteArrayList<>();
private MigrationManager() {}
@@ -76,86 +67,6 @@ public class MigrationManager
listeners.remove(listener);
}
- public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
- {
- VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
-
- if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
- maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint, state.getApplicationState(ApplicationState.RELEASE_VERSION).value);
- }
-
- /**
- * If versions differ this node sends request with local migration list to the endpoint
- * and expecting to receive a list of migrations to apply locally.
- */
- private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint, String releaseVersion)
- {
- String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
- if (!releaseVersion.startsWith(ourMajorVersion))
- {
- logger.debug("Not pulling schema because release version in Gossip is not major version {}, it is {}", ourMajorVersion, releaseVersion);
- return;
- }
-
- if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
- {
- logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
- return;
- }
-
- if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
- {
- // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
- logger.debug("Submitting migration task for {}", endpoint);
- submitMigrationTask(endpoint);
- }
- else
- {
- // Include a delay to make sure we have a chance to apply any changes being
- // pushed out simultaneously. See CASSANDRA-5025
- Runnable runnable = () ->
- {
- // grab the latest version of the schema since it may have changed again since the initial scheduling
- EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
- if (epState == null)
- {
- logger.debug("epState vanished for {}, not submitting migration task", endpoint);
- return;
- }
- VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
- UUID currentVersion = UUID.fromString(value.value);
- if (Schema.instance.getVersion().equals(currentVersion))
- {
- logger.debug("not submitting migration task for {} because our versions match", endpoint);
- return;
- }
- logger.debug("submitting migration task for {}", endpoint);
- submitMigrationTask(endpoint);
- };
- ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
- }
- }
-
- private static Future<?> submitMigrationTask(InetAddress endpoint)
- {
- /*
- * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are
- * running in the gossip stage.
- */
- return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint));
- }
-
- public static boolean shouldPullSchemaFrom(InetAddress endpoint)
- {
- /*
- * Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema)
- * Don't request schema from fat clients
- */
- return MessagingService.instance().knowsVersion(endpoint)
- && is30Compatible(MessagingService.instance().getRawVersion(endpoint))
- && !Gossiper.instance.isGossipOnlyMember(endpoint);
- }
-
// Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes
// from both 3.0 and 3.0.14.
private static boolean is30Compatible(int version)
@@ -163,29 +74,6 @@ public class MigrationManager
return version == MessagingService.current_version || version == MessagingService.VERSION_3014;
}
- public static boolean isReadyForBootstrap()
- {
- return MigrationTask.getInflightTasks().isEmpty();
- }
-
- public static void waitUntilReadyForBootstrap()
- {
- CountDownLatch completionLatch;
- while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null)
- {
- try
- {
- if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS))
- logger.error("Migration task failed to complete");
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- logger.error("Migration task was interrupted");
- }
- }
- }
-
public void notifyCreateKeyspace(KeyspaceMetadata ksm)
{
for (MigrationListener listener : listeners)
@@ -610,15 +498,15 @@ public class MigrationManager
Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers();
liveEndpoints.remove(FBUtilities.getBroadcastAddress());
+ MigrationCoordinator.instance.reset();
+
// force migration if there are nodes around
for (InetAddress node : liveEndpoints)
{
- if (shouldPullSchemaFrom(node))
- {
- logger.debug("Requesting schema from {}", node);
- FBUtilities.waitOnFuture(submitMigrationTask(node));
- break;
- }
+ EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(node);
+ Future<Void> pull = MigrationCoordinator.instance.reportEndpointVersion(node, state);
+ if (pull != null)
+ FBUtilities.waitOnFuture(pull);
}
logger.info("Local schema reset is complete.");
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
deleted file mode 100644
index 6b04756..0000000
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.SchemaKeyspace;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-
-class MigrationTask extends WrappedRunnable
-{
- private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class);
-
- private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>();
-
- private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);
-
- private final InetAddress endpoint;
-
- MigrationTask(InetAddress endpoint)
- {
- this.endpoint = endpoint;
- }
-
- public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
- {
- return inflightTasks;
- }
-
- public void runMayThrow() throws Exception
- {
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- logger.warn("Can't send schema pull request: node {} is down.", endpoint);
- return;
- }
-
- // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
- // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
- // a higher major.
- if (!MigrationManager.shouldPullSchemaFrom(endpoint))
- {
- logger.info("Skipped sending a migration request: node {} has a higher major version now.", endpoint);
- return;
- }
-
- MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
-
- final CountDownLatch completionLatch = new CountDownLatch(1);
-
- IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
- {
- @Override
- public void response(MessageIn<Collection<Mutation>> message)
- {
- try
- {
- SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload);
- }
- catch (ConfigurationException e)
- {
- logger.error("Configuration exception merging remote schema", e);
- }
- finally
- {
- completionLatch.countDown();
- }
- }
-
- public boolean isLatencyForSnitch()
- {
- return false;
- }
- };
-
- // Only save the latches if we need bootstrap or are bootstrapping
- if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()))
- inflightTasks.offer(completionLatch);
-
- MessagingService.instance().sendRR(message, endpoint, cb);
- }
-}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c4309f8..3718e8c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -27,6 +27,7 @@ import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.management.*;
import javax.management.openmbean.TabularData;
@@ -112,6 +113,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized
+ // how long, in milliseconds, to wait for outstanding schema pulls; overridable via cassandra.schema_delay_ms
+ public static final int SCHEMA_DELAY = getSchemaDelay();
+
+ // when true (the default), waitForSchema throws if schemas were not received for all known versions
+ // in time; starting with -Dcassandra.skip_schema_check=true downgrades that to a warning
+ private static final boolean REQUIRE_SCHEMAS = !Boolean.getBoolean("cassandra.skip_schema_check");
private final JMXProgressSupport progressSupport = new JMXProgressSupport(this);
@@ -134,6 +138,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return 30 * 1000;
}
+ /**
+  * @return the schema delay in milliseconds: the value of the {@code cassandra.schema_delay_ms}
+  *         system property when set, otherwise 30 seconds
+  */
+ private static int getSchemaDelay()
+ {
+     String override = System.getProperty("cassandra.schema_delay_ms");
+     if (override == null)
+         return 30 * 1000;
+
+     logger.info("Overriding SCHEMA_DELAY to {}ms", override);
+     return Integer.parseInt(override);
+ }
+
/* This abstraction maintains the token/endpoint metadata information */
private TokenMetadata tokenMetadata = new TokenMetadata();
@@ -764,6 +782,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
@VisibleForTesting
public void prepareToJoin() throws ConfigurationException
{
+ MigrationCoordinator.instance.start();
if (!joined)
{
Map<ApplicationState, VersionedValue> appStates = new EnumMap<>(ApplicationState.class);
@@ -840,6 +859,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
+ /**
+  * Blocks until the local schema is non-empty (or the given delay has elapsed) and then until
+  * schemas have been received for all schema versions known to the migration coordinator.
+  *
+  * @param delay upper bound, in milliseconds, on the initial wait for a non-empty local schema
+  * @throws RuntimeException if schemas were not received for all known versions within
+  *         {@code SCHEMA_DELAY} and the schema check has not been disabled
+  */
+ public void waitForSchema(int delay)
+ {
+     // first sleep the delay to make sure we see all our peers
+     for (long i = 0; i < delay; i += 1000)
+     {
+         // if we see schema, we can proceed to the next check directly
+         if (!Schema.instance.isEmpty())
+         {
+             logger.debug("current schema version: {}", Schema.instance.getVersion());
+             break;
+         }
+         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+     }
+
+     // SCHEMA_DELAY is already in milliseconds, which is the unit awaitSchemaRequests expects;
+     // converting it from seconds would stretch the timeout by a factor of 1000
+     boolean schemasReceived = MigrationCoordinator.instance.awaitSchemaRequests(SCHEMA_DELAY);
+
+     if (schemasReceived)
+         return;
+
+     logger.warn("There are nodes in the cluster with a different schema version than us we did not merged schemas from, " +
+                 "our version : ({}), outstanding versions -> endpoints : {}",
+                 Schema.instance.getVersion(),
+                 MigrationCoordinator.instance.outstandingVersions());
+
+     if (REQUIRE_SCHEMAS)
+         throw new RuntimeException("Didn't receive schemas for all known versions within the timeout");
+ }
+
@VisibleForTesting
public void joinTokenRing(int delay) throws ConfigurationException
{
@@ -887,14 +934,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
- // if our schema hasn't matched yet, wait until it has
- // we do this by waiting for all in-flight migration requests and responses to complete
- // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
- if (!MigrationManager.isReadyForBootstrap())
- {
- setMode(Mode.JOINING, "waiting for schema information to complete", true);
- MigrationManager.waitUntilReadyForBootstrap();
- }
+ waitForSchema(delay);
setMode(Mode.JOINING, "schema complete, ready to bootstrap", true);
setMode(Mode.JOINING, "waiting for pending range calculation", true);
PendingRangeCalculatorService.instance.blockUntilFinished();
@@ -1828,7 +1868,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
break;
case SCHEMA:
SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value), executor);
- MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
+ MigrationCoordinator.instance.reportEndpointVersion(endpoint, UUID.fromString(value.value));
break;
case HOST_ID:
SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value), executor);
@@ -2589,13 +2629,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
onChange(endpoint, entry.getKey(), entry.getValue());
}
- MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
+ MigrationCoordinator.instance.reportEndpointVersion(endpoint, epState);
}
public void onAlive(InetAddress endpoint, EndpointState state)
{
- MigrationManager.instance.scheduleSchemaPull(endpoint, state);
-
if (tokenMetadata.isMember(endpoint))
notifyUp(endpoint);
}
@@ -4901,4 +4939,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Runtime.getRuntime().removeShutdownHook(drainOnShutdown);
}
}
+
+ @Override
+ public Map<String, Set<InetAddress>> getOutstandingSchemaVersions()
+ {
+ Map<UUID, Set<InetAddress>> outstanding = MigrationCoordinator.instance.outstandingVersions();
+ return outstanding.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), Entry::getValue));
+ }
}
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 1afa48e..dd534cb 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
@@ -634,4 +635,7 @@ public interface StorageServiceMBean extends NotificationEmitter
/** Returns the max version that this node will negotiate for native protocol connections */
public int getMaxNativeProtocolVersion();
+
+ /** Returns a map of schema version -> set of endpoints reporting that version that we need schema updates for */
+ public Map<String, Set<InetAddress>> getOutstandingSchemaVersions();
}
diff --git a/test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java b/test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java
new file mode 100644
index 0000000..070537d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import static com.google.common.util.concurrent.Futures.getUnchecked;
+
+public class MigrationCoordinatorTest
+{
+ private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinatorTest.class);
+
+ private static final InetAddress EP1;
+ private static final InetAddress EP2;
+ private static final InetAddress EP3;
+
+ private static final UUID LOCAL_VERSION = UUID.randomUUID();
+ private static final UUID V1 = UUID.randomUUID();
+ private static final UUID V2 = UUID.randomUUID();
+ private static final UUID V3 = UUID.randomUUID();
+
+ static
+ {
+ try
+ {
+ EP1 = InetAddress.getByName("10.0.0.1");
+ EP2 = InetAddress.getByName("10.0.0.2");
+ EP3 = InetAddress.getByName("10.0.0.3");
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ DatabaseDescriptor.forceStaticInitialization();
+ }
+
+ private static class InstrumentedCoordinator extends MigrationCoordinator
+ {
+
+ Queue<Callback> requests = new LinkedList<>();
+ @Override
+ protected void sendMigrationMessage(MigrationCoordinator.Callback callback)
+ {
+ requests.add(callback);
+ }
+
+ boolean shouldPullSchema = true;
+ @Override
+ protected boolean shouldPullSchema(UUID version)
+ {
+ return shouldPullSchema;
+ }
+
+ boolean shouldPullFromEndpoint = true;
+ @Override
+ protected boolean shouldPullFromEndpoint(InetAddress endpoint)
+ {
+ return shouldPullFromEndpoint;
+ }
+
+ boolean shouldPullImmediately = true;
+ @Override
+ protected boolean shouldPullImmediately(InetAddress endpoint, UUID version)
+ {
+ return shouldPullImmediately;
+ }
+
+ Set<InetAddress> deadNodes = new HashSet<>();
+ protected boolean isAlive(InetAddress endpoint)
+ {
+ return !deadNodes.contains(endpoint);
+ }
+
+ UUID localVersion = LOCAL_VERSION;
+ @Override
+ protected boolean isLocalVersion(UUID version)
+ {
+ return localVersion.equals(version);
+ }
+
+ int maxOutstandingRequests = 3;
+ @Override
+ protected int getMaxOutstandingVersionRequests()
+ {
+ return maxOutstandingRequests;
+ }
+
+ Set<InetAddress> mergedSchemasFrom = new HashSet<>();
+ @Override
+ protected void mergeSchemaFrom(InetAddress endpoint, Collection<Mutation> mutations)
+ {
+ mergedSchemasFrom.add(endpoint);
+ }
+ }
+
+ @Test
+ public void requestResponseCycle() throws InterruptedException
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+ coordinator.maxOutstandingRequests = 1;
+
+ Assert.assertTrue(coordinator.requests.isEmpty());
+
+ // first schema report should send a migration request
+ getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
+ Assert.assertEquals(1, coordinator.requests.size());
+ Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+
+ // second should not
+ getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
+ Assert.assertEquals(1, coordinator.requests.size());
+ Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+
+ // until the first request fails...
+ MigrationCoordinator.Callback request1 = coordinator.requests.poll();
+ Assert.assertEquals(EP1, request1.endpoint);
+ getUnchecked(request1.fail());
+ Assert.assertTrue(coordinator.mergedSchemasFrom.isEmpty());
+ Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+
+ // ... then the second endpoint should be contacted
+ Assert.assertEquals(1, coordinator.requests.size());
+ MigrationCoordinator.Callback request2 = coordinator.requests.poll();
+ Assert.assertEquals(EP2, request2.endpoint);
+ Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+ getUnchecked(request2.response(Collections.emptyList()));
+ Assert.assertEquals(EP2, Iterables.getOnlyElement(coordinator.mergedSchemasFrom));
+ Assert.assertTrue(coordinator.awaitSchemaRequests(1));
+
+ // and migration tasks should not be sent out for subsequent version reports
+ getUnchecked(coordinator.reportEndpointVersion(EP3, V1));
+ Assert.assertTrue(coordinator.requests.isEmpty());
+
+ }
+
+ /**
+ * If we don't send a request for a version, and endpoints associated with
+ * it all change versions, we should signal anyone waiting on that version
+ */
+ @Test
+ public void versionsAreSignaledWhenDeleted()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ coordinator.reportEndpointVersion(EP1, V1);
+ WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
+ Assert.assertFalse(signal.isSignalled());
+
+ coordinator.reportEndpointVersion(EP1, V2);
+ Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));
+
+ Assert.assertTrue(signal.isSignalled());
+ }
+
+ private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddress endpoint, UUID version, boolean startupShouldBeUnblocked)
+ {
+ Assert.assertTrue(coordinator.requests.isEmpty()); // precondition: no migration request has been queued yet
+ Future<Void> future = coordinator.reportEndpointVersion(endpoint, version); // report exactly the endpoint/version the caller passed in
+ if (future != null)
+ getUnchecked(future);
+ Assert.assertTrue(coordinator.requests.isEmpty()); // the report must not have queued a migration request
+
+ Assert.assertEquals(startupShouldBeUnblocked, coordinator.awaitSchemaRequests(1)); // compare awaitSchemaRequests(1) with the caller's expectation
+ }
+
+ private static void assertNoContact(InstrumentedCoordinator coordinator, boolean startupShouldBeUnblocked)
+ {
+ assertNoContact(coordinator, EP1, V1, startupShouldBeUnblocked);
+ }
+
+ @Test
+ public void dontContactNodesWithSameSchema()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ coordinator.localVersion = V1;
+ assertNoContact(coordinator, true);
+ }
+
+ @Test
+ public void dontContactIncompatibleNodes()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ coordinator.shouldPullFromEndpoint = false;
+ assertNoContact(coordinator, false);
+ }
+
+ @Test
+ public void dontContactDeadNodes()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ coordinator.deadNodes.add(EP1);
+ assertNoContact(coordinator, EP1, V1, false);
+ }
+
+ /**
+ * If a node has become incompatible between when the task was scheduled and when it
+ * was run, we should detect that and fail the task
+ */
+ @Test
+ public void testGossipRace()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator() {
+ protected boolean shouldPullImmediately(InetAddress endpoint, UUID version)
+ {
+ // this is the last thing that gets called before scheduling the pull, so set this flag here
+ shouldPullFromEndpoint = false;
+ return super.shouldPullImmediately(endpoint, version);
+ }
+ };
+
+ Assert.assertTrue(coordinator.shouldPullFromEndpoint(EP1));
+ assertNoContact(coordinator, EP1, V1, false);
+ }
+
+ @Test
+ public void testWeKeepSendingRequests()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ getUnchecked(coordinator.reportEndpointVersion(EP3, V2));
+ coordinator.requests.remove().response(Collections.emptyList());
+
+ getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
+ getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
+
+ MigrationCoordinator.Callback prev = null;
+ Set<InetAddress> EPs = Sets.newHashSet(EP1, EP2);
+ int ep1requests = 0;
+ int ep2requests = 0;
+
+ for (int i=0; i<10; i++)
+ {
+ Assert.assertEquals(String.format("%s", i), 2, coordinator.requests.size());
+
+ MigrationCoordinator.Callback next = coordinator.requests.remove();
+
+ // we should be contacting endpoints in a round robin fashion
+ Assert.assertTrue(EPs.contains(next.endpoint));
+ if (prev != null && prev.endpoint.equals(next.endpoint))
+ Assert.fail(String.format("Not expecting prev %s to be equal to next %s", prev.endpoint, next.endpoint));
+
+ // should send a new request
+ next.fail();
+ prev = next;
+ Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+
+ Assert.assertEquals(2, coordinator.requests.size());
+ }
+ logger.info("{} -> {}", EP1, ep1requests);
+ logger.info("{} -> {}", EP2, ep2requests);
+
+ // a single success should unblock startup though
+ coordinator.requests.remove().response(Collections.emptyList());
+ Assert.assertTrue(coordinator.awaitSchemaRequests(1));
+
+ }
+
+ /**
+ * Pull unreceived schemas should detect and send requests out for any
+ * schemas that are marked unreceived and have no outstanding requests
+ */
+ @Test
+ public void pullUnreceived()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ coordinator.shouldPullFromEndpoint = false;
+ assertNoContact(coordinator, false);
+
+ coordinator.shouldPullFromEndpoint = true;
+ Assert.assertEquals(0, coordinator.requests.size());
+ List<Future<Void>> futures = coordinator.pullUnreceivedSchemaVersions();
+ futures.forEach(Futures::getUnchecked);
+ Assert.assertEquals(1, coordinator.requests.size());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org