You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2021/04/13 13:02:51 UTC
[cassandra] 02/02: Merge branch 'cassandra-3.11' into trunk
This is an automated email from the ASF dual-hosted git repository.
brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit edffdf6dcc092780757e278d5df789653f20bf37
Merge: f7365cb db6e704
Author: Brandon Williams <br...@apache.org>
AuthorDate: Tue Apr 13 07:47:50 2021 -0500
Merge branch 'cassandra-3.11' into trunk
src/java/org/apache/cassandra/schema/MigrationCoordinator.java | 4 ++--
test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
diff --cc src/java/org/apache/cassandra/schema/MigrationCoordinator.java
index 554f545,0000000..6b6640e
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
+++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
@@@ -1,535 -1,0 +1,535 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongSupplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+public class MigrationCoordinator
+{
+ private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class);
+ private static final Future<Void> FINISHED_FUTURE = Futures.immediateFuture(null);
+
+ // JVM uptime in milliseconds, indirected through a LongSupplier so tests can control "time since start".
+ private static LongSupplier getUptimeFn = () -> ManagementFactory.getRuntimeMXBean().getUptime();
+
+ // Test hook: replaces the uptime source consulted by shouldPullImmediately().
+ @VisibleForTesting
+ public static void setUptimeFn(LongSupplier supplier)
+ {
+ getUptimeFn = supplier;
+ }
+
+
+ // Grace period before a scheduled schema pull is actually submitted (see scheduleSchemaPull),
+ // and the cap on concurrent pull requests per schema version (see maybePullSchema).
+ private static final int MIGRATION_DELAY_IN_MS = 60000;
+ private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;
+
+ // Process-wide singleton.
+ public static final MigrationCoordinator instance = new MigrationCoordinator();
+
+ // Per-schema-version bookkeeping: which endpoints report the version, which pulls are
+ // in flight, and a wait queue for callers blocking until the version is received.
+ static class VersionInfo
+ {
+ final UUID version;
+
+ // Endpoints currently gossiping this schema version.
+ final Set<InetAddressAndPort> endpoints = Sets.newConcurrentHashSet();
+ // Endpoints we currently have an outstanding pull request to.
+ final Set<InetAddressAndPort> outstandingRequests = Sets.newConcurrentHashSet();
+ // Order in which endpoints should be tried next (round-robin via remove/offer).
+ final Deque<InetAddressAndPort> requestQueue = new ArrayDeque<>();
+
+ // Signalled when the schema is received (markReceived) or the version is dropped
+ // (removeEndpointFromVersion in the enclosing class).
+ private final WaitQueue waitQueue = new WaitQueue();
+
+ // Set once; volatile so readers outside the coordinator lock see it.
+ volatile boolean receivedSchema;
+
+ VersionInfo(UUID version)
+ {
+ this.version = version;
+ }
+
+ // Registers the caller to be signalled when this version is received or dropped.
+ WaitQueue.Signal register()
+ {
+ return waitQueue.register();
+ }
+
+ // Marks the schema as received and releases all waiters; idempotent.
+ void markReceived()
+ {
+ if (receivedSchema)
+ return;
+
+ receivedSchema = true;
+ waitQueue.signalAll();
+ }
+
+ boolean wasReceived()
+ {
+ return receivedSchema;
+ }
+ }
+
+ // All known remote schema versions, and the last version each endpoint reported.
+ // Guarded by synchronized methods on this coordinator.
+ private final Map<UUID, VersionInfo> versionInfo = new HashMap<>();
+ private final Map<InetAddressAndPort, UUID> endpointVersions = new HashMap<>();
+ // Count of schema pull requests in flight (incremented in sendMigrationMessage,
+ // decremented in pullComplete).
+ private final AtomicInteger inflightTasks = new AtomicInteger();
+
+ // Starts the periodic sweep that retries pulls for unreceived schema versions.
+ public void start()
+ {
+ ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::pullUnreceivedSchemaVersions, 1, 1, TimeUnit.MINUTES);
+ }
+
+ // Drops all per-version state; endpointVersions is intentionally left as-is.
+ public synchronized void reset()
+ {
+ versionInfo.clear();
+ }
+
+ synchronized List<Future<Void>> pullUnreceivedSchemaVersions()
+ {
+ List<Future<Void>> futures = new ArrayList<>();
+ for (VersionInfo info : versionInfo.values())
+ {
+ if (info.wasReceived() || info.outstandingRequests.size() > 0)
+ continue;
+
+ Future<Void> future = maybePullSchema(info);
+ if (future != null && future != FINISHED_FUTURE)
+ futures.add(future);
+ }
+
+ return futures;
+ }
+
+ // Tries to schedule a schema pull for the given version. Returns FINISHED_FUTURE when no
+ // pull is needed or capacity is exhausted, the scheduled pull's future when one was
+ // submitted, or null when no endpoint was currently suitable (the periodic sweep retries).
+ synchronized Future<Void> maybePullSchema(VersionInfo info)
+ {
+ if (info.endpoints.isEmpty() || info.wasReceived() || !shouldPullSchema(info.version))
+ return FINISHED_FUTURE;
+
+ if (info.outstandingRequests.size() >= getMaxOutstandingVersionRequests())
+ return FINISHED_FUTURE;
+
+ // Walk the queue at most once (isize is fixed up front) so re-offered endpoints
+ // are not retried within the same call.
+ for (int i=0, isize=info.requestQueue.size(); i<isize; i++)
+ {
+ InetAddressAndPort endpoint = info.requestQueue.remove();
+ // Endpoint no longer reports this version; drop it from the queue.
+ if (!info.endpoints.contains(endpoint))
+ continue;
+
+ if (shouldPullFromEndpoint(endpoint) && info.outstandingRequests.add(endpoint))
+ {
+ return scheduleSchemaPull(endpoint, info);
+ }
+ else
+ {
+ // return to queue
+ info.requestQueue.offer(endpoint);
+ }
+ }
+
+ // no suitable endpoints were found, check again in a minute, the periodic task will pick it up
+ return null;
+ }
+
+ public synchronized Map<UUID, Set<InetAddressAndPort>> outstandingVersions()
+ {
+ HashMap<UUID, Set<InetAddressAndPort>> map = new HashMap<>();
+ for (VersionInfo info : versionInfo.values())
+ if (!info.wasReceived())
+ map.put(info.version, ImmutableSet.copyOf(info.endpoints));
+ return map;
+ }
+
+ @VisibleForTesting
+ protected VersionInfo getVersionInfoUnsafe(UUID version)
+ {
+ return versionInfo.get(version);
+ }
+
+ @VisibleForTesting
+ protected int getMaxOutstandingVersionRequests()
+ {
+ return MAX_OUTSTANDING_VERSION_REQUESTS;
+ }
+
+ @VisibleForTesting
+ protected boolean isAlive(InetAddressAndPort endpoint)
+ {
+ return FailureDetector.instance.isAlive(endpoint);
+ }
+
+ @VisibleForTesting
+ protected boolean shouldPullSchema(UUID version)
+ {
+ if (Schema.instance.getVersion() == null)
+ {
+ logger.debug("Not pulling schema for version {}, because local schama version is not known yet", version);
+ return false;
+ }
+
+ if (Schema.instance.isSameVersion(version))
+ {
+ logger.debug("Not pulling schema for version {}, because schema versions match: " +
+ "local={}, remote={}",
+ version,
+ Schema.schemaVersionToString(Schema.instance.getVersion()),
+ Schema.schemaVersionToString(version));
+ return false;
+ }
+ return true;
+ }
+
+ // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes
+ // from both 3.0 and 3.0.14.
+ // NOTE(review): the code compares against MessagingService.current_version rather than a 3.0
+ // constant, so on trunk this accepts the current version and VERSION_3014 -- confirm the
+ // comment above still matches the intent after the merge.
+ private static boolean is30Compatible(int version)
+ {
+ return version == MessagingService.current_version || version == MessagingService.VERSION_3014;
+ }
+
+ // Gossip/compatibility gate: whether this specific endpoint is a safe source to pull
+ // schema from. Rejects ourselves, endpoints with no gossip state, different release
+ // major, unknown or mismatched messaging version, and gossip-only members.
+ @VisibleForTesting
+ protected boolean shouldPullFromEndpoint(InetAddressAndPort endpoint)
+ {
+ // Never pull from ourselves.
+ if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+ return false;
+
+ EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+ if (state == null)
+ return false;
+
+ final String releaseVersion = state.getApplicationState(ApplicationState.RELEASE_VERSION).value;
+ final String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
+
+ if (!releaseVersion.startsWith(ourMajorVersion))
+ {
+ logger.debug("Not pulling schema from {} because release version in Gossip is not major version {}, it is {}",
+ endpoint, ourMajorVersion, releaseVersion);
+ return false;
+ }
+
+ if (!MessagingService.instance().versions.knows(endpoint))
+ {
+ logger.debug("Not pulling schema from {} because their messaging version is unknown", endpoint);
+ return false;
+ }
+
+ // Schema mutations are only compatible between identical messaging versions.
+ if (MessagingService.instance().versions.getRaw(endpoint) != MessagingService.current_version)
+ {
+ logger.debug("Not pulling schema from {} because their schema format is incompatible", endpoint);
+ return false;
+ }
+
+ if (Gossiper.instance.isGossipOnlyMember(endpoint))
+ {
+ logger.debug("Not pulling schema from {} because it's a gossip only member", endpoint);
+ return false;
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version)
+ {
+ if (Schema.instance.isEmpty() || getUptimeFn.getAsLong() < MIGRATION_DELAY_IN_MS)
+ {
+ // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
+ logger.debug("Immediately submitting migration task for {}, " +
+ "schema versions: local={}, remote={}",
+ endpoint,
+ Schema.schemaVersionToString(Schema.instance.getVersion()),
+ Schema.schemaVersionToString(version));
+ return true;
+ }
+ return false;
+ }
+
+ @VisibleForTesting
+ protected boolean isLocalVersion(UUID version)
+ {
+ return Schema.instance.isSameVersion(version);
+ }
+
+ /**
+ * If a previous schema update brought our version the same as the incoming schema, don't apply it
+ */
+ synchronized boolean shouldApplySchemaFor(VersionInfo info)
+ {
+ if (info.wasReceived())
+ return false;
+ return !isLocalVersion(info.version);
+ }
+
+ // Records that the endpoint now reports the given schema version, re-queues it as the
+ // preferred pull source for that version, detaches it from its previous version, and
+ // kicks off a pull if one is warranted. No-op when the reported version is unchanged.
+ public synchronized Future<Void> reportEndpointVersion(InetAddressAndPort endpoint, UUID version)
+ {
+ UUID current = endpointVersions.put(endpoint, version);
+ if (current != null && current.equals(version))
+ return FINISHED_FUTURE;
+
+ VersionInfo info = versionInfo.computeIfAbsent(version, VersionInfo::new);
+ // If this is our own version, no pull is ever needed; release any waiters.
+ if (isLocalVersion(version))
+ info.markReceived();
+ info.endpoints.add(endpoint);
+ // Most-recently-reported endpoint goes to the front of the pull queue.
+ info.requestQueue.addFirst(endpoint);
+
+ // disassociate this endpoint from its (now) previous schema version
+ removeEndpointFromVersion(endpoint, current);
+
+ return maybePullSchema(info);
+ }
+
+ // Gossip-facing overload: extracts the schema version from the endpoint state,
+ // tolerating missing state or a not-yet-known version.
+ public Future<Void> reportEndpointVersion(InetAddressAndPort endpoint, EndpointState state)
+ {
+ if (state == null)
+ return FINISHED_FUTURE;
+
+ UUID version = state.getSchemaVersion();
+
+ if (version == null)
+ return FINISHED_FUTURE;
+
+ return reportEndpointVersion(endpoint, version);
+ }
+
+ // Detaches the endpoint from the given version; when the version loses its last
+ // endpoint, waiters are released and the version entry is dropped entirely.
+ private synchronized void removeEndpointFromVersion(InetAddressAndPort endpoint, UUID version)
+ {
+ if (version == null)
+ return;
+
+ VersionInfo info = versionInfo.get(version);
+
+ if (info == null)
+ return;
+
+ info.endpoints.remove(endpoint);
+ if (info.endpoints.isEmpty())
+ {
+ // Nobody reports this version anymore; unblock waiters and forget it.
+ info.waitQueue.signalAll();
+ versionInfo.remove(version);
+ }
+ }
+
+ // Removes the endpoint from every known version (e.g. when it leaves the cluster).
+ // NOTE(review): the diff conflict lines below show the parameter type changed from
+ // InetAddress to InetAddressAndPort in this merge.
- public synchronized void removeVersionInfoForEndpoint(InetAddress endpoint)
++ public synchronized void removeVersionInfoForEndpoint(InetAddressAndPort endpoint)
+ {
+ // Copy the key set first: removeEndpointFromVersion may remove entries from versionInfo.
+ Set<UUID> versions = ImmutableSet.copyOf(versionInfo.keySet());
+ for (UUID version : versions)
+ {
+ removeEndpointFromVersion(endpoint, version);
+ }
+ }
+
+ // Wraps the pull in a FutureTask and submits it to the MIGRATION stage -- immediately
+ // for freshly started nodes, otherwise after MIGRATION_DELAY_IN_MS.
+ // NOTE(review): the diff conflict lines below show the parameter type changed from
+ // InetAddress to InetAddressAndPort in this merge.
- Future<Void> scheduleSchemaPull(InetAddress endpoint, VersionInfo info)
++ Future<Void> scheduleSchemaPull(InetAddressAndPort endpoint, VersionInfo info)
+ {
+ FutureTask<Void> task = new FutureTask<>(() -> pullSchema(new Callback(endpoint, info)), null);
+ if (shouldPullImmediately(endpoint, info.version))
+ {
+ submitToMigrationIfNotShutdown(task);
+ }
+ else
+ {
+ ScheduledExecutors.nonPeriodicTasks.schedule(()->submitToMigrationIfNotShutdown(task), MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
+ }
+ return task;
+ }
+
+ // Guards against submitting work to a MIGRATION executor that is shutting down
+ // (e.g. during node drain); returns null when the task was skipped.
+ private static Future<?> submitToMigrationIfNotShutdown(Runnable task)
+ {
+ if (Stage.MIGRATION.executor().isShutdown() || Stage.MIGRATION.executor().isTerminated())
+ {
+ logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown.");
+ return null;
+ }
+ else
+ return Stage.MIGRATION.submit(task);
+ }
+
+ @VisibleForTesting
+ protected void mergeSchemaFrom(InetAddressAndPort endpoint, Collection<Mutation> mutations)
+ {
+ Schema.instance.mergeAndAnnounceVersion(mutations);
+ }
+
+ // Per-request callback for a schema pull: merges the returned mutations on success,
+ // and in either case hands control back to pullComplete() so the next pull can be
+ // scheduled.
+ class Callback implements RequestCallback<Collection<Mutation>>
+ {
+ final InetAddressAndPort endpoint;
+ final VersionInfo info;
+
+ public Callback(InetAddressAndPort endpoint, VersionInfo info)
+ {
+ this.endpoint = endpoint;
+ this.info = info;
+ }
+
+ public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason)
+ {
+ fail();
+ }
+
+ // Marks this pull as failed; pullComplete re-queues the endpoint for retry.
+ Future<Void> fail()
+ {
+ return pullComplete(endpoint, info, false);
+ }
+
+ public void onResponse(Message<Collection<Mutation>> message)
+ {
+ response(message.payload);
+ }
+
+ // Applies the pulled schema mutations. Synchronizes on info so concurrent
+ // responses for the same version don't merge twice (see shouldApplySchemaFor).
+ Future<Void> response(Collection<Mutation> mutations)
+ {
+ synchronized (info)
+ {
+ if (shouldApplySchemaFor(info))
+ {
+ try
+ {
+ mergeSchemaFrom(endpoint, mutations);
+ }
+ catch (Exception e)
+ {
+ // A failed merge counts as a failed pull so another endpoint is tried.
+ logger.error(String.format("Unable to merge schema from %s", endpoint), e);
+ return fail();
+ }
+ }
+ return pullComplete(endpoint, info, true);
+ }
+ }
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
+ }
+
+ private void pullSchema(Callback callback)
+ {
+ if (!isAlive(callback.endpoint))
+ {
+ logger.warn("Can't send schema pull request: node {} is down.", callback.endpoint);
+ callback.fail();
+ return;
+ }
+
+ // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
+ // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
+ // a higher major.
+ if (!shouldPullFromEndpoint(callback.endpoint))
+ {
+ logger.info("Skipped sending a migration request: node {} has a higher major version now.", callback.endpoint);
+ callback.fail();
+ return;
+ }
+
+ logger.debug("Requesting schema from {}", callback.endpoint);
+ sendMigrationMessage(callback);
+ }
+
+ protected void sendMigrationMessage(Callback callback)
+ {
+ inflightTasks.getAndIncrement();
+ Message message = Message.out(Verb.SCHEMA_PULL_REQ, NoPayload.noPayload);
+ logger.info("Sending schema pull request to {}", callback.endpoint);
+ MessagingService.instance().sendWithCallback(message, callback.endpoint, callback);
+ }
+
+ private synchronized Future<Void> pullComplete(InetAddressAndPort endpoint, VersionInfo info, boolean wasSuccessful)
+ {
+ inflightTasks.decrementAndGet();
+ if (wasSuccessful)
+ info.markReceived();
+
+ info.outstandingRequests.remove(endpoint);
+ info.requestQueue.add(endpoint);
+ return maybePullSchema(info);
+ }
+
+ public int getInflightTasks()
+ {
+ return inflightTasks.get();
+ }
+
+ /**
+ * Wait until we've received schema responses for all versions we're aware of
+ * @param waitMillis
+ * @return true if response for all schemas were received, false if we timed out waiting
+ */
+ public boolean awaitSchemaRequests(long waitMillis)
+ {
+ // Let gossip settle first so versionInfo reflects the cluster (skipped for
+ // loopback-only, i.e. single-node/test setups).
+ if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
+ Gossiper.waitToSettle();
+
+ WaitQueue.Signal signal = null;
+ try
+ {
+ // Register for every not-yet-received version while holding the coordinator
+ // lock, so no version can be received/removed between the check and register().
+ synchronized (this)
+ {
+ List<WaitQueue.Signal> signalList = new ArrayList<>(versionInfo.size());
+ for (VersionInfo version : versionInfo.values())
+ {
+ if (version.wasReceived())
+ continue;
+
+ signalList.add(version.register());
+ }
+
+ // Everything already received: nothing to wait for.
+ if (signalList.isEmpty())
+ return true;
+
+ WaitQueue.Signal[] signals = new WaitQueue.Signal[signalList.size()];
+ signalList.toArray(signals);
+ signal = WaitQueue.all(signals);
+ }
+
+ // Wait (outside the lock) until all registered versions are signalled or we time out.
+ return signal.awaitUntil(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis));
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ // Always deregister so abandoned signals don't linger on the wait queues.
+ if (signal != null)
+ signal.cancel();
+ }
+ }
+}
diff --cc test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
index 9cc8c94,0000000..acd45b9
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
+++ b/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
@@@ -1,339 -1,0 +1,339 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import static com.google.common.util.concurrent.Futures.getUnchecked;
+
+public class MigrationCoordinatorTest
+{
+ private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinatorTest.class);
+
+ // Three fixed endpoints and three schema versions used across all tests.
+ private static final InetAddressAndPort EP1;
+ private static final InetAddressAndPort EP2;
+ private static final InetAddressAndPort EP3;
+
+ private static final UUID LOCAL_VERSION = UUID.randomUUID();
+ private static final UUID V1 = UUID.randomUUID();
+ private static final UUID V2 = UUID.randomUUID();
+
+ static
+ {
+ try
+ {
+ EP1 = InetAddressAndPort.getByName("10.0.0.1");
+ EP2 = InetAddressAndPort.getByName("10.0.0.2");
+ EP3 = InetAddressAndPort.getByName("10.0.0.3");
+ }
+ catch (UnknownHostException e)
+ {
+ // Fixed literal addresses should always resolve; treat failure as a test bug.
+ throw new AssertionError(e);
+ }
+
+ DatabaseDescriptor.daemonInitialization();
+ }
+
+ // Test double: overrides every gossip/schema-touching hook of MigrationCoordinator with
+ // controllable flags, and captures outgoing migration requests instead of sending them.
+ private static class InstrumentedCoordinator extends MigrationCoordinator
+ {
+
+ // Captured callbacks, in the order requests would have been sent.
+ Queue<Callback> requests = new LinkedList<>();
+ @Override
+ protected void sendMigrationMessage(MigrationCoordinator.Callback callback)
+ {
+ requests.add(callback);
+ }
+
+ boolean shouldPullSchema = true;
+ @Override
+ protected boolean shouldPullSchema(UUID version)
+ {
+ return shouldPullSchema;
+ }
+
+ boolean shouldPullFromEndpoint = true;
+ @Override
+ protected boolean shouldPullFromEndpoint(InetAddressAndPort endpoint)
+ {
+ return shouldPullFromEndpoint;
+ }
+
+ boolean shouldPullImmediately = true;
+ @Override
+ protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version)
+ {
+ return shouldPullImmediately;
+ }
+
+ // Endpoints reported as dead by the (overridden) liveness check.
+ // NOTE(review): missing @Override here, unlike the other hooks -- confirm intent.
+ Set<InetAddressAndPort> deadNodes = new HashSet<>();
+ protected boolean isAlive(InetAddressAndPort endpoint)
+ {
+ return !deadNodes.contains(endpoint);
+ }
+
+ UUID localVersion = LOCAL_VERSION;
+ @Override
+ protected boolean isLocalVersion(UUID version)
+ {
+ return localVersion.equals(version);
+ }
+
+ int maxOutstandingRequests = 3;
+ @Override
+ protected int getMaxOutstandingVersionRequests()
+ {
+ return maxOutstandingRequests;
+ }
+
+ // Endpoints whose schema mutations were "merged" (merge itself is a no-op here).
+ Set<InetAddressAndPort> mergedSchemasFrom = new HashSet<>();
+ @Override
+ protected void mergeSchemaFrom(InetAddressAndPort endpoint, Collection<Mutation> mutations)
+ {
+ mergedSchemasFrom.add(endpoint);
+ }
+ }
+
+ // End-to-end request/response flow with a single-request budget: failures move on to
+ // the next endpoint, a success merges the schema and unblocks startup, and further
+ // version reports for a received version send nothing.
+ @Test
+ public void requestResponseCycle() throws InterruptedException
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+ coordinator.maxOutstandingRequests = 1;
+
+ Assert.assertTrue(coordinator.requests.isEmpty());
+
+ // first schema report should send a migration request
+ getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
+ Assert.assertEquals(1, coordinator.requests.size());
+ Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+
+ // second should not
+ getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
+ Assert.assertEquals(1, coordinator.requests.size());
+ Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+
+ // until the first request fails, then the second endpoint should be contacted
+ MigrationCoordinator.Callback request1 = coordinator.requests.poll();
+ Assert.assertEquals(EP1, request1.endpoint);
+ getUnchecked(request1.fail());
+ Assert.assertTrue(coordinator.mergedSchemasFrom.isEmpty());
+ Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+
+ // ... then the second endpoint should be contacted
+ Assert.assertEquals(1, coordinator.requests.size());
+ MigrationCoordinator.Callback request2 = coordinator.requests.poll();
+ Assert.assertEquals(EP2, request2.endpoint);
+ Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+ getUnchecked(request2.response(Collections.emptyList()));
+ Assert.assertEquals(EP2, Iterables.getOnlyElement(coordinator.mergedSchemasFrom));
+ Assert.assertTrue(coordinator.awaitSchemaRequests(1));
+
+ // and migration tasks should not be sent out for subsequent version reports
+ getUnchecked(coordinator.reportEndpointVersion(EP3, V1));
+ Assert.assertTrue(coordinator.requests.isEmpty());
+
+ }
+
+ /**
+ * If we don't send a request for a version, and endpoints associated with
+ * it all change versions, we should signal anyone waiting on that version
+ */
+ @Test
+ public void versionsAreSignaledWhenDeleted()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ coordinator.reportEndpointVersion(EP1, V1);
+ WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
+ Assert.assertFalse(signal.isSignalled());
+
+ coordinator.reportEndpointVersion(EP1, V2);
+ Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));
+
+ Assert.assertTrue(signal.isSignalled());
+ }
+
+ /**
+ * If an endpoint is removed and no other endpoints are reporting its
+ * schema version, the version should be removed and we should signal
+ * anyone waiting on that version
+ */
+ @Test
+ public void versionsAreSignaledWhenEndpointsRemoved()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ coordinator.reportEndpointVersion(EP1, V1);
+ WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
+ Assert.assertFalse(signal.isSignalled());
+
+ coordinator.removeVersionInfoForEndpoint(EP1);
+ Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));
+
+ Assert.assertTrue(signal.isSignalled());
+ }
+
+
- private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddress endpoint, UUID version, boolean startupShouldBeUnblocked)
++ private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddressAndPort endpoint, UUID version, boolean startupShouldBeUnblocked)
+ {
+ Assert.assertTrue(coordinator.requests.isEmpty());
+ Future<Void> future = coordinator.reportEndpointVersion(EP1, V1);
+ if (future != null)
+ getUnchecked(future);
+ Assert.assertTrue(coordinator.requests.isEmpty());
+
+ Assert.assertEquals(startupShouldBeUnblocked, coordinator.awaitSchemaRequests(1));
+ }
+
+ private static void assertNoContact(InstrumentedCoordinator coordinator, boolean startupShouldBeUnblocked)
+ {
+ assertNoContact(coordinator, EP1, V1, startupShouldBeUnblocked);
+ }
+
+ @Test
+ public void dontContactNodesWithSameSchema()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ coordinator.localVersion = V1;
+ assertNoContact(coordinator, true);
+ }
+
+ @Test
+ public void dontContactIncompatibleNodes()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ coordinator.shouldPullFromEndpoint = false;
+ assertNoContact(coordinator, false);
+ }
+
+ @Test
+ public void dontContactDeadNodes()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ coordinator.deadNodes.add(EP1);
+ assertNoContact(coordinator, EP1, V1, false);
+ }
+
+ /**
+ * If a node has become incompativle between when the task was scheduled and when it
+ * was run, we should detect that and fail the task
+ */
+ @Test
+ public void testGossipRace()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator() {
+ protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version)
+ {
+ // this is the last thing that gets called before scheduling the pull, so set this flag here
+ shouldPullFromEndpoint = false;
+ return super.shouldPullImmediately(endpoint, version);
+ }
+ };
+
+ Assert.assertTrue(coordinator.shouldPullFromEndpoint(EP1));
+ assertNoContact(coordinator, EP1, V1, false);
+ }
+
+ @Test
+ public void testWeKeepSendingRequests() throws Exception
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ getUnchecked(coordinator.reportEndpointVersion(EP3, V2));
+ coordinator.requests.remove().response(Collections.emptyList());
+
+ getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
+ getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
+
+ MigrationCoordinator.Callback prev = null;
+ Set<InetAddressAndPort> EPs = Sets.newHashSet(EP1, EP2);
+ int ep1requests = 0;
+ int ep2requests = 0;
+
+ for (int i=0; i<10; i++)
+ {
+ Assert.assertEquals(String.format("%s", i), 2, coordinator.requests.size());
+
+ MigrationCoordinator.Callback next = coordinator.requests.remove();
+
+ // we should be contacting endpoints in a round robin fashion
+ Assert.assertTrue(EPs.contains(next.endpoint));
+ if (prev != null && prev.endpoint.equals(next.endpoint))
+ Assert.fail(String.format("Not expecting prev %s to be equal to next %s", prev.endpoint, next.endpoint));
+
+ // should send a new request
+ next.fail().get();
+ prev = next;
+ Assert.assertFalse(coordinator.awaitSchemaRequests(1));
+
+ Assert.assertEquals(2, coordinator.requests.size());
+ }
+ logger.info("{} -> {}", EP1, ep1requests);
+ logger.info("{} -> {}", EP2, ep2requests);
+
+ // a single success should unblock startup though
+ coordinator.requests.remove().response(Collections.emptyList());
+ Assert.assertTrue(coordinator.awaitSchemaRequests(1));
+
+ }
+
+ /**
+ * Pull unreceived schemas should detect and send requests out for any
+ * schemas that are marked unreceived and have no outstanding requests
+ */
+ @Test
+ public void pullUnreceived()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ coordinator.shouldPullFromEndpoint = false;
+ assertNoContact(coordinator, false);
+
+ coordinator.shouldPullFromEndpoint = true;
+ Assert.assertEquals(0, coordinator.requests.size());
+ List<Future<Void>> futures = coordinator.pullUnreceivedSchemaVersions();
+ futures.forEach(Futures::getUnchecked);
+ Assert.assertEquals(1, coordinator.requests.size());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org