You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2021/04/13 13:02:50 UTC
[cassandra] 01/02: Don't wait for migrations from removed nodes,
mention flag to skip
This is an automated email from the ASF dual-hosted git repository.
brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit f7365cb0d988ff8aecd20a18d70831d39954b4d4
Author: Brandon Williams <br...@apache.org>
AuthorDate: Fri Apr 9 16:19:17 2021 -0500
Don't wait for migrations from removed nodes, mention flag to skip
Patch by brandonwilliams; reviewed by Adam Holmberg, adelapena and
bdeggleston for CASSANDRA-16577
---
CHANGES.txt | 1 +
.../cassandra/schema/MigrationCoordinator.java | 11 ++++++++++-
.../apache/cassandra/service/StorageService.java | 8 +++++---
.../cassandra/schema/MigrationCoordinatorTest.java | 23 +++++++++++++++++++++-
4 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 15b8dfb..c22d6a6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,7 @@ Merged from 3.11:
* Reduce amount of allocations during batch statement execution (CASSANDRA-16201)
* Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393)
Merged from 3.0:
+ * Don't wait for schema migrations from removed nodes (CASSANDRA-16577)
* Ignore trailing zeros in hint files (CASSANDRA-16523)
* Refuse DROP COMPACT STORAGE if some 2.x sstables are in use (CASSANDRA-15897)
* Fix ColumnFilter::toString not returning a valid CQL fragment (CASSANDRA-16483)
diff --git a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
index 9046479..554f545 100644
--- a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
+++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
@@ -349,7 +349,16 @@ public class MigrationCoordinator
}
}
- Future<Void> scheduleSchemaPull(InetAddressAndPort endpoint, VersionInfo info)
+ public synchronized void removeVersionInfoForEndpoint(InetAddress endpoint)
+ {
+ Set<UUID> versions = ImmutableSet.copyOf(versionInfo.keySet());
+ for (UUID version : versions)
+ {
+ removeEndpointFromVersion(endpoint, version);
+ }
+ }
+
+ Future<Void> scheduleSchemaPull(InetAddress endpoint, VersionInfo info)
{
FutureTask<Void> task = new FutureTask<>(() -> pullSchema(new Callback(endpoint, info)), null);
if (shouldPullImmediately(endpoint, info.version))
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 91007a7..c277f9d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1000,12 +1000,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return;
logger.warn(String.format("There are nodes in the cluster with a different schema version than us we did not merged schemas from, " +
- "our version : (%s), outstanding versions -> endpoints : %s",
+ "our version : (%s), outstanding versions -> endpoints : %s. Use -Dcassandra.skip_schema_check=true " +
+ "to ignore this.",
Schema.instance.getVersion(),
MigrationCoordinator.instance.outstandingVersions()));
if (REQUIRE_SCHEMAS)
- throw new RuntimeException("Didn't receive schemas for all known versions within the timeout");
+ throw new RuntimeException("Didn't receive schemas for all known versions within the timeout. " +
+ "Use -Dcassandra.skip_schema_check=true to skip this check.");
}
private void joinTokenRing(long schemaTimeoutMillis) throws ConfigurationException
@@ -2951,6 +2953,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void removeEndpoint(InetAddressAndPort endpoint)
{
Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.removeEndpoint(endpoint));
+ MigrationCoordinator.instance.removeVersionInfoForEndpoint(endpoint);
SystemKeyspace.removeEndpoint(endpoint);
}
@@ -3254,7 +3257,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
onChange(endpoint, entry.getKey(), entry.getValue());
}
- MigrationCoordinator.instance.reportEndpointVersion(endpoint, epState);
}
public void onAlive(InetAddressAndPort endpoint, EndpointState state)
diff --git a/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java b/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
index 1695563..9cc8c94 100644
--- a/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
+++ b/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
@@ -191,7 +191,28 @@ public class MigrationCoordinatorTest
Assert.assertTrue(signal.isSignalled());
}
- private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddressAndPort endpoint, UUID version, boolean startupShouldBeUnblocked)
+ /**
+ * If an endpoint is removed and no other endpoints are reporting its
+ * schema version, the version should be removed and we should signal
+ * anyone waiting on that version
+ */
+ @Test
+ public void versionsAreSignaledWhenEndpointsRemoved()
+ {
+ InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+ coordinator.reportEndpointVersion(EP1, V1);
+ WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
+ Assert.assertFalse(signal.isSignalled());
+
+ coordinator.removeVersionInfoForEndpoint(EP1);
+ Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));
+
+ Assert.assertTrue(signal.isSignalled());
+ }
+
+
+ private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddress endpoint, UUID version, boolean startupShouldBeUnblocked)
{
Assert.assertTrue(coordinator.requests.isEmpty());
Future<Void> future = coordinator.reportEndpointVersion(EP1, V1);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org