You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by vi...@apache.org on 2014/02/15 22:47:14 UTC

git commit: Split out outgoing stream throughput within a DC and inter-DC; patch by Vijay and benedict for CASSANDRA-6596

Repository: cassandra
Updated Branches:
  refs/heads/trunk ee020c944 -> 94bda1f7d


Split out outgoing stream throughput within a DC and inter-DC
patch by Vijay and benedict for CASSANDRA-6596


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/94bda1f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/94bda1f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/94bda1f7

Branch: refs/heads/trunk
Commit: 94bda1f7d39adc631cfc8ef316730b54df040cdc
Parents: ee020c9
Author: vparthasarathy <vi...@gmail.com>
Authored: Sat Feb 15 12:25:14 2014 -0800
Committer: vparthasarathy <vi...@gmail.com>
Committed: Sat Feb 15 12:25:14 2014 -0800

----------------------------------------------------------------------
 conf/cassandra.yaml                             |  6 +++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    | 10 ++++
 .../cassandra/streaming/StreamManager.java      | 53 +++++++++++++++-----
 .../cassandra/streaming/StreamWriter.java       |  5 +-
 5 files changed, 61 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/94bda1f7/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 8538920..2666316 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -518,6 +518,12 @@ compaction_preheat_key_cache: true
 # When unset, the default is 200 Mbps or 25 MB/s.
 # stream_throughput_outbound_megabits_per_sec: 200
 
+# Throttles all streaming file transfer between datacenters. This setting
+# lets users cap inter-DC stream throughput in addition to the overall
+# limit set by stream_throughput_outbound_megabits_per_sec.
+# When unset, the default is 0, which disables inter-DC throttling.
+# inter_dc_stream_throughput_outbound_megabits_per_sec:
+
 # How long the coordinator should wait for read operations to complete
 read_request_timeout_in_ms: 5000
 # How long the coordinator should wait for seq or index scans to complete

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94bda1f7/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 1f47171..d5108e3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -124,6 +124,7 @@ public class Config
     public Integer max_streaming_retries = 3;
 
     public volatile Integer stream_throughput_outbound_megabits_per_sec = 200;
+    public volatile Integer inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
 
     public String[] data_file_directories;
     public String flush_directory;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94bda1f7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 29dece6..73a03eb 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -930,6 +930,16 @@ public class DatabaseDescriptor
         conf.stream_throughput_outbound_megabits_per_sec = value;
     }
 
+    public static int getInterDCStreamThroughputOutboundMegabitsPerSec()
+    {
+        return conf.inter_dc_stream_throughput_outbound_megabits_per_sec;
+    }
+
+    public static void setInterDCStreamThroughputOutboundMegabitsPerSec(int value)
+    {
+        conf.inter_dc_stream_throughput_outbound_megabits_per_sec = value;
+    }
+
     public static String[] getAllDataFileLocations()
     {
         return conf.data_file_directories;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94bda1f7/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index ccd0053..c82e45c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.streaming;
 
+import java.net.InetAddress;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -32,8 +33,8 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.RateLimiter;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
 import org.apache.cassandra.streaming.management.StreamStateCompositeData;
@@ -47,25 +48,53 @@ public class StreamManager implements StreamManagerMBean
 {
     public static final StreamManager instance = new StreamManager();
 
-    private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE);
-
     /**
      * Gets streaming rate limiter.
      * When stream_throughput_outbound_megabits_per_sec is 0, this returns rate limiter
      * with the rate of Double.MAX_VALUE bytes per second.
      * Rate unit is bytes per sec.
      *
-     * @return RateLimiter with rate limit set
+     * @return StreamRateLimiter with rate limit set based on peer location.
      */
-    public static RateLimiter getRateLimiter()
+    public static StreamRateLimiter getRateLimiter(InetAddress peer)
     {
-        double currentThroughput = ((double) DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec()) * 1024 * 1024;
-        // if throughput is set to 0, throttling is disabled
-        if (currentThroughput == 0)
-            currentThroughput = Double.MAX_VALUE;
-        if (limiter.getRate() != currentThroughput)
-            limiter.setRate(currentThroughput);
-        return limiter;
+        return new StreamRateLimiter(peer);
+    }
+
+    public static class StreamRateLimiter
+    {
+        private static final double BYTES_PER_MEGABIT = (1024 * 1024) / 8.0; // settings are megabits/s, limiters meter bytes
+        private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE); // shared by every outbound stream
+        private static final RateLimiter interDCLimiter = RateLimiter.create(Double.MAX_VALUE); // extra cap for cross-DC peers
+        private final boolean isLocalDC; // true when the peer's datacenter equals the local datacenter
+
+        // Re-reads both configured rates, pushes any change into the shared limiters, then classifies the peer.
+        public StreamRateLimiter(InetAddress peer)
+        {
+            double throughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT;
+            mayUpdateThroughput(throughput, limiter);
+            double interDCThroughput = DatabaseDescriptor.getInterDCStreamThroughputOutboundMegabitsPerSec() * BYTES_PER_MEGABIT;
+            mayUpdateThroughput(interDCThroughput, interDCLimiter);
+            isLocalDC = DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(peer));
+        }
+
+        // Sets rateLimiter to limit (bytes per second) unless it already runs at that rate.
+        private void mayUpdateThroughput(double limit, RateLimiter rateLimiter)
+        {
+            // if throughput is set to 0, throttling is disabled
+            if (limit == 0)
+                limit = Double.MAX_VALUE;
+            if (rateLimiter.getRate() != limit)
+                rateLimiter.setRate(limit);
+        }
+
+        // Acquires toTransfer bytes from the global limiter, and from the inter-DC limiter too when the peer is remote.
+        public void acquire(int toTransfer)
+        {
+            limiter.acquire(toTransfer);
+            if (!isLocalDC)
+                interDCLimiter.acquire(toTransfer);
+        }
+    }
 
     private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/94bda1f7/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 04301ba..a84d2f4 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -24,7 +24,6 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 
-import com.google.common.util.concurrent.RateLimiter;
 import com.ning.compress.lzf.LZFOutputStream;
 
 import org.apache.cassandra.io.sstable.Component;
@@ -33,6 +32,7 @@ import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -44,7 +44,7 @@ public class StreamWriter
 
     protected final SSTableReader sstable;
     protected final Collection<Pair<Long, Long>> sections;
-    protected final RateLimiter limiter = StreamManager.getRateLimiter();
+    protected final StreamRateLimiter limiter;
     protected final StreamSession session;
 
     private OutputStream compressedOutput;
@@ -57,6 +57,7 @@ public class StreamWriter
         this.session = session;
         this.sstable = sstable;
         this.sections = sections;
+        this.limiter =  StreamManager.getRateLimiter(session.peer);
     }
 
     /**