You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/01/07 15:39:24 UTC

[4/5] git commit: prevent multiple concurrent hinted handoffs (HH) to the same target; patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3681

prevent multiple concurrent hinted handoffs (HH) to the same target
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3681


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d093958
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d093958
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d093958

Branch: refs/heads/trunk
Commit: 0d0939582ceda20de2f7765e3496bda3d5318520
Parents: eca0c48
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Jan 5 16:39:57 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Jan 5 17:05:53 2012 -0600

----------------------------------------------------------------------
 .../apache/cassandra/db/HintedHandOffManager.java  |   86 ++++++++++-----
 1 file changed, 57 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d093958/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 6661ee3..0b92821 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -198,16 +198,24 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                || (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null);
     }
 
-    private int waitForSchemaAgreement(InetAddress endpoint) throws InterruptedException
+    private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException
     {
         Gossiper gossiper = Gossiper.instance;
         int waited = 0;
         // first, wait for schema to be gossiped.
-        while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null) {
-            Thread.sleep(1000);
+        while (gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null)
+        {
+            try
+            {
+                Thread.sleep(1000);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
             waited += 1000;
             if (waited > 2 * StorageService.RING_DELAY)
-                throw new RuntimeException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+                throw new TimeoutException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
         }
         waited = 0;
         // then wait for the correct schema version.
@@ -217,44 +225,65 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
                 gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value))
         {
-            Thread.sleep(1000);
+            try
+            {
+                Thread.sleep(1000);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
             waited += 1000;
             if (waited > 2 * StorageService.RING_DELAY)
-                throw new RuntimeException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
+                throw new TimeoutException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
         }
         logger_.debug("schema for {} matches local schema", endpoint);
         return waited;
     }
 
-    private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException, InterruptedException
+    private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException
     {
-        ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
         try
         {
-            if (hintStore.isEmpty())
-                return; // nothing to do, don't confuse users by logging a no-op handoff
-
-            logger_.debug("Checking remote({}) schema before delivering hints", endpoint);
-            int waited = waitForSchemaAgreement(endpoint);
-            // sleep a random amount to stagger handoff delivery from different replicas.
-            // (if we had to wait, then gossiper randomness took care of that for us already.)
-            if (waited == 0) {
-                // use a 'rounded' sleep interval because of a strange bug with windows: CASSANDRA-3375
-                int sleep = FBUtilities.threadLocalRandom().nextInt(2000) * 30;
-                logger_.debug("Sleeping {}ms to stagger hint delivery", sleep);
-                Thread.sleep(sleep);
-            }
-
-            if (!FailureDetector.instance.isAlive(endpoint))
-            {
-                logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
-                return;
-            }
+            deliverHintsToEndpointInternal(endpoint);
         }
         finally
         {
             queuedDeliveries.remove(endpoint);
         }
+    }
+
+    private void deliverHintsToEndpointInternal(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException
+    {
+        ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
+        if (hintStore.isEmpty())
+            return; // nothing to do, don't confuse users by logging a no-op handoff
+
+        logger_.debug("Checking remote({}) schema before delivering hints", endpoint);
+        int waited;
+        try
+        {
+            waited = waitForSchemaAgreement(endpoint);
+        }
+        catch (TimeoutException e)
+        {
+            return;
+        }
+        // sleep a random amount to stagger handoff delivery from different replicas.
+        // (if we had to wait, then gossiper randomness took care of that for us already.)
+        if (waited == 0)
+        {
+            // use a 'rounded' sleep interval because of a strange bug with windows: CASSANDRA-3375
+            int sleep = FBUtilities.threadLocalRandom().nextInt(2000) * 30;
+            logger_.debug("Sleeping {}ms to stagger hint delivery", sleep);
+            Thread.sleep(sleep);
+        }
+
+        if (!FailureDetector.instance.isAlive(endpoint))
+        {
+            logger_.info("Endpoint {} died before hint delivery, aborting", endpoint);
+            return;
+        }
 
         // 1. Get the key of the endpoint we need to handoff
         // 2. For each column, deserialize the mutation and send it to the endpoint
@@ -341,8 +370,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             }
         }
 
-        logger_.info(String.format("Finished hinted handoff of %s rows to endpoint %s",
-                                   rowsReplayed, endpoint));
+        logger_.info(String.format("Finished hinted handoff of %s rows to endpoint %s", rowsReplayed, endpoint));
     }
 
     /**