You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/09/11 18:28:49 UTC

git commit: Replace Throttle with Guava's RateLimiter for HintedHandOff (patch by Aleksey Yeschenko; reviewed by Pavel Yaskevich for CASSANDRA-4541)

Updated Branches:
  refs/heads/trunk 93685a478 -> 203b3ad04


Replace Throttle with Guava's RateLimiter for HintedHandOff
patch by Aleksey Yeschenko; reviewed by Pavel Yaskevich for CASSANDRA-4541


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/203b3ad0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/203b3ad0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/203b3ad0

Branch: refs/heads/trunk
Commit: 203b3ad04b63345373f9ad5473b2b30b8b467b03
Parents: 93685a4
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Tue Sep 11 19:27:56 2012 +0300
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Tue Sep 11 19:27:56 2012 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/db/HintedHandOffManager.java  |   27 +++-----------
 2 files changed, 7 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/203b3ad0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 58b3272..1af1a4a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,7 @@
  * Make CQL3 the default for CQL (CASSANDRA-4640)
  * update stress tool to be able to use CQL3 (CASSANDRA-4406)
  * Accept all thrift update on CQL3 cf but don't expose their metadata (CASSANDRA-4377)
+ * Replace Throttle with Guava's RateLimiter for HintedHandOff (CASSANDRA-4541)
 
 
 1.1.6

http://git-wip-us.apache.org/repos/asf/cassandra/blob/203b3ad0/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 22052ad..064033b 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -29,6 +29,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,13 +57,10 @@ import org.apache.cassandra.service.*;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Throttle;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
-
-
 /**
  * The hint schema looks like this:
  *
@@ -258,21 +256,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     private void deliverHintsToEndpointInternal(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException
     {
-        long hintSizes = 0;
-        Throttle hintThrottle = new Throttle("HintThrottle", new Throttle.ThroughputFunction()
-        {
-            public int targetThroughput()
-            {
-                if (DatabaseDescriptor.getHintedHandoffThrottleInKB() < 1)
-                    // throttling disabled
-                    return 0;
-                // total throughput
-                int totalBytesPerMS = (DatabaseDescriptor.getHintedHandoffThrottleInKB() * 1024) / 8 / 1000;
-                // per hint throughput (target bytes per MS)
-                return totalBytesPerMS / Math.max(1, executor.getActiveCount());
-            }
-        });
-
         ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF);
         if (hintStore.isEmpty())
             return; // nothing to do, don't confuse users by logging a no-op handoff
@@ -318,6 +301,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             logger.debug("average hinted-row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
         }
 
+        // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+        int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB();
+        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+
         delivery:
         while (true)
         {
@@ -357,10 +344,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                     if (rm != null)
                     {
                         MessageOut<RowMutation> message = rm.createMessage();
+                        rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
                         sendMutation(endpoint, message);
-                        // throttle for the messages sent.
-                        hintSizes += message.serializedSize(MessagingService.current_version);
-                        hintThrottle.throttle(hintSizes);
                         rowsReplayed++;
                     }
                     deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());