You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by st...@apache.org on 2016/09/20 01:08:45 UTC

[1/2] cassandra git commit: Support optional backpressure strategies at the coordinator

Repository: cassandra
Updated Branches:
  refs/heads/trunk 560faba2f -> d43b9ce50


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
new file mode 100644
index 0000000..b94b6ee
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
@@ -0,0 +1,409 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.utils.TestTimeSource;
+import org.apache.cassandra.utils.TimeSource;
+
+import static org.apache.cassandra.net.RateBasedBackPressure.FACTOR;
+import static org.apache.cassandra.net.RateBasedBackPressure.FLOW;
+import static org.apache.cassandra.net.RateBasedBackPressure.HIGH_RATIO;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class RateBasedBackPressureTest
+{
+    @Test(expected = IllegalArgumentException.class)
+    public void testAcceptsNoLessThanThreeArguments() throws Exception
+    {
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "1"), new TestTimeSource(), 10);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testHighRatioMustBeBiggerThanZero() throws Exception
+    {
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0", FACTOR, "2", FLOW, "FAST"), new TestTimeSource(), 10);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testHighRatioMustBeSmallerEqualThanOne() throws Exception
+    {
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "2", FACTOR, "2", FLOW, "FAST"), new TestTimeSource(), 10);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFactorMustBeBiggerEqualThanOne() throws Exception
+    {
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "0", FLOW, "FAST"), new TestTimeSource(), 10);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testWindowSizeMustBeBiggerEqualThanTen() throws Exception
+    {
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "5", FLOW, "FAST"), new TestTimeSource(), 1);
+    }
+
+    @Test
+    public void testFlowMustBeEitherFASTorSLOW() throws Exception
+    {
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "FAST"), new TestTimeSource(), 10);
+        new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "SLOW"), new TestTimeSource(), 10);
+        try
+        {
+            new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "WRONG"), new TestTimeSource(), 10);
+            fail("Expected to fail with wrong flow type.");
+        }
+        catch (Exception ex)
+        {
+        }
+    }
+
+    @Test
+    public void testBackPressureStateUpdates()
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+
+        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+        state.onMessageSent(null);
+        assertEquals(0, state.incomingRate.size());
+        assertEquals(0, state.outgoingRate.size());
+
+        state = strategy.newState(InetAddress.getLoopbackAddress());
+        state.onResponseReceived();
+        assertEquals(1, state.incomingRate.size());
+        assertEquals(1, state.outgoingRate.size());
+
+        state = strategy.newState(InetAddress.getLoopbackAddress());
+        state.onResponseTimeout();
+        assertEquals(0, state.incomingRate.size());
+        assertEquals(1, state.outgoingRate.size());
+    }
+
+    @Test
+    public void testBackPressureIsNotUpdatedBeyondInfinity() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+
+        // Get initial rate:
+        double initialRate = state.rateLimiter.getRate();
+        assertEquals(Double.POSITIVE_INFINITY, initialRate, 0.0);
+
+        // Update incoming and outgoing rate equally:
+        state.incomingRate.update(1);
+        state.outgoingRate.update(1);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the rate doesn't change because already at infinity:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        assertEquals(initialRate, state.rateLimiter.getRate(), 0.0);
+    }
+
+    @Test
+    public void testBackPressureIsUpdatedOncePerWindowSize() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+
+        // Get initial time:
+        long current = state.getLastIntervalAcquire();
+        assertEquals(0, current);
+
+        // Update incoming and outgoing rate:
+        state.incomingRate.update(1);
+        state.outgoingRate.update(1);
+
+        // Move time ahead by window size:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the timestamp changed:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        current = state.getLastIntervalAcquire();
+        assertEquals(timeSource.currentTimeMillis(), current);
+
+        // Move time ahead by less than interval:
+        long previous = current;
+        timeSource.sleep(windowSize / 2, TimeUnit.MILLISECONDS);
+
+        // Verify the last timestamp didn't change because below the window size:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        current = state.getLastIntervalAcquire();
+        assertEquals(previous, current);
+    }
+
+    @Test
+    public void testBackPressureWhenBelowHighRatio() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+
+        // Update incoming and outgoing rate so that the ratio is 0.5:
+        state.incomingRate.update(50);
+        state.outgoingRate.update(100);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the rate is decreased by factor:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        assertEquals(7.4, state.rateLimiter.getRate(), 0.1);
+    }
+
+    @Test
+    public void testBackPressureRateLimiterIsIncreasedAfterGoingAgainAboveHighRatio() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+        RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+
+        // Update incoming and outgoing rate so that the ratio is 0.5:
+        state.incomingRate.update(50);
+        state.outgoingRate.update(100);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the rate decreased:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        assertEquals(7.4, state.rateLimiter.getRate(), 0.1);
+
+        // Update incoming and outgoing rate back above high rate:
+        state.incomingRate.update(50);
+        state.outgoingRate.update(50);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify rate limiter is increased by factor:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        assertEquals(8.25, state.rateLimiter.getRate(), 0.1);
+
+        // Update incoming and outgoing rate to keep it below the limiter rate:
+        state.incomingRate.update(1);
+        state.outgoingRate.update(1);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify rate limiter is not increased as already higher than the actual rate:
+        strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+        assertEquals(8.25, state.rateLimiter.getRate(), 0.1);
+    }
+
+    @Test
+    public void testBackPressureFastFlow() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+        RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
+        RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
+        RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+
+        // Update incoming and outgoing rates:
+        state1.incomingRate.update(50);
+        state1.outgoingRate.update(100);
+        state2.incomingRate.update(80); // fast
+        state2.outgoingRate.update(100);
+        state3.incomingRate.update(20);
+        state3.outgoingRate.update(100);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the fast replica rate limiting has been applied:
+        Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3);
+        strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
+        assertTrue(strategy.checkAcquired());
+        assertTrue(strategy.checkApplied());
+        assertEquals(12.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
+    }
+
+    @Test
+    public void testBackPressureSlowFlow() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
+        RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
+        RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
+        RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+
+        // Update incoming and outgoing rates:
+        state1.incomingRate.update(50);
+        state1.outgoingRate.update(100);
+        state2.incomingRate.update(100);
+        state2.outgoingRate.update(100);
+        state3.incomingRate.update(20); // slow
+        state3.outgoingRate.update(100);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the slow replica rate limiting has been applied:
+        Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3);
+        strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
+        assertTrue(strategy.checkAcquired());
+        assertTrue(strategy.checkApplied());
+        assertEquals(3.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
+    }
+
+    @Test
+    public void testBackPressureWithDifferentGroups() throws Exception
+    {
+        long windowSize = 6000;
+        TestTimeSource timeSource = new TestTimeSource();
+        TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
+        RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
+        RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
+        RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+        RateBasedBackPressureState state4 = strategy.newState(InetAddress.getByName("127.0.0.4"));
+
+        // Update incoming and outgoing rates:
+        state1.incomingRate.update(50); // this
+        state1.outgoingRate.update(100);
+        state2.incomingRate.update(100);
+        state2.outgoingRate.update(100);
+        state3.incomingRate.update(20); // this
+        state3.outgoingRate.update(100);
+        state4.incomingRate.update(80);
+        state4.outgoingRate.update(100);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the first group:
+        Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2);
+        strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
+        assertTrue(strategy.checkAcquired());
+        assertTrue(strategy.checkApplied());
+        assertEquals(7.4, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
+
+        // Verify the second group:
+        replicaGroup = Sets.newHashSet(state3, state4);
+        strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
+        assertTrue(strategy.checkAcquired());
+        assertTrue(strategy.checkApplied());
+        assertEquals(3.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
+    }
+
+    @Test
+    public void testBackPressurePastTimeout() throws Exception
+    {
+        long windowSize = 10000;
+        TestTimeSource timeSource = new TestTimeSource();
+        TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
+        RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
+        RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
+        RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+
+        // Update incoming and outgoing rates:
+        state1.incomingRate.update(5); // slow
+        state1.outgoingRate.update(100);
+        state2.incomingRate.update(100);
+        state2.outgoingRate.update(100);
+        state3.incomingRate.update(100);
+        state3.outgoingRate.update(100);
+
+        // Move time ahead:
+        timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+        // Verify the slow replica rate limiting has been applied:
+        Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3);
+        strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
+        assertTrue(strategy.checkAcquired());
+        assertTrue(strategy.checkApplied());
+        assertEquals(0.5, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
+
+        // Make one more apply call to saturate the rate limit timeout (0.5 requests per second means 2 requests span
+        // 4 seconds, but we can only make one as we have to subtract the incoming response time):
+        strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
+
+        // Now verify another call to apply doesn't acquire the rate limit because of the max timeout of 4 seconds minus
+        // 2 seconds of response time, so the time source itself sleeps two second:
+        long start = timeSource.currentTimeMillis();
+        strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
+        assertFalse(strategy.checkAcquired());
+        assertTrue(strategy.checkApplied());
+        assertEquals(TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS),
+                     strategy.timeout);
+        assertEquals(strategy.timeout,
+                     TimeUnit.NANOSECONDS.convert(timeSource.currentTimeMillis() - start, TimeUnit.MILLISECONDS));
+    }
+
+    public static class TestableBackPressure extends RateBasedBackPressure
+    {
+        public volatile boolean acquired = false;
+        public volatile boolean applied = false;
+        public volatile long timeout;
+
+        public TestableBackPressure(Map<String, Object> args, TimeSource timeSource, long windowSize)
+        {
+            super(args, timeSource, windowSize);
+        }
+
+        @Override
+        public boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos)
+        {
+            acquired = super.doRateLimit(rateLimiter, timeoutInNanos);
+            applied = true;
+            timeout = timeoutInNanos;
+            return acquired;
+        }
+
+        public boolean checkAcquired()
+        {
+            boolean checked = acquired;
+            acquired = false;
+            return checked;
+        }
+
+        public boolean checkApplied()
+        {
+            boolean checked = applied;
+            applied = false;
+            return checked;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java b/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java
new file mode 100644
index 0000000..8c11f9d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SlidingTimeRateTest
+{
+    @Test
+    public void testUpdateAndGet()
+    {
+        SlidingTimeRate rate = new SlidingTimeRate(new TestTimeSource(), 10, 1, TimeUnit.SECONDS);
+        int updates = 100;
+        for (int i = 0; i < updates; i++)
+        {
+            rate.update(1);
+        }
+        Assert.assertEquals(updates, rate.get(TimeUnit.SECONDS), 0.0);
+    }
+
+    @Test
+    public void testUpdateAndGetBetweenWindows() throws InterruptedException
+    {
+        TestTimeSource time = new TestTimeSource();
+        SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+        int updates = 100;
+        for (int i = 0; i < updates; i++)
+        {
+            rate.update(1);
+            time.sleep(100, TimeUnit.MILLISECONDS);
+        }
+        Assert.assertEquals(10, rate.get(TimeUnit.SECONDS), 0.0);
+    }
+
+    @Test
+    public void testUpdateAndGetPastWindowSize() throws InterruptedException
+    {
+        TestTimeSource time = new TestTimeSource();
+        SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+        int updates = 100;
+        for (int i = 0; i < updates; i++)
+        {
+            rate.update(1);
+        }
+
+        time.sleep(6, TimeUnit.SECONDS);
+
+        Assert.assertEquals(0, rate.get(TimeUnit.SECONDS), 0.0);
+    }
+
+    @Test
+    public void testUpdateAndGetToPointInTime() throws InterruptedException
+    {
+        TestTimeSource time = new TestTimeSource();
+        SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+        int updates = 10;
+        for (int i = 0; i < updates; i++)
+        {
+            rate.update(1);
+            time.sleep(100, TimeUnit.MILLISECONDS);
+        }
+
+        time.sleep(1, TimeUnit.SECONDS);
+
+        Assert.assertEquals(5, rate.get(TimeUnit.SECONDS), 0.0);
+        Assert.assertEquals(10, rate.get(1, TimeUnit.SECONDS), 0.0);
+    }
+
+    @Test
+    public void testDecay() throws InterruptedException
+    {
+        TestTimeSource time = new TestTimeSource();
+        SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+        int updates = 10;
+        for (int i = 0; i < updates; i++)
+        {
+            rate.update(1);
+            time.sleep(100, TimeUnit.MILLISECONDS);
+        }
+        Assert.assertEquals(10, rate.get(TimeUnit.SECONDS), 0.0);
+
+        time.sleep(1, TimeUnit.SECONDS);
+
+        Assert.assertEquals(5, rate.get(TimeUnit.SECONDS), 0.0);
+
+        time.sleep(2, TimeUnit.SECONDS);
+
+        Assert.assertEquals(2.5, rate.get(TimeUnit.SECONDS), 0.0);
+    }
+
+    @Test
+    public void testPruning() throws InterruptedException
+    {
+        TestTimeSource time = new TestTimeSource();
+        SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+
+        rate.update(1);
+        Assert.assertEquals(1, rate.size());
+
+        time.sleep(6, TimeUnit.SECONDS);
+
+        rate.prune();
+        Assert.assertEquals(0, rate.size());
+    }
+
+    @Test
+    public void testConcurrentUpdateAndGet() throws InterruptedException
+    {
+        final ExecutorService executor = Executors.newFixedThreadPool(FBUtilities.getAvailableProcessors());
+        final TestTimeSource time = new TestTimeSource();
+        final SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+        int updates = 100000;
+        for (int i = 0; i < updates; i++)
+        {
+            executor.submit(() -> {
+                time.sleep(1, TimeUnit.MILLISECONDS);
+                rate.update(1);
+            });
+        }
+
+        executor.shutdown();
+
+        Assert.assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
+        Assert.assertEquals(1000, rate.get(TimeUnit.SECONDS), 100.0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/utils/TestTimeSource.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/TestTimeSource.java b/test/unit/org/apache/cassandra/utils/TestTimeSource.java
new file mode 100644
index 0000000..4ecd086
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/TestTimeSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TestTimeSource implements TimeSource
+{
+    private final AtomicLong timeInMillis = new AtomicLong(System.currentTimeMillis());
+
+    @Override
+    public long currentTimeMillis()
+    {
+        return timeInMillis.get();
+    }
+
+    @Override
+    public long nanoTime()
+    {
+        return timeInMillis.get() * 1_000_000;
+    }
+
+    @Override
+    public TimeSource sleep(long sleepFor, TimeUnit unit)
+    {
+        long current = timeInMillis.get();
+        long sleepInMillis = TimeUnit.MILLISECONDS.convert(sleepFor, unit);
+        boolean elapsed;
+        do
+        {
+            long newTime = current + sleepInMillis;
+            elapsed = timeInMillis.compareAndSet(current, newTime);
+            if (!elapsed)
+            {
+                long updated = timeInMillis.get();
+                if (updated - current >= sleepInMillis)
+                {
+                    elapsed = true;
+                }
+                else
+                {
+                    sleepInMillis -= updated - current;
+                    current = updated;
+                }
+            }
+        }
+        while (!elapsed);
+        return this;
+    }
+
+    @Override
+    public TimeSource sleepUninterruptibly(long sleepFor, TimeUnit unit)
+    {
+        return sleep(sleepFor, unit);
+    }
+}


[2/2] cassandra git commit: Support optional backpressure strategies at the coordinator

Posted by st...@apache.org.
Support optional backpressure strategies at the coordinator

patch by Sergio Bossa; reviewed by Stefania Alborghetti for CASSANDRA-9318


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d43b9ce5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d43b9ce5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d43b9ce5

Branch: refs/heads/trunk
Commit: d43b9ce5092f8879a1a66afebab74d86e9e127fb
Parents: 560faba
Author: Sergio Bossa <se...@gmail.com>
Authored: Mon Sep 19 10:42:50 2016 +0800
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Tue Sep 20 09:07:36 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  22 +
 .../org/apache/cassandra/config/Config.java     |   4 +-
 .../cassandra/config/DatabaseDescriptor.java    |  46 +++
 .../apache/cassandra/hints/HintsDispatcher.java |   6 +
 .../apache/cassandra/net/BackPressureState.java |  51 +++
 .../cassandra/net/BackPressureStrategy.java     |  42 ++
 .../apache/cassandra/net/IAsyncCallback.java    |   5 +
 .../apache/cassandra/net/MessagingService.java  | 124 +++++-
 .../cassandra/net/MessagingServiceMBean.java    |  18 +-
 .../net/OutboundTcpConnectionPool.java          |  12 +-
 .../cassandra/net/RateBasedBackPressure.java    | 296 ++++++++++++++
 .../net/RateBasedBackPressureState.java         | 133 ++++++
 .../cassandra/net/ResponseVerbHandler.java      |   5 +
 .../service/AbstractWriteResponseHandler.java   |  27 +-
 .../apache/cassandra/service/StorageProxy.java  |  37 +-
 .../apache/cassandra/utils/SlidingTimeRate.java | 167 ++++++++
 .../cassandra/utils/SystemTimeSource.java       |  54 +++
 .../apache/cassandra/utils/TestRateLimiter.java |  58 +++
 .../org/apache/cassandra/utils/TimeSource.java  |  58 +++
 .../utils/concurrent/IntervalLock.java          |  69 ++++
 test/resources/byteman/mutation_limiter.btm     |   8 +
 .../config/DatabaseDescriptorRefTest.java       |   1 +
 .../cassandra/net/MessagingServiceTest.java     | 247 ++++++++++-
 .../net/RateBasedBackPressureTest.java          | 409 +++++++++++++++++++
 .../cassandra/utils/SlidingTimeRateTest.java    | 146 +++++++
 .../apache/cassandra/utils/TestTimeSource.java  |  72 ++++
 27 files changed, 2072 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b625a58..e9e8ccf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
  * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
  * Fix cassandra-stress graphing (CASSANDRA-12237)
  * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index a25e084..8a0b3ee 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1172,3 +1172,25 @@ gc_warn_threshold_in_ms: 1000
 # early. Any value size larger than this threshold will result into marking an SSTable
 # as corrupted.
 # max_value_size_in_mb: 256
+
+# Back-pressure settings #
+# If enabled, the coordinator will apply the back-pressure strategy specified below to each mutation
+# sent to replicas, with the aim of reducing pressure on overloaded replicas.
+back_pressure_enabled: false
+# The back-pressure strategy applied.
+# The default implementation, RateBasedBackPressure, takes three arguments:
+# high ratio, factor, and flow type, and uses the ratio between incoming mutation responses and outgoing mutation requests.
+# If below high ratio, outgoing mutations are rate limited according to the incoming rate decreased by the given factor;
+# if above high ratio, the rate limiting is increased by the given factor;
+# such factor is usually best configured between 1 and 10, use larger values for a faster recovery
+# at the expense of potentially more dropped mutations;
+# the rate limiting is applied according to the flow type: if FAST, it's rate limited at the speed of the fastest replica,
+# if SLOW at the speed of the slowest one.
+# New strategies can be added. Implementors need to implement org.apache.cassandra.net.BackpressureStrategy and
+# provide a public constructor accepting a Map<String, Object>.
+back_pressure_strategy:
+    - class_name: org.apache.cassandra.net.RateBasedBackPressure
+      parameters:
+        - high_ratio: 0.90
+          factor: 5
+          flow: FAST

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index f2f21ad..e6b3638 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -46,7 +46,6 @@ public class Config
      */
     public static final String PROPERTY_PREFIX = "cassandra.";
 
-
     public String cluster_name = "Test Cluster";
     public String authenticator;
     public String authorizer;
@@ -355,6 +354,9 @@ public class Config
      */
     public UserFunctionTimeoutPolicy user_function_timeout_policy = UserFunctionTimeoutPolicy.die;
 
+    public volatile boolean back_pressure_enabled = false;
+    public volatile ParameterizedClass back_pressure_strategy;
+
     public static boolean getOutboundBindAny()
     {
         return outboundBindAny;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index dc4cc36..ce889ff 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.config;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.*;
 import java.nio.file.FileStore;
 import java.nio.file.Files;
@@ -53,6 +54,8 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.locator.EndpointSnitchInfo;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.SeedProvider;
+import org.apache.cassandra.net.BackPressureStrategy;
+import org.apache.cassandra.net.RateBasedBackPressure;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.scheduler.NoScheduler;
 import org.apache.cassandra.security.EncryptionContext;
@@ -110,6 +113,7 @@ public class DatabaseDescriptor
     private static EncryptionContext encryptionContext;
     private static boolean hasLoggedConfig;
 
+    private static BackPressureStrategy backPressureStrategy;
     private static DiskOptimizationStrategy diskOptimizationStrategy;
 
     private static boolean clientInitialized;
@@ -706,6 +710,27 @@ public class DatabaseDescriptor
                 diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
                 break;
         }
+
+        try
+        {
+            ParameterizedClass strategy = conf.back_pressure_strategy != null ? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams();
+            Class<?> clazz = Class.forName(strategy.class_name);
+            if (!BackPressureStrategy.class.isAssignableFrom(clazz))
+                throw new ConfigurationException(strategy + " is not an instance of " + BackPressureStrategy.class.getCanonicalName(), false);
+
+            Constructor<?> ctor = clazz.getConstructor(Map.class);
+            BackPressureStrategy instance = (BackPressureStrategy) ctor.newInstance(strategy.parameters);
+            logger.info("Back-pressure is {} with strategy {}.", backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy);
+            backPressureStrategy = instance;
+        }
+        catch (ConfigurationException ex)
+        {
+            throw ex;
+        }
+        catch (Exception ex)
+        {
+            throw new ConfigurationException("Error configuring back-pressure strategy: " + conf.back_pressure_strategy, ex);
+        }
     }
 
     public static void applyAddressConfig() throws ConfigurationException
@@ -2342,4 +2367,25 @@ public class DatabaseDescriptor
     {
         return Integer.parseInt(System.getProperty("cassandra.search_concurrency_factor", "1"));
     }
+
+    public static void setBackPressureEnabled(boolean backPressureEnabled)
+    {
+        conf.back_pressure_enabled = backPressureEnabled;
+    }
+
+    public static boolean backPressureEnabled()
+    {
+        return conf.back_pressure_enabled;
+    }
+
+    @VisibleForTesting
+    public static void setBackPressureStrategy(BackPressureStrategy strategy)
+    {
+        backPressureStrategy = strategy;
+    }
+
+    public static BackPressureStrategy getBackPressureStrategy()
+    {
+        return backPressureStrategy;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 29bab80..6940e1e 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -209,5 +209,11 @@ final class HintsDispatcher implements AutoCloseable
         {
             return false;
         }
+
+        @Override
+        public boolean supportsBackPressure()
+        {
+            return true;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/BackPressureState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/BackPressureState.java b/src/java/org/apache/cassandra/net/BackPressureState.java
new file mode 100644
index 0000000..34fd0dd
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/BackPressureState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+/**
+ * Interface meant to track the back-pressure state per replica host.
+ */
+public interface BackPressureState
+{
+    /**
+     * Called when a message is sent to a replica.
+     */
+    void onMessageSent(MessageOut<?> message);
+
+    /**
+     * Called when a response is received from a replica.
+     */
+    void onResponseReceived();
+
+    /**
+     * Called when no response is received from replica.
+     */
+    void onResponseTimeout();
+
+    /**
+     * Gets the current back-pressure rate limit.
+     */
+    double getBackPressureRateLimit();
+
+    /**
+     * Returns the host this state refers to.
+     */
+    InetAddress getHost();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/BackPressureStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/BackPressureStrategy.java b/src/java/org/apache/cassandra/net/BackPressureStrategy.java
new file mode 100644
index 0000000..b61a0a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/BackPressureStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Back-pressure algorithm interface.
+ * <br/>
+ * For experts usage only. Implementors must provide a constructor accepting a single {@code Map<String, Object>} argument,
+ * representing any parameters eventually required by the specific implementation.
+ */
+public interface BackPressureStrategy<S extends BackPressureState>
+{
+    /**
+     * Applies the back-pressure algorithm, based and acting on the given {@link BackPressureState}s, and up to the given
+     * timeout.
+     */
+    void apply(Set<S> states, long timeout, TimeUnit unit);
+
+    /**
+     * Creates a new {@link BackPressureState} initialized as needed by the specific implementation.
+     */
+    S newState(InetAddress host);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index d159e0c..7835079 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -49,4 +49,9 @@ public interface IAsyncCallback<T>
      * given as input to the dynamic snitch.
      */
     boolean isLatencyForSnitch();
+
+    default boolean supportsBackPressure()
+    {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e72a9a2..7632ebd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -39,8 +41,10 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.ExecutorLocals;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
@@ -419,7 +423,6 @@ public final class MessagingService implements MessagingServiceMBean
                                                                    Verb.BATCH_STORE,
                                                                    Verb.BATCH_REMOVE);
 
-
     private static final class DroppedMessages
     {
         final DroppedMessageMetrics metrics;
@@ -445,20 +448,8 @@ public final class MessagingService implements MessagingServiceMBean
     // message sinks are a testing hook
     private final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>();
 
-    public void addMessageSink(IMessageSink sink)
-    {
-        messageSinks.add(sink);
-    }
-
-    public void removeMessageSink(IMessageSink sink)
-    {
-        messageSinks.remove(sink);
-    }
-
-    public void clearMessageSinks()
-    {
-        messageSinks.clear();
-    }
+    // back-pressure implementation
+    private final BackPressureStrategy backPressure = DatabaseDescriptor.getBackPressureStrategy();
 
     private static class MSHandle
     {
@@ -504,9 +495,17 @@ public final class MessagingService implements MessagingServiceMBean
             public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pair)
             {
                 final CallbackInfo expiredCallbackInfo = pair.right.value;
+
                 maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
+
                 ConnectionMetrics.totalTimeouts.mark();
                 getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
+
+                if (expiredCallbackInfo.callback.supportsBackPressure())
+                {
+                    updateBackPressureOnReceive(expiredCallbackInfo.target, expiredCallbackInfo.callback, true);
+                }
+
                 if (expiredCallbackInfo.isFailureCallback())
                 {
                     StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new Runnable()
@@ -545,6 +544,76 @@ public final class MessagingService implements MessagingServiceMBean
         }
     }
 
+    public void addMessageSink(IMessageSink sink)
+    {
+        messageSinks.add(sink);
+    }
+
+    public void removeMessageSink(IMessageSink sink)
+    {
+        messageSinks.remove(sink);
+    }
+
+    public void clearMessageSinks()
+    {
+        messageSinks.clear();
+    }
+
+    /**
+     * Updates the back-pressure state on sending to the given host if enabled and the given message callback supports it.
+     *
+     * @param host The replica host the back-pressure state refers to.
+     * @param callback The message callback.
+     * @param message The actual message.
+     */
+    public void updateBackPressureOnSend(InetAddress host, IAsyncCallback callback, MessageOut<?> message)
+    {
+        if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
+        {
+            BackPressureState backPressureState = getConnectionPool(host).getBackPressureState();
+            backPressureState.onMessageSent(message);
+        }
+    }
+
+    /**
+     * Updates the back-pressure state on reception from the given host if enabled and the given message callback supports it.
+     *
+     * @param host The replica host the back-pressure state refers to.
+     * @param callback The message callback.
+     * @param timeout True if updated following a timeout, false otherwise.
+     */
+    public void updateBackPressureOnReceive(InetAddress host, IAsyncCallback callback, boolean timeout)
+    {
+        if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
+        {
+            BackPressureState backPressureState = getConnectionPool(host).getBackPressureState();
+            if (!timeout)
+                backPressureState.onResponseReceived();
+            else
+                backPressureState.onResponseTimeout();
+        }
+    }
+
+    /**
+     * Applies back-pressure for the given hosts, according to the configured strategy.
+     *
+     * If the local host is present, it is removed from the pool, as back-pressure is only applied
+     * to remote hosts.
+     *
+     * @param hosts The hosts to apply back-pressure to.
+     * @param timeoutInNanos The max back-pressure timeout.
+     */
+    public void applyBackPressure(Iterable<InetAddress> hosts, long timeoutInNanos)
+    {
+        if (DatabaseDescriptor.backPressureEnabled())
+        {
+            backPressure.apply(StreamSupport.stream(hosts.spliterator(), false)
+                    .filter(h -> !h.equals(FBUtilities.getBroadcastAddress()))
+                    .map(h -> getConnectionPool(h).getBackPressureState())
+                    .collect(Collectors.toSet()), timeoutInNanos, TimeUnit.NANOSECONDS);
+        }
+    }
+
     /**
      * Track latency information for the dynamic snitch
      *
@@ -699,7 +768,7 @@ public final class MessagingService implements MessagingServiceMBean
         OutboundTcpConnectionPool cp = connectionManagers.get(to);
         if (cp == null)
         {
-            cp = new OutboundTcpConnectionPool(to);
+            cp = new OutboundTcpConnectionPool(to, backPressure.newState(to));
             OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp);
             if (existingPool != null)
                 cp = existingPool;
@@ -805,6 +874,7 @@ public final class MessagingService implements MessagingServiceMBean
     public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, boolean failureCallback)
     {
         int id = addCallback(cb, message, to, timeout, failureCallback);
+        updateBackPressureOnSend(to, cb, message);
         sendOneWay(failureCallback ? message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to);
         return id;
     }
@@ -827,6 +897,7 @@ public final class MessagingService implements MessagingServiceMBean
                       boolean allowHints)
     {
         int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel, allowHints);
+        updateBackPressureOnSend(to, handler, message);
         sendOneWay(message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE), id, to);
         return id;
     }
@@ -1379,6 +1450,27 @@ public final class MessagingService implements MessagingServiceMBean
         return result;
     }
 
+    public Map<String, Double> getBackPressurePerHost()
+    {
+        Map<String, Double> map = new HashMap<>(connectionManagers.size());
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
+            map.put(entry.getKey().getHostAddress(), entry.getValue().getBackPressureState().getBackPressureRateLimit());
+
+        return map;
+    }
+
+    @Override
+    public void setBackPressureEnabled(boolean enabled)
+    {
+        DatabaseDescriptor.setBackPressureEnabled(enabled);
+    }
+
+    @Override
+    public boolean isBackPressureEnabled()
+    {
+        return DatabaseDescriptor.backPressureEnabled();
+    }
+
     public static IPartitioner globalPartitioner()
     {
         return StorageService.instance.getTokenMetadata().partitioner;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
index 3bcb0d5..b2e79e0 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
@@ -23,8 +23,7 @@ import java.net.UnknownHostException;
 import java.util.Map;
 
 /**
- * MBean exposing MessagingService metrics.
- * - OutboundConnectionPools - Command/Response - Pending/Completed Tasks
+ * MBean exposing MessagingService metrics plus allowing to enable/disable back-pressure.
  */
 public interface MessagingServiceMBean
 {
@@ -88,5 +87,20 @@ public interface MessagingServiceMBean
      */
     public Map<String, Long> getTimeoutsPerHost();
 
+    /**
+     * Back-pressure rate limiting per host
+     */
+    public Map<String, Double> getBackPressurePerHost();
+
+    /**
+     * Enable/Disable back-pressure
+     */
+    public void setBackPressureEnabled(boolean enabled);
+
+    /**
+     * Get back-pressure enabled state
+     */
+    public boolean isBackPressureEnabled();
+
     public int getVersion(String address) throws UnknownHostException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 0418ff6..b0391ba 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -50,7 +50,10 @@ public class OutboundTcpConnectionPool
     private InetAddress resetEndpoint;
     private ConnectionMetrics metrics;
 
-    OutboundTcpConnectionPool(InetAddress remoteEp)
+    // back-pressure state linked to this connection:
+    private final BackPressureState backPressureState;
+
+    OutboundTcpConnectionPool(InetAddress remoteEp, BackPressureState backPressureState)
     {
         id = remoteEp;
         resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
@@ -59,6 +62,8 @@ public class OutboundTcpConnectionPool
         smallMessages = new OutboundTcpConnection(this, "Small");
         largeMessages = new OutboundTcpConnection(this, "Large");
         gossipMessages = new OutboundTcpConnection(this, "Gossip");
+
+        this.backPressureState = backPressureState;
     }
 
     /**
@@ -74,6 +79,11 @@ public class OutboundTcpConnectionPool
                : smallMessages;
     }
 
+    public BackPressureState getBackPressureState()
+    {
+        return backPressureState;
+    }
+
     void reset()
     {
         for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages })

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
new file mode 100644
index 0000000..1dae243
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.SystemTimeSource;
+import org.apache.cassandra.utils.TimeSource;
+import org.apache.cassandra.utils.concurrent.IntervalLock;
+
+/**
+ * Back-pressure algorithm based on rate limiting according to the ratio between incoming and outgoing rates, computed
+ * over a sliding time window with size equal to write RPC timeout.
+ */
+public class RateBasedBackPressure implements BackPressureStrategy<RateBasedBackPressureState>
+{
+    static final String HIGH_RATIO = "high_ratio";
+    static final String FACTOR = "factor";
+    static final String FLOW = "flow";
+    private static final String BACK_PRESSURE_HIGH_RATIO = "0.90";
+    private static final String BACK_PRESSURE_FACTOR = "5";
+    private static final String BACK_PRESSURE_FLOW = "FAST";
+
+    private static final Logger logger = LoggerFactory.getLogger(RateBasedBackPressure.class);
+    private static final NoSpamLogger tenSecsNoSpamLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS);
+    private static final NoSpamLogger oneMinNoSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+    protected final TimeSource timeSource;
+    protected final double highRatio;
+    protected final int factor;
+    protected final Flow flow;
+    protected final long windowSize;
+
+    private final Cache<Set<RateBasedBackPressureState>, IntervalRateLimiter> rateLimiters =
+            CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).build();
+
+    enum Flow
+    {
+        FAST,
+        SLOW
+    }
+
+    public static ParameterizedClass withDefaultParams()
+    {
+        return new ParameterizedClass(RateBasedBackPressure.class.getName(),
+                                      ImmutableMap.of(HIGH_RATIO, BACK_PRESSURE_HIGH_RATIO,
+                                                      FACTOR, BACK_PRESSURE_FACTOR,
+                                                      FLOW, BACK_PRESSURE_FLOW));
+    }
+
+    public RateBasedBackPressure(Map<String, Object> args)
+    {
+        this(args, new SystemTimeSource(), DatabaseDescriptor.getWriteRpcTimeout());
+    }
+
+    @VisibleForTesting
+    public RateBasedBackPressure(Map<String, Object> args, TimeSource timeSource, long windowSize)
+    {
+        if (args.size() != 3)
+        {
+            throw new IllegalArgumentException(RateBasedBackPressure.class.getCanonicalName()
+                    + " requires 3 arguments: high ratio, back-pressure factor and flow type.");
+        }
+
+        try
+        {
+            highRatio = Double.parseDouble(args.getOrDefault(HIGH_RATIO, "").toString().trim());
+            factor = Integer.parseInt(args.getOrDefault(FACTOR, "").toString().trim());
+            flow = Flow.valueOf(args.getOrDefault(FLOW, "").toString().trim().toUpperCase());
+        }
+        catch (Exception ex)
+        {
+            throw new IllegalArgumentException(ex.getMessage(), ex);
+        }
+
+        if (highRatio <= 0 || highRatio > 1)
+        {
+            throw new IllegalArgumentException("Back-pressure high ratio must be > 0 and <= 1");
+        }
+        if (factor < 1)
+        {
+            throw new IllegalArgumentException("Back-pressure factor must be >= 1");
+        }
+        if (windowSize < 10)
+        {
+            throw new IllegalArgumentException("Back-pressure window size must be >= 10");
+        }
+
+        this.timeSource = timeSource;
+        this.windowSize = windowSize;
+
+        logger.info("Initialized back-pressure with high ratio: {}, factor: {}, flow: {}, window size: {}.",
+                    highRatio, factor, flow, windowSize);
+    }
+
+    @Override
+    public void apply(Set<RateBasedBackPressureState> states, long timeout, TimeUnit unit)
+    {
+        // Go through the back-pressure states, try updating each of them and collect min/max rates:
+        boolean isUpdated = false;
+        double minRateLimit = Double.POSITIVE_INFINITY;
+        double maxRateLimit = Double.NEGATIVE_INFINITY;
+        double minIncomingRate = Double.POSITIVE_INFINITY;
+        RateLimiter currentMin = null;
+        RateLimiter currentMax = null;
+        for (RateBasedBackPressureState backPressure : states)
+        {
+            // Get the incoming/outgoing rates:
+            double incomingRate = backPressure.incomingRate.get(TimeUnit.SECONDS);
+            double outgoingRate = backPressure.outgoingRate.get(TimeUnit.SECONDS);
+            // Compute the min incoming rate:
+            if (incomingRate < minIncomingRate)
+                minIncomingRate = incomingRate;
+
+            // Try acquiring the interval lock:
+            if (backPressure.tryIntervalLock(windowSize))
+            {
+                // If acquired, proceed updating thi back-pressure state rate limit:
+                isUpdated = true;
+                try
+                {
+                    RateLimiter limiter = backPressure.rateLimiter;
+
+                    // If we have sent any outgoing requests during this time window, go ahead with rate limiting
+                    // (this is safe against concurrent back-pressure state updates thanks to the rw-locking in
+                    // RateBasedBackPressureState):
+                    if (outgoingRate > 0)
+                    {
+                        // Compute the incoming/outgoing ratio:
+                        double actualRatio = incomingRate / outgoingRate;
+
+                        // If the ratio is above the high mark, try growing by the back-pressure factor:
+                        if (actualRatio >= highRatio)
+                        {
+                            // Only if the outgoing rate is able to keep up with the rate increase:
+                            if (limiter.getRate() <= outgoingRate)
+                            {
+                                double newRate = limiter.getRate() + ((limiter.getRate() * factor) / 100);
+                                if (newRate > 0 && newRate != Double.POSITIVE_INFINITY)
+                                {
+                                    limiter.setRate(newRate);
+                                }
+                            }
+                        }
+                        // If below, set the rate limiter at the incoming rate, decreased by factor:
+                        else
+                        {
+                            // Only if the new rate is actually less than the actual rate:
+                            double newRate = incomingRate - ((incomingRate * factor) / 100);
+                            if (newRate > 0 && newRate < limiter.getRate())
+                            {
+                                limiter.setRate(newRate);
+                            }
+                        }
+
+                        logger.trace("Back-pressure state for {}: incoming rate {}, outgoing rate {}, ratio {}, rate limiting {}",
+                                     backPressure.getHost(), incomingRate, outgoingRate, actualRatio, limiter.getRate());
+                    }
+                    // Otherwise reset the rate limiter:
+                    else
+                    {
+                        limiter.setRate(Double.POSITIVE_INFINITY);
+                    }
+
+                    // Housekeeping: pruning windows and resetting the last check timestamp!
+                    backPressure.incomingRate.prune();
+                    backPressure.outgoingRate.prune();
+                }
+                finally
+                {
+                    backPressure.releaseIntervalLock();
+                }
+            }
+            if (backPressure.rateLimiter.getRate() <= minRateLimit)
+            {
+                minRateLimit = backPressure.rateLimiter.getRate();
+                currentMin = backPressure.rateLimiter;
+            }
+            if (backPressure.rateLimiter.getRate() >= maxRateLimit)
+            {
+                maxRateLimit = backPressure.rateLimiter.getRate();
+                currentMax = backPressure.rateLimiter;
+            }
+        }
+
+        // Now find the rate limiter corresponding to the replica group represented by these back-pressure states:
+        if (!states.isEmpty())
+        {
+            try
+            {
+                // Get the rate limiter:
+                IntervalRateLimiter rateLimiter = rateLimiters.get(states, () -> new IntervalRateLimiter(timeSource));
+
+                // If the back-pressure was updated and we acquire the interval lock for the rate limiter of this group:
+                if (isUpdated && rateLimiter.tryIntervalLock(windowSize))
+                {
+                    try
+                    {
+                        // Update the rate limiter value based on the configured flow:
+                        if (flow.equals(Flow.FAST))
+                            rateLimiter.limiter = currentMax;
+                        else
+                            rateLimiter.limiter = currentMin;
+
+                        tenSecsNoSpamLogger.info("{} currently applied for remote replicas: {}", rateLimiter.limiter, states);
+                    }
+                    finally
+                    {
+                        rateLimiter.releaseIntervalLock();
+                    }
+                }
+                // Assigning a single rate limiter per replica group once per window size allows the back-pressure rate
+                // limiting to be stable within the group itself.
+
+                // Finally apply the rate limit with a max pause time equal to the provided timeout minus the
+                // response time computed from the incoming rate, to reduce the number of client timeouts by taking into
+                // account how long it could take to process responses after back-pressure:
+                long responseTimeInNanos = (long) (TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) / minIncomingRate);
+                doRateLimit(rateLimiter.limiter, Math.max(0, TimeUnit.NANOSECONDS.convert(timeout, unit) - responseTimeInNanos));
+            }
+            catch (ExecutionException ex)
+            {
+                throw new IllegalStateException(ex);
+            }
+        }
+    }
+
+    @Override
+    public RateBasedBackPressureState newState(InetAddress host)
+    {
+        return new RateBasedBackPressureState(host, timeSource, windowSize);
+    }
+
+    @VisibleForTesting
+    RateLimiter getRateLimiterForReplicaGroup(Set<RateBasedBackPressureState> states)
+    {
+        IntervalRateLimiter rateLimiter = rateLimiters.getIfPresent(states);
+        return rateLimiter != null ? rateLimiter.limiter : RateLimiter.create(Double.POSITIVE_INFINITY);
+    }
+
+    @VisibleForTesting
+    boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos)
+    {
+        if (!rateLimiter.tryAcquire(1, timeoutInNanos, TimeUnit.NANOSECONDS))
+        {
+            timeSource.sleepUninterruptibly(timeoutInNanos, TimeUnit.NANOSECONDS);
+            oneMinNoSpamLogger.info("Cannot apply {} due to exceeding write timeout, pausing {} nanoseconds instead.",
+                                    rateLimiter, timeoutInNanos);
+
+            return false;
+        }
+
+        return true;
+    }
+
+    private static class IntervalRateLimiter extends IntervalLock
+    {
+        public volatile RateLimiter limiter = RateLimiter.create(Double.POSITIVE_INFINITY);
+
+        IntervalRateLimiter(TimeSource timeSource)
+        {
+            super(timeSource);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
new file mode 100644
index 0000000..c19f277
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.utils.SlidingTimeRate;
+import org.apache.cassandra.utils.TimeSource;
+import org.apache.cassandra.utils.concurrent.IntervalLock;
+
+/**
+ * The rate-based back-pressure state, tracked per replica host.
+ * <br/><br/>
+ *
+ * This back-pressure state is made up of the following attributes:
+ * <ul>
+ * <li>windowSize: the length of the back-pressure window in milliseconds.</li>
+ * <li>incomingRate: the rate of back-pressure supporting incoming messages.</li>
+ * <li>outgoingRate: the rate of back-pressure supporting outgoing messages.</li>
+ * <li>rateLimiter: the rate limiter to eventually apply to outgoing messages.</li>
+ * </ul>
+ * <br/>
+ * The incomingRate and outgoingRate are updated together when a response is received to guarantee consistency between
+ * the two.
+ * <br/>
+ * It also provides methods to exclusively lock/release back-pressure windows at given intervals;
+ * this allows to apply back-pressure even under concurrent modifications. Please also note a read lock is acquired
+ * during response processing so that no concurrent rate updates can screw rate computations.
+ */
+class RateBasedBackPressureState extends IntervalLock implements BackPressureState
+{
+    private final InetAddress host;
+    private final long windowSize;
+    final SlidingTimeRate incomingRate;
+    final SlidingTimeRate outgoingRate;
+    final RateLimiter rateLimiter;
+
+    RateBasedBackPressureState(InetAddress host, TimeSource timeSource, long windowSize)
+    {
+        super(timeSource);
+        this.host = host;
+        this.windowSize = windowSize;
+        this.incomingRate = new SlidingTimeRate(timeSource, this.windowSize, this.windowSize / 10, TimeUnit.MILLISECONDS);
+        this.outgoingRate = new SlidingTimeRate(timeSource, this.windowSize, this.windowSize / 10, TimeUnit.MILLISECONDS);
+        this.rateLimiter = RateLimiter.create(Double.POSITIVE_INFINITY);
+    }
+
+    @Override
+    public void onMessageSent(MessageOut<?> message) {}
+
+    @Override
+    public void onResponseReceived()
+    {
+        readLock().lock();
+        try
+        {
+            incomingRate.update(1);
+            outgoingRate.update(1);
+        }
+        finally
+        {
+            readLock().unlock();
+        }
+    }
+
+    @Override
+    public void onResponseTimeout()
+    {
+        readLock().lock();
+        try
+        {
+            outgoingRate.update(1);
+        }
+        finally
+        {
+            readLock().unlock();
+        }
+    }
+
+    @Override
+    public double getBackPressureRateLimit()
+    {
+        return rateLimiter.getRate();
+    }
+
+    @Override
+    public InetAddress getHost()
+    {
+        return host;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (obj instanceof RateBasedBackPressureState)
+        {
+            RateBasedBackPressureState other = (RateBasedBackPressureState) obj;
+            return this.host.equals(other.host);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return this.host.hashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("[host: %s, incoming rate: %.3f, outgoing rate: %.3f, rate limit: %.3f]",
+                             host, incomingRate.get(TimeUnit.SECONDS), outgoingRate.get(TimeUnit.SECONDS), rateLimiter.getRate());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 89e1051..fe22e42 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -52,5 +52,10 @@ public class ResponseVerbHandler implements IVerbHandler
             MessagingService.instance().maybeAddLatency(cb, message.from, latency);
             cb.response(message);
         }
+
+        if (callbackInfo.callback.supportsBackPressure())
+        {
+            MessagingService.instance().updateBackPressureOnReceive(message.from, cb, false);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 7cc854a..8c30b89 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import com.google.common.collect.Iterables;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +54,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     private volatile int failures = 0;
     private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint;
     private final long queryStartNanoTime;
+    private volatile boolean supportsBackPressure = true;
 
     /**
      * @param callback A callback to be called when the write is successful.
@@ -78,11 +80,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
 
     public void get() throws WriteTimeoutException, WriteFailureException
     {
-        long requestTimeout = writeType == WriteType.COUNTER
-                            ? DatabaseDescriptor.getCounterWriteRpcTimeout()
-                            : DatabaseDescriptor.getWriteRpcTimeout();
-
-        long timeout = TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - queryStartNanoTime);
+        long timeout = currentTimeout();
 
         boolean success;
         try
@@ -112,6 +110,14 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
         }
     }
 
+    public final long currentTimeout()
+    {
+        long requestTimeout = writeType == WriteType.COUNTER
+                              ? DatabaseDescriptor.getCounterWriteRpcTimeout()
+                              : DatabaseDescriptor.getWriteRpcTimeout();
+        return TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - queryStartNanoTime);
+    }
+
     /**
      * @return the minimum number of endpoints that must reply.
      */
@@ -172,4 +178,15 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
         if (totalBlockFor() + n > totalEndpoints())
             signal();
     }
+
+    @Override
+    public boolean supportsBackPressure()
+    {
+        return supportsBackPressure;
+    }
+
+    public void setSupportsBackPressure(boolean supportsBackPressure)
+    {
+        this.supportsBackPressure = supportsBackPressure;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index ca8a9c6..241aa4f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -25,6 +25,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -33,7 +34,9 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Uninterruptibles;
+
 import org.apache.commons.lang3.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -527,6 +530,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
             responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE, queryStartNanoTime);
+            responseHandler.setSupportsBackPressure(false);
         }
 
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
@@ -1224,6 +1228,10 @@ public class StorageProxy implements StorageProxyMBean
                                              Stage stage)
     throws OverloadedException
     {
+        int targetsSize = Iterables.size(targets);
+
+        // this dc replicas:
+        Collection<InetAddress> localDc = null;
         // extra-datacenter replicas, grouped by dc
         Map<String, Collection<InetAddress>> dcGroups = null;
         // only need to create a Message for non-local writes
@@ -1232,6 +1240,8 @@ public class StorageProxy implements StorageProxyMBean
         boolean insertLocal = false;
         ArrayList<InetAddress> endpointsToHint = null;
 
+        List<InetAddress> backPressureHosts = null;
+
         for (InetAddress destination : targets)
         {
             checkHintOverload(destination);
@@ -1247,12 +1257,17 @@ public class StorageProxy implements StorageProxyMBean
                     // belongs on a different server
                     if (message == null)
                         message = mutation.createMessage();
+
                     String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+
                     // direct writes to local DC or old Cassandra versions
                     // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
                     if (localDataCenter.equals(dc))
                     {
-                        MessagingService.instance().sendRR(message, destination, responseHandler, true);
+                        if (localDc == null)
+                            localDc = new ArrayList<>(targetsSize);
+
+                        localDc.add(destination);
                     }
                     else
                     {
@@ -1264,8 +1279,14 @@ public class StorageProxy implements StorageProxyMBean
                                 dcGroups = new HashMap<>();
                             dcGroups.put(dc, messages);
                         }
+
                         messages.add(destination);
                     }
+
+                    if (backPressureHosts == null)
+                        backPressureHosts = new ArrayList<>(targetsSize);
+
+                    backPressureHosts.add(destination);
                 }
             }
             else
@@ -1273,24 +1294,30 @@ public class StorageProxy implements StorageProxyMBean
                 if (shouldHint(destination))
                 {
                     if (endpointsToHint == null)
-                        endpointsToHint = new ArrayList<>(Iterables.size(targets));
+                        endpointsToHint = new ArrayList<>(targetsSize);
+
                     endpointsToHint.add(destination);
                 }
             }
         }
 
+        if (backPressureHosts != null)
+            MessagingService.instance().applyBackPressure(backPressureHosts, responseHandler.currentTimeout());
+
         if (endpointsToHint != null)
             submitHint(mutation, endpointsToHint, responseHandler);
 
         if (insertLocal)
             performLocally(stage, Optional.of(mutation), mutation::apply, responseHandler);
 
+        if (localDc != null)
+        {
+            for (InetAddress destination : localDc)
+                MessagingService.instance().sendRR(message, destination, responseHandler, true);
+        }
         if (dcGroups != null)
         {
             // for each datacenter, send the message to one node to relay the write to other replicas
-            if (message == null)
-                message = mutation.createMessage();
-
             for (Collection<InetAddress> dcTargets : dcGroups.values())
                 sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/SlidingTimeRate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SlidingTimeRate.java b/src/java/org/apache/cassandra/utils/SlidingTimeRate.java
new file mode 100644
index 0000000..3053a05
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/SlidingTimeRate.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Concurrent rate computation over a sliding time window.
+ */
+public class SlidingTimeRate
+{
+    private final ConcurrentSkipListMap<Long, AtomicInteger> counters = new ConcurrentSkipListMap<>();
+    private final AtomicLong lastCounterTimestamp = new AtomicLong(0);
+    private final ReadWriteLock pruneLock = new ReentrantReadWriteLock();
+    private final long sizeInMillis;
+    private final long precisionInMillis;
+    private final TimeSource timeSource;
+
+    /**
+     * Creates a sliding rate whose time window is of the given size, with the given precision and time unit.
+     * <br/>
+     * The precision defines how accurate the rate computation is, as it will be computed over window size +/-
+     * precision.
+     */
+    public SlidingTimeRate(TimeSource timeSource, long size, long precision, TimeUnit unit)
+    {
+        Preconditions.checkArgument(size > precision, "Size should be greater than precision.");
+        Preconditions.checkArgument(TimeUnit.MILLISECONDS.convert(precision, unit) >= 1, "Precision must be greater than or equal to 1 millisecond.");
+        this.sizeInMillis = TimeUnit.MILLISECONDS.convert(size, unit);
+        this.precisionInMillis = TimeUnit.MILLISECONDS.convert(precision, unit);
+        this.timeSource = timeSource;
+    }
+
+    /**
+     * Updates the rate.
+     */
+    public void update(int delta)
+    {
+        pruneLock.readLock().lock();
+        try
+        {
+            while (true)
+            {
+                long now = timeSource.currentTimeMillis();
+                long lastTimestamp = lastCounterTimestamp.get();
+                boolean isWithinPrecisionRange = (now - lastTimestamp) < precisionInMillis;
+                AtomicInteger lastCounter = counters.get(lastTimestamp);
+                // If there's a valid counter for the current last timestamp, and we're in the precision range,
+                // update such counter:
+                if (lastCounter != null && isWithinPrecisionRange)
+                {
+                    lastCounter.addAndGet(delta);
+
+                    break;
+                }
+                // Else if there's no counter or we're past the precision range, try to create a new counter,
+                // but only the thread updating the last timestamp will create a new counter:
+                else if (lastCounterTimestamp.compareAndSet(lastTimestamp, now))
+                {
+                    AtomicInteger existing = counters.putIfAbsent(now, new AtomicInteger(delta));
+                    if (existing != null)
+                    {
+                        existing.addAndGet(delta);
+                    }
+
+                    break;
+                }
+            }
+        }
+        finally
+        {
+            pruneLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Gets the current rate in the given time unit from the beginning of the time window to the
+     * provided point in time ago.
+     */
+    public double get(long toAgo, TimeUnit unit)
+    {
+        pruneLock.readLock().lock();
+        try
+        {
+            long toAgoInMillis = TimeUnit.MILLISECONDS.convert(toAgo, unit);
+            Preconditions.checkArgument(toAgoInMillis < sizeInMillis, "Cannot get rate in the past!");
+
+            long now = timeSource.currentTimeMillis();
+            long sum = 0;
+            ConcurrentNavigableMap<Long, AtomicInteger> tailCounters = counters
+                    .tailMap(now - sizeInMillis, true)
+                    .headMap(now - toAgoInMillis, true);
+            for (AtomicInteger i : tailCounters.values())
+            {
+                sum += i.get();
+            }
+
+            double rateInMillis = sum == 0
+                                  ? sum
+                                  : sum / (double) Math.max(1000, (now - toAgoInMillis) - tailCounters.firstKey());
+            double multiplier = TimeUnit.MILLISECONDS.convert(1, unit);
+            return rateInMillis * multiplier;
+        }
+        finally
+        {
+            pruneLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Gets the current rate in the given time unit.
+     */
+    public double get(TimeUnit unit)
+    {
+        return get(0, unit);
+    }
+
+    /**
+     * Prunes the time window of old unused updates.
+     */
+    public void prune()
+    {
+        pruneLock.writeLock().lock();
+        try
+        {
+            long now = timeSource.currentTimeMillis();
+            counters.headMap(now - sizeInMillis, false).clear();
+        }
+        finally
+        {
+            pruneLock.writeLock().unlock();
+        }
+    }
+
+    @VisibleForTesting
+    public int size()
+    {
+        return counters.values().stream().reduce(new AtomicInteger(), (v1, v2) -> {
+            v1.addAndGet(v2.get());
+            return v1;
+        }).get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/SystemTimeSource.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SystemTimeSource.java b/src/java/org/apache/cassandra/utils/SystemTimeSource.java
new file mode 100644
index 0000000..fef525e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/SystemTimeSource.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Time source backed by JVM clock.
+ */
+public class SystemTimeSource implements TimeSource
+{
+    @Override
+    public long currentTimeMillis()
+    {
+        return System.currentTimeMillis();
+    }
+
+    @Override
+    public long nanoTime()
+    {
+        return System.nanoTime();
+    }
+
+    @Override
+    public TimeSource sleepUninterruptibly(long sleepFor, TimeUnit unit)
+    {
+        Uninterruptibles.sleepUninterruptibly(sleepFor, unit);
+        return this;
+    }
+
+    @Override
+    public TimeSource sleep(long sleepFor, TimeUnit unit) throws InterruptedException
+    {
+        TimeUnit.NANOSECONDS.sleep(TimeUnit.NANOSECONDS.convert(sleepFor, unit));
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/TestRateLimiter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/TestRateLimiter.java b/src/java/org/apache/cassandra/utils/TestRateLimiter.java
new file mode 100644
index 0000000..a9eb871
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/TestRateLimiter.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.jboss.byteman.rule.Rule;
+import org.jboss.byteman.rule.helper.Helper;
+
+/**
+ * Helper class to apply rate limiting during fault injection testing;
+ * for an example script, see test/resources/byteman/mutation_limiter.btm.
+ */
+@VisibleForTesting
+public class TestRateLimiter extends Helper
+{
+    private static final AtomicReference<RateLimiter> ref = new AtomicReference<>();
+
+    protected TestRateLimiter(Rule rule)
+    {
+        super(rule);
+    }
+
+    /**
+     * Acquires a single unit at the given rate. If the rate changes between calls, a new rate limiter is created
+     * and the old one is discarded.
+     */
+    public void acquire(double rate)
+    {
+        RateLimiter limiter = ref.get();
+        if (limiter == null || limiter.getRate() != rate)
+        {
+            ref.compareAndSet(limiter, RateLimiter.create(rate));
+            limiter = ref.get();
+        }
+        limiter.acquire(1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/TimeSource.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/TimeSource.java b/src/java/org/apache/cassandra/utils/TimeSource.java
new file mode 100644
index 0000000..5d8acec
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/TimeSource.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+public interface TimeSource
+{
+    /**
+     *
+     * @return the current time in milliseconds
+     */
+    long currentTimeMillis();
+
+    /**
+     *
+     * @return Returns the current time value in nanoseconds.
+     *
+     * <p>This method can only be used to measure elapsed time and is
+     * not related to any other notion of system or wall-clock time.
+     */
+    long nanoTime();
+
+    /**
+     * Sleep for the given amount of time uninterruptibly.
+     *
+     * @param  sleepFor given amout.
+     * @param  unit time unit
+     * @return The time source itself after the given sleep period.
+     */
+    TimeSource sleepUninterruptibly(long sleepFor, TimeUnit unit);
+
+    /**
+     * Sleep for the given amount of time. This operation could interrupted.
+     * Hence after returning from this method, it is not guaranteed
+     * that the request amount of time has passed.
+     *
+     * @param  sleepFor given amout.
+     * @param  unit time unit
+     * @return The time source itself after the given sleep period.
+     */
+    TimeSource sleep(long sleepFor, TimeUnit unit) throws InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java b/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java
new file mode 100644
index 0000000..382a2dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.utils.TimeSource;
+
+/**
+ * This class extends ReentrantReadWriteLock to provide a write lock that can only be acquired at provided intervals.
+ */
+public class IntervalLock extends ReentrantReadWriteLock
+{
+    private final AtomicLong lastAcquire = new AtomicLong();
+    private final TimeSource timeSource;
+
+    public IntervalLock(TimeSource timeSource)
+    {
+        this.timeSource = timeSource;
+    }
+
+    /**
+     * Try acquiring a write lock if the given interval is passed since the last call to this method.
+     *
+     * @param interval In millis.
+     * @return True if acquired and locked, false otherwise.
+     */
+    public boolean tryIntervalLock(long interval)
+    {
+        long now = timeSource.currentTimeMillis();
+        boolean acquired = (now - lastAcquire.get() >= interval) && writeLock().tryLock();
+        if (acquired)
+            lastAcquire.set(now);
+
+        return acquired;
+    }
+
+    /**
+     * Release the last acquired interval lock.
+     */
+    public void releaseIntervalLock()
+    {
+        writeLock().unlock();
+    }
+
+    @VisibleForTesting
+    public long getLastIntervalAcquire()
+    {
+        return lastAcquire.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/resources/byteman/mutation_limiter.btm
----------------------------------------------------------------------
diff --git a/test/resources/byteman/mutation_limiter.btm b/test/resources/byteman/mutation_limiter.btm
new file mode 100644
index 0000000..ca936fc
--- /dev/null
+++ b/test/resources/byteman/mutation_limiter.btm
@@ -0,0 +1,8 @@
+RULE mutation_limiter
+CLASS org.apache.cassandra.db.MutationVerbHandler
+METHOD doVerb
+HELPER org.apache.cassandra.utils.TestRateLimiter
+AT ENTRY
+IF TRUE
+DO acquire(1000.0)
+ENDRULE

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 63c44e0..2dcfbd1 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -101,6 +101,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.io.util.DiskOptimizationStrategy",
     "org.apache.cassandra.locator.SimpleSeedProvider",
     "org.apache.cassandra.locator.SeedProvider",
+    "org.apache.cassandra.net.BackPressureStrategy",
     "org.apache.cassandra.scheduler.IRequestScheduler",
     "org.apache.cassandra.security.EncryptionContext",
     "org.apache.cassandra.service.CacheService$CacheType",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 0e0e4ba..2a3ecbe 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -24,34 +24,53 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Iterables;
+
 import com.codahale.metrics.Timer;
 
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
 import org.caffinitas.ohc.histo.EstimatedHistogram;
+
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 
+import static org.junit.Assert.*;
+
 public class MessagingServiceTest
 {
+    private final static long ONE_SECOND = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
+    private final static long[] bucketOffsets = new EstimatedHistogram(160).getBucketOffsets();
+    private final MessagingService messagingService = MessagingService.test();
+
     @BeforeClass
-    public static void initDD()
+    public static void beforeClass() throws UnknownHostException
     {
         DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setBackPressureStrategy(new MockBackPressureStrategy(Collections.emptyMap()));
+        DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
     }
 
-    private final MessagingService messagingService = MessagingService.test();
-    private final static long[] bucketOffsets = new EstimatedHistogram(160).getBucketOffsets();
+    @Before
+    public void before() throws UnknownHostException
+    {
+        MockBackPressureStrategy.applied = false;
+        messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.2"));
+        messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.3"));
+    }
 
     @Test
     public void testDroppedMessages()
@@ -77,17 +96,6 @@ public class MessagingServiceTest
         assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString()));
     }
 
-    private static void addDCLatency(long sentAt, long now) throws IOException
-    {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos))
-        {
-            out.writeInt((int) sentAt);
-        }
-        DataInputStreamPlus in = new DataInputStreamPlus(new ByteArrayInputStream(baos.toByteArray()));
-        MessageIn.readTimestamp(InetAddress.getLocalHost(), in, now);
-    }
-
     @Test
     public void testDCLatency() throws Exception
     {
@@ -123,4 +131,211 @@ public class MessagingServiceTest
         addDCLatency(sentAt, now);
         assertNull(dcLatency.get("datacenter1"));
     }
+
+    @Test
+    public void testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws UnknownHostException
+    {
+        MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();
+        IAsyncCallback bpCallback = new BackPressureCallback();
+        IAsyncCallback noCallback = new NoBackPressureCallback();
+        MessageOut<?> ignored = null;
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), noCallback, ignored);
+        assertFalse(backPressureState.onSend);
+
+        DatabaseDescriptor.setBackPressureEnabled(false);
+        messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), bpCallback, ignored);
+        assertFalse(backPressureState.onSend);
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), bpCallback, ignored);
+        assertTrue(backPressureState.onSend);
+    }
+
+    @Test
+    public void testUpdatesBackPressureOnReceiveWhenEnabledAndWithSupportedCallback() throws UnknownHostException
+    {
+        MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();
+        IAsyncCallback bpCallback = new BackPressureCallback();
+        IAsyncCallback noCallback = new NoBackPressureCallback();
+        boolean timeout = false;
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), noCallback, timeout);
+        assertFalse(backPressureState.onReceive);
+        assertFalse(backPressureState.onTimeout);
+
+        DatabaseDescriptor.setBackPressureEnabled(false);
+        messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout);
+        assertFalse(backPressureState.onReceive);
+        assertFalse(backPressureState.onTimeout);
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout);
+        assertTrue(backPressureState.onReceive);
+        assertFalse(backPressureState.onTimeout);
+    }
+
+    @Test
+    public void testUpdatesBackPressureOnTimeoutWhenEnabledAndWithSupportedCallback() throws UnknownHostException
+    {
+        MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();
+        IAsyncCallback bpCallback = new BackPressureCallback();
+        IAsyncCallback noCallback = new NoBackPressureCallback();
+        boolean timeout = true;
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), noCallback, timeout);
+        assertFalse(backPressureState.onReceive);
+        assertFalse(backPressureState.onTimeout);
+
+        DatabaseDescriptor.setBackPressureEnabled(false);
+        messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout);
+        assertFalse(backPressureState.onReceive);
+        assertFalse(backPressureState.onTimeout);
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout);
+        assertFalse(backPressureState.onReceive);
+        assertTrue(backPressureState.onTimeout);
+    }
+
+    @Test
+    public void testAppliesBackPressureWhenEnabled() throws UnknownHostException
+    {
+        DatabaseDescriptor.setBackPressureEnabled(false);
+        messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.2")), ONE_SECOND);
+        assertFalse(MockBackPressureStrategy.applied);
+
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.2")), ONE_SECOND);
+        assertTrue(MockBackPressureStrategy.applied);
+    }
+
+    @Test
+    public void testDoesntApplyBackPressureToBroadcastAddress() throws UnknownHostException
+    {
+        DatabaseDescriptor.setBackPressureEnabled(true);
+        messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.1")), ONE_SECOND);
+        assertFalse(MockBackPressureStrategy.applied);
+    }
+
+    private static void addDCLatency(long sentAt, long now) throws IOException
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos))
+        {
+            out.writeInt((int) sentAt);
+        }
+        DataInputStreamPlus in = new DataInputStreamPlus(new ByteArrayInputStream(baos.toByteArray()));
+        MessageIn.readTimestamp(InetAddress.getLocalHost(), in, now);
+    }
+
+    public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState>
+    {
+        public static volatile boolean applied = false;
+
+        public MockBackPressureStrategy(Map<String, Object> args)
+        {
+        }
+
+        @Override
+        public void apply(Set<MockBackPressureState> states, long timeout, TimeUnit unit)
+        {
+            if (!Iterables.isEmpty(states))
+                applied = true;
+        }
+
+        @Override
+        public MockBackPressureState newState(InetAddress host)
+        {
+            return new MockBackPressureState(host);
+        }
+
+        public static class MockBackPressureState implements BackPressureState
+        {
+            private final InetAddress host;
+            public volatile boolean onSend = false;
+            public volatile boolean onReceive = false;
+            public volatile boolean onTimeout = false;
+
+            private MockBackPressureState(InetAddress host)
+            {
+                this.host = host;
+            }
+
+            @Override
+            public void onMessageSent(MessageOut<?> message)
+            {
+                onSend = true;
+            }
+
+            @Override
+            public void onResponseReceived()
+            {
+                onReceive = true;
+            }
+
+            @Override
+            public void onResponseTimeout()
+            {
+                onTimeout = true;
+            }
+
+            @Override
+            public double getBackPressureRateLimit()
+            {
+                throw new UnsupportedOperationException("Not supported yet.");
+            }
+
+            @Override
+            public InetAddress getHost()
+            {
+                return host;
+            }
+        }
+    }
+
+    private static class BackPressureCallback implements IAsyncCallback
+    {
+        @Override
+        public boolean supportsBackPressure()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean isLatencyForSnitch()
+        {
+            return false;
+        }
+
+        @Override
+        public void response(MessageIn msg)
+        {
+            throw new UnsupportedOperationException("Not supported.");
+        }
+    }
+
+    private static class NoBackPressureCallback implements IAsyncCallback
+    {
+        @Override
+        public boolean supportsBackPressure()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean isLatencyForSnitch()
+        {
+            return false;
+        }
+
+        @Override
+        public void response(MessageIn msg)
+        {
+            throw new UnsupportedOperationException("Not supported.");
+        }
+    }
 }