You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2021/04/13 13:02:50 UTC

[cassandra] 01/02: Don't wait for migrations from removed nodes, mention flag to skip

This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f7365cb0d988ff8aecd20a18d70831d39954b4d4
Author: Brandon Williams <br...@apache.org>
AuthorDate: Fri Apr 9 16:19:17 2021 -0500

    Don't wait for migrations from removed nodes, mention flag to skip
    
    Patch by brandonwilliams; reviewed by Adam Holmberg, adelapena and
    bdeggleston for CASSANDRA-16577
---
 CHANGES.txt                                        |  1 +
 .../cassandra/schema/MigrationCoordinator.java     | 11 ++++++++++-
 .../apache/cassandra/service/StorageService.java   |  8 +++++---
 .../cassandra/schema/MigrationCoordinatorTest.java | 23 +++++++++++++++++++++-
 4 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 15b8dfb..c22d6a6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,7 @@ Merged from 3.11:
  * Reduce amount of allocations during batch statement execution (CASSANDRA-16201)
  * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393)
 Merged from 3.0:
+ * Don't wait for schema migrations from removed nodes (CASSANDRA-16577)
  * Ignore trailing zeros in hint files (CASSANDRA-16523)
  * Refuse DROP COMPACT STORAGE if some 2.x sstables are in use (CASSANDRA-15897)
  * Fix ColumnFilter::toString not returning a valid CQL fragment (CASSANDRA-16483)
diff --git a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
index 9046479..554f545 100644
--- a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
+++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
@@ -349,7 +349,16 @@ public class MigrationCoordinator
         }
     }
 
-    Future<Void> scheduleSchemaPull(InetAddressAndPort endpoint, VersionInfo info)
+    public synchronized void removeVersionInfoForEndpoint(InetAddressAndPort endpoint)
+    {
+        Set<UUID> versions = ImmutableSet.copyOf(versionInfo.keySet());
+        for (UUID version : versions)
+        {
+            removeEndpointFromVersion(endpoint, version);
+        }
+    }
+
+    Future<Void> scheduleSchemaPull(InetAddressAndPort endpoint, VersionInfo info)
     {
         FutureTask<Void> task = new FutureTask<>(() -> pullSchema(new Callback(endpoint, info)), null);
         if (shouldPullImmediately(endpoint, info.version))
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 91007a7..c277f9d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1000,12 +1000,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             return;
 
         logger.warn(String.format("There are nodes in the cluster with a different schema version than us we did not merged schemas from, " +
-                                  "our version : (%s), outstanding versions -> endpoints : %s",
+                                  "our version : (%s), outstanding versions -> endpoints : %s. Use -Dcassandra.skip_schema_check=true " +
+                                  "to ignore this.",
                                   Schema.instance.getVersion(),
                                   MigrationCoordinator.instance.outstandingVersions()));
 
         if (REQUIRE_SCHEMAS)
-            throw new RuntimeException("Didn't receive schemas for all known versions within the timeout");
+            throw new RuntimeException("Didn't receive schemas for all known versions within the timeout. " +
+                                       "Use -Dcassandra.skip_schema_check=true to skip this check.");
     }
 
     private void joinTokenRing(long schemaTimeoutMillis) throws ConfigurationException
@@ -2951,6 +2953,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private void removeEndpoint(InetAddressAndPort endpoint)
     {
         Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.removeEndpoint(endpoint));
+        MigrationCoordinator.instance.removeVersionInfoForEndpoint(endpoint);
         SystemKeyspace.removeEndpoint(endpoint);
     }
 
@@ -3254,7 +3257,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             onChange(endpoint, entry.getKey(), entry.getValue());
         }
-        MigrationCoordinator.instance.reportEndpointVersion(endpoint, epState);
     }
 
     public void onAlive(InetAddressAndPort endpoint, EndpointState state)
diff --git a/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java b/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
index 1695563..9cc8c94 100644
--- a/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
+++ b/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java
@@ -191,7 +191,28 @@ public class MigrationCoordinatorTest
         Assert.assertTrue(signal.isSignalled());
     }
 
-    private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddressAndPort endpoint, UUID version, boolean startupShouldBeUnblocked)
+    /**
+     * If an endpoint is removed and no other endpoints are reporting its
+     * schema version, the version should be removed and we should signal
+     * anyone waiting on that version.
+     */
+    @Test
+    public void versionsAreSignaledWhenEndpointsRemoved()
+    {
+        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
+
+        coordinator.reportEndpointVersion(EP1, V1);
+        WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
+        Assert.assertFalse(signal.isSignalled());
+
+        coordinator.removeVersionInfoForEndpoint(EP1);
+        Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));
+
+        Assert.assertTrue(signal.isSignalled());
+    }
+
+
+    private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddressAndPort endpoint, UUID version, boolean startupShouldBeUnblocked)
     {
         Assert.assertTrue(coordinator.requests.isEmpty());
         Future<Void> future = coordinator.reportEndpointVersion(EP1, V1);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org