You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/11/25 00:24:22 UTC
[1/2] cassandra git commit: Wait for migration responses to complete before bootstrapping
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.1 521cc5425 -> 8feb66e6f
Wait for migration responses to complete before bootstrapping
patch by Mike Adamson; reviewed by Sergio Bossa for CASSANDRA-10731
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ae315b5e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ae315b5e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ae315b5e
Branch: refs/heads/cassandra-3.1
Commit: ae315b5ec944571342146867c51b2ceb50f3845e
Parents: 29b988d
Author: Mike Adamson <ma...@datastax.com>
Authored: Mon Nov 16 15:48:33 2015 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Nov 24 23:18:14 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/MigrationManager.java | 24 ++++++++++++++++--
.../apache/cassandra/service/MigrationTask.java | 26 ++++++++++++++++++++
.../cassandra/service/StorageService.java | 9 +++----
4 files changed, 53 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae315b5e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 608d8f8..116d4c3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.1
+ * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
* Unable to create a function with argument of type Inet (CASSANDRA-10741)
* Fix backward incompatibiliy in CqlInputFormat (CASSANDRA-10717)
* Correctly preserve deletion info on updated rows when notifying indexers
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae315b5e/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index b7f9bf3..c0b5b10 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -59,8 +59,10 @@ public class MigrationManager
public static final int MIGRATION_DELAY_IN_MS = 60000;
+ private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1"));
+
private final List<MigrationListener> listeners = new CopyOnWriteArrayList<>();
-
+
private MigrationManager() {}
public void register(MigrationListener listener)
@@ -148,7 +150,25 @@ public class MigrationManager
public static boolean isReadyForBootstrap()
{
- return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
+ return MigrationTask.getInflightTasks().isEmpty();
+ }
+
+ public static void waitUntilReadyForBootstrap()
+ {
+ CountDownLatch completionLatch;
+ while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null)
+ {
+ try
+ {
+ if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS))
+ logger.error("Migration task failed to complete");
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ logger.error("Migration task was interrupted");
+ }
+ }
}
public void notifyCreateKeyspace(KeyspaceMetadata ksm)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae315b5e/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index 8a1b858..39a5a11 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -20,11 +20,17 @@ package org.apache.cassandra.service;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.IAsyncCallback;
@@ -39,6 +45,10 @@ class MigrationTask extends WrappedRunnable
{
private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class);
+ private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>();
+
+ private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);
+
private final InetAddress endpoint;
MigrationTask(InetAddress endpoint)
@@ -46,6 +56,11 @@ class MigrationTask extends WrappedRunnable
this.endpoint = endpoint;
}
+ public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
+ {
+ return inflightTasks;
+ }
+
public void runMayThrow() throws Exception
{
// There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
@@ -65,6 +80,8 @@ class MigrationTask extends WrappedRunnable
MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+
IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
{
@Override
@@ -78,6 +95,10 @@ class MigrationTask extends WrappedRunnable
{
logger.error("Configuration exception merging remote schema", e);
}
+ finally
+ {
+ completionLatch.countDown();
+ }
}
public boolean isLatencyForSnitch()
@@ -85,6 +106,11 @@ class MigrationTask extends WrappedRunnable
return false;
}
};
+
+ // Only save the latches if we need bootstrap or are bootstrapping
+ if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()))
+ inflightTasks.offer(completionLatch);
+
MessagingService.instance().sendRR(message, endpoint, cb);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae315b5e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1c20a22..1baa478 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.service;
-import static java.nio.charset.StandardCharsets.ISO_8859_1;
-
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
@@ -848,12 +846,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
- // if our schema hasn't matched yet, keep sleeping until it does
+ // if our schema hasn't matched yet, wait until it has
+ // we do this by waiting for all in-flight migration requests and responses to complete
// (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
- while (!MigrationManager.isReadyForBootstrap())
+ if (!MigrationManager.isReadyForBootstrap())
{
setMode(Mode.JOINING, "waiting for schema information to complete", true);
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ MigrationManager.waitUntilReadyForBootstrap();
}
setMode(Mode.JOINING, "schema complete, ready to bootstrap", true);
setMode(Mode.JOINING, "waiting for pending range calculation", true);
[2/2] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.1
Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8feb66e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8feb66e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8feb66e6
Branch: refs/heads/cassandra-3.1
Commit: 8feb66e6fb57521a831a1a6627eb7e0e52462fd4
Parents: 521cc54 ae315b5
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Nov 24 23:24:13 2015 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Nov 24 23:24:13 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/MigrationManager.java | 24 ++++++++++++++++--
.../apache/cassandra/service/MigrationTask.java | 26 ++++++++++++++++++++
.../cassandra/service/StorageService.java | 9 +++----
4 files changed, 53 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8feb66e6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 038840a,116d4c3..75922bf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,5 -1,5 +1,6 @@@
-3.0.1
+3.1
+Merged from 3.0:
+ * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
* Unable to create a function with argument of type Inet (CASSANDRA-10741)
* Fix backward incompatibiliy in CqlInputFormat (CASSANDRA-10717)
* Correctly preserve deletion info on updated rows when notifying indexers