You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/01/07 15:39:24 UTC
[4/5] git commit: prevent multiple concurrent HH to the same target
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3681
prevent multiple concurrent HH to the same target
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3681
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d093958
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d093958
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d093958
Branch: refs/heads/trunk
Commit: 0d0939582ceda20de2f7765e3496bda3d5318520
Parents: eca0c48
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Jan 5 16:39:57 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Jan 5 17:05:53 2012 -0600
----------------------------------------------------------------------
.../apache/cassandra/db/HintedHandOffManager.java | 86 ++++++++++-----
1 files changed, 57 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d093958/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 6661ee3..0b92821 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -198,16 +198,24 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
|| (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null);
}
- private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedException
+ private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException
{
Gossiper gossiper = Gossiper.instance;
int waited = 0;
// first, wait for schema to be gossiped.
- while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null) {
- Thread.sleep(1000);
+ while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null)
+ {
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
waited += 1000;
if (waited > 2 * StorageService.RING_DELAY)
- throw new RuntimeException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+ throw new TimeoutException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
}
waited = 0;
// then wait for the correct schema version.
@@ -217,44 +225,65 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value))
{
- Thread.sleep(1000);
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
waited += 1000;
if (waited > 2 * StorageService.RING_DELAY)
- throw new RuntimeException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+ throw new TimeoutException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
}
logger_.debug("schema for {} matches local schema", endpoint);
return waited;
}
- private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException, InterruptedException
+ private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException
{
- ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
try
{
- if (hintStore.isEmpty())
- return; // nothing to do, don't confuse users by logging a no-op handoff
-
- logger_.debug("Checking remote({}) schema before delivering hints", endpoint);
- int waited = waitForSchemaAgreement(endpoint);
- // sleep a random amount to stagger handoff delivery from different replicas.
- // (if we had to wait, then gossiper randomness took care of that for us already.)
- if (waited == 0) {
- // use a 'rounded' sleep interval because of a strange bug with windows: CASSANDRA-3375
- int sleep = FBUtilities.threadLocalRandom().nextInt(2000) * 30;
- logger_.debug("Sleeping {}ms to stagger hint delivery", sleep);
- Thread.sleep(sleep);
- }
-
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
- return;
- }
+ deliverHintsToEndpointInternal(endpoint);
}
finally
{
queuedDeliveries.remove(endpoint);
}
+ }
+
+ private void deliverHintsToEndpointInternal(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException
+ {
+ ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
+ if (hintStore.isEmpty())
+ return; // nothing to do, don't confuse users by logging a no-op handoff
+
+ logger_.debug("Checking remote({}) schema before delivering hints", endpoint);
+ int waited;
+ try
+ {
+ waited = waitForSchemaAgreement(endpoint);
+ }
+ catch (TimeoutException e)
+ {
+ return;
+ }
+ // sleep a random amount to stagger handoff delivery from different replicas.
+ // (if we had to wait, then gossiper randomness took care of that for us already.)
+ if (waited == 0)
+ {
+ // use a 'rounded' sleep interval because of a strange bug with windows: CASSANDRA-3375
+ int sleep = FBUtilities.threadLocalRandom().nextInt(2000) * 30;
+ logger_.debug("Sleeping {}ms to stagger hint delivery", sleep);
+ Thread.sleep(sleep);
+ }
+
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
+ return;
+ }
// 1. Get the key of the endpoint we need to handoff
// 2. For each column, deserialize the mutation and send it to the endpoint
@@ -341,8 +370,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
}
}
- logger_.info(String.format("Finished hinted handoff of %s rows to endpoint %s",
- rowsReplayed, endpoint));
+ logger_.info(String.format("Finished hinted handoff of %s rows to endpoint %s", rowsReplayed, endpoint));
}
/**