You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/25 22:35:30 UTC

svn commit: r958106 - in /cassandra/branches/cassandra-0.6: CHANGES.txt src/java/org/apache/cassandra/db/HintedHandOffManager.java

Author: jbellis
Date: Fri Jun 25 20:35:30 2010
New Revision: 958106

URL: http://svn.apache.org/viewvc?rev=958106&view=rev
Log:
avoid queuing multiple hint deliveries for the same endpoint.  patch by jbellis; reviewed by Brandon Williams and eevans for CASSANDRA-1229

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=958106&r1=958105&r2=958106&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Jun 25 20:35:30 2010
@@ -32,6 +32,8 @@
  * remove opportunistic repairs, when two machines with overlapping replica
    responsibilities happen to finish major compactions of the same CF near
    the same time.  repairs are now fully manual (CASSANDRA-1190)
+ * avoid queuing multiple hint deliveries for the same endpoint
+   (CASSANDRA-1229)
 
 
 0.6.2

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=958106&r1=958105&r2=958106&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Jun 25 20:35:30 2010
@@ -21,6 +21,7 @@ package org.apache.cassandra.db;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Arrays;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutorService;
 import java.io.IOException;
@@ -45,6 +46,7 @@ import org.apache.cassandra.thrift.Inval
 import org.apache.cassandra.db.filter.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.utils.WrappedRunnable;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 
 /**
@@ -86,6 +88,8 @@ public class HintedHandOffManager
     public static final String HINTS_CF = "HintsColumnFamily";
     private static final int PAGE_SIZE = 10000;
 
+    private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
+
     private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
 
     private static boolean sendMessage(InetAddress endPoint, String tableName, String key) throws IOException
@@ -144,9 +148,10 @@ public class HintedHandOffManager
                || (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null);
     }
 
-    private static void deliverHintsToEndpoint(InetAddress endPoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
+    private void deliverHintsToEndpoint(InetAddress endPoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
     {
         logger_.info("Started hinted handoff for endPoint " + endPoint);
+        queuedDeliveries.remove(endPoint);
 
         byte[] targetEPBytes = endPoint.getAddress();
         // 1. Scan through all the keys that we need to handoff
@@ -213,6 +218,9 @@ public class HintedHandOffManager
     */
     public void deliverHints(final InetAddress to)
     {
+        if (queuedDeliveries.contains(to))
+            return;
+
         Runnable r = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception