You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by st...@apache.org on 2016/09/20 01:08:45 UTC
[1/2] cassandra git commit: Support optional backpressure strategies
at the coordinator
Repository: cassandra
Updated Branches:
refs/heads/trunk 560faba2f -> d43b9ce50
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
new file mode 100644
index 0000000..b94b6ee
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/RateBasedBackPressureTest.java
@@ -0,0 +1,409 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.utils.TestTimeSource;
+import org.apache.cassandra.utils.TimeSource;
+
+import static org.apache.cassandra.net.RateBasedBackPressure.FACTOR;
+import static org.apache.cassandra.net.RateBasedBackPressure.FLOW;
+import static org.apache.cassandra.net.RateBasedBackPressure.HIGH_RATIO;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class RateBasedBackPressureTest
+{
+ @Test(expected = IllegalArgumentException.class)
+ public void testAcceptsNoLessThanThreeArguments() throws Exception
+ {
+ new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "1"), new TestTimeSource(), 10);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testHighRatioMustBeBiggerThanZero() throws Exception
+ {
+ new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0", FACTOR, "2", FLOW, "FAST"), new TestTimeSource(), 10);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testHighRatioMustBeSmallerEqualThanOne() throws Exception
+ {
+ new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "2", FACTOR, "2", FLOW, "FAST"), new TestTimeSource(), 10);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFactorMustBeBiggerEqualThanOne() throws Exception
+ {
+ new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "0", FLOW, "FAST"), new TestTimeSource(), 10);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWindowSizeMustBeBiggerEqualThanTen() throws Exception
+ {
+ new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "5", FLOW, "FAST"), new TestTimeSource(), 1);
+ }
+
+ @Test
+ public void testFlowMustBeEitherFASTorSLOW() throws Exception
+ {
+ new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "FAST"), new TestTimeSource(), 10);
+ new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "SLOW"), new TestTimeSource(), 10);
+ try
+ {
+ new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "1", FLOW, "WRONG"), new TestTimeSource(), 10);
+ fail("Expected to fail with wrong flow type.");
+ }
+ catch (Exception ex)
+ {
+ }
+ }
+
+ @Test
+ public void testBackPressureStateUpdates()
+ {
+ long windowSize = 6000;
+ TestTimeSource timeSource = new TestTimeSource();
+ RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+
+ RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+ state.onMessageSent(null);
+ assertEquals(0, state.incomingRate.size());
+ assertEquals(0, state.outgoingRate.size());
+
+ state = strategy.newState(InetAddress.getLoopbackAddress());
+ state.onResponseReceived();
+ assertEquals(1, state.incomingRate.size());
+ assertEquals(1, state.outgoingRate.size());
+
+ state = strategy.newState(InetAddress.getLoopbackAddress());
+ state.onResponseTimeout();
+ assertEquals(0, state.incomingRate.size());
+ assertEquals(1, state.outgoingRate.size());
+ }
+
+ @Test
+ public void testBackPressureIsNotUpdatedBeyondInfinity() throws Exception
+ {
+ long windowSize = 6000;
+ TestTimeSource timeSource = new TestTimeSource();
+ RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+ RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+
+ // Get initial rate:
+ double initialRate = state.rateLimiter.getRate();
+ assertEquals(Double.POSITIVE_INFINITY, initialRate, 0.0);
+
+ // Update incoming and outgoing rate equally:
+ state.incomingRate.update(1);
+ state.outgoingRate.update(1);
+
+ // Move time ahead:
+ timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+ // Verify the rate doesn't change because already at infinity:
+ strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+ assertEquals(initialRate, state.rateLimiter.getRate(), 0.0);
+ }
+
+ @Test
+ public void testBackPressureIsUpdatedOncePerWindowSize() throws Exception
+ {
+ long windowSize = 6000;
+ TestTimeSource timeSource = new TestTimeSource();
+ RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+ RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+
+ // Get initial time:
+ long current = state.getLastIntervalAcquire();
+ assertEquals(0, current);
+
+ // Update incoming and outgoing rate:
+ state.incomingRate.update(1);
+ state.outgoingRate.update(1);
+
+ // Move time ahead by window size:
+ timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+ // Verify the timestamp changed:
+ strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+ current = state.getLastIntervalAcquire();
+ assertEquals(timeSource.currentTimeMillis(), current);
+
+ // Move time ahead by less than interval:
+ long previous = current;
+ timeSource.sleep(windowSize / 2, TimeUnit.MILLISECONDS);
+
+ // Verify the last timestamp didn't change because below the window size:
+ strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+ current = state.getLastIntervalAcquire();
+ assertEquals(previous, current);
+ }
+
+ @Test
+ public void testBackPressureWhenBelowHighRatio() throws Exception
+ {
+ long windowSize = 6000;
+ TestTimeSource timeSource = new TestTimeSource();
+ RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+ RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+
+ // Update incoming and outgoing rate so that the ratio is 0.5:
+ state.incomingRate.update(50);
+ state.outgoingRate.update(100);
+
+ // Move time ahead:
+ timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+ // Verify the rate is decreased by factor:
+ strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+ assertEquals(7.4, state.rateLimiter.getRate(), 0.1);
+ }
+
+ @Test
+ public void testBackPressureRateLimiterIsIncreasedAfterGoingAgainAboveHighRatio() throws Exception
+ {
+ long windowSize = 6000;
+ TestTimeSource timeSource = new TestTimeSource();
+ RateBasedBackPressure strategy = new RateBasedBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+ RateBasedBackPressureState state = strategy.newState(InetAddress.getLoopbackAddress());
+
+ // Update incoming and outgoing rate so that the ratio is 0.5:
+ state.incomingRate.update(50);
+ state.outgoingRate.update(100);
+
+ // Move time ahead:
+ timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+ // Verify the rate decreased:
+ strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+ assertEquals(7.4, state.rateLimiter.getRate(), 0.1);
+
+ // Update incoming and outgoing rate back above high ratio:
+ state.incomingRate.update(50);
+ state.outgoingRate.update(50);
+
+ // Move time ahead:
+ timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+ // Verify rate limiter is increased by factor:
+ strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+ assertEquals(8.25, state.rateLimiter.getRate(), 0.1);
+
+ // Update incoming and outgoing rate to keep it below the limiter rate:
+ state.incomingRate.update(1);
+ state.outgoingRate.update(1);
+
+ // Move time ahead:
+ timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+ // Verify rate limiter is not increased as already higher than the actual rate:
+ strategy.apply(Sets.newHashSet(state), 1, TimeUnit.SECONDS);
+ assertEquals(8.25, state.rateLimiter.getRate(), 0.1);
+ }
+
+ @Test
+ public void testBackPressureFastFlow() throws Exception
+ {
+ long windowSize = 6000;
+ TestTimeSource timeSource = new TestTimeSource();
+ TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "FAST"), timeSource, windowSize);
+ RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
+ RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
+ RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+
+ // Update incoming and outgoing rates:
+ state1.incomingRate.update(50);
+ state1.outgoingRate.update(100);
+ state2.incomingRate.update(80); // fast
+ state2.outgoingRate.update(100);
+ state3.incomingRate.update(20);
+ state3.outgoingRate.update(100);
+
+ // Move time ahead:
+ timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+ // Verify the fast replica rate limiting has been applied:
+ Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3);
+ strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
+ assertTrue(strategy.checkAcquired());
+ assertTrue(strategy.checkApplied());
+ assertEquals(12.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
+ }
+
+ @Test
+ public void testBackPressureSlowFlow() throws Exception
+ {
+ long windowSize = 6000;
+ TestTimeSource timeSource = new TestTimeSource();
+ TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
+ RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
+ RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
+ RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+
+ // Update incoming and outgoing rates:
+ state1.incomingRate.update(50);
+ state1.outgoingRate.update(100);
+ state2.incomingRate.update(100);
+ state2.outgoingRate.update(100);
+ state3.incomingRate.update(20); // slow
+ state3.outgoingRate.update(100);
+
+ // Move time ahead:
+ timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+ // Verify the slow replica rate limiting has been applied:
+ Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3);
+ strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
+ assertTrue(strategy.checkAcquired());
+ assertTrue(strategy.checkApplied());
+ assertEquals(3.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
+ }
+
+ @Test
+ public void testBackPressureWithDifferentGroups() throws Exception
+ {
+ long windowSize = 6000;
+ TestTimeSource timeSource = new TestTimeSource();
+ TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
+ RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
+ RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
+ RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+ RateBasedBackPressureState state4 = strategy.newState(InetAddress.getByName("127.0.0.4"));
+
+ // Update incoming and outgoing rates:
+ state1.incomingRate.update(50); // this
+ state1.outgoingRate.update(100);
+ state2.incomingRate.update(100);
+ state2.outgoingRate.update(100);
+ state3.incomingRate.update(20); // this
+ state3.outgoingRate.update(100);
+ state4.incomingRate.update(80);
+ state4.outgoingRate.update(100);
+
+ // Move time ahead:
+ timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+ // Verify the first group:
+ Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2);
+ strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
+ assertTrue(strategy.checkAcquired());
+ assertTrue(strategy.checkApplied());
+ assertEquals(7.4, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
+
+ // Verify the second group:
+ replicaGroup = Sets.newHashSet(state3, state4);
+ strategy.apply(replicaGroup, 1, TimeUnit.SECONDS);
+ assertTrue(strategy.checkAcquired());
+ assertTrue(strategy.checkApplied());
+ assertEquals(3.0, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
+ }
+
+ @Test
+ public void testBackPressurePastTimeout() throws Exception
+ {
+ long windowSize = 10000;
+ TestTimeSource timeSource = new TestTimeSource();
+ TestableBackPressure strategy = new TestableBackPressure(ImmutableMap.of(HIGH_RATIO, "0.9", FACTOR, "10", FLOW, "SLOW"), timeSource, windowSize);
+ RateBasedBackPressureState state1 = strategy.newState(InetAddress.getByName("127.0.0.1"));
+ RateBasedBackPressureState state2 = strategy.newState(InetAddress.getByName("127.0.0.2"));
+ RateBasedBackPressureState state3 = strategy.newState(InetAddress.getByName("127.0.0.3"));
+
+ // Update incoming and outgoing rates:
+ state1.incomingRate.update(5); // slow
+ state1.outgoingRate.update(100);
+ state2.incomingRate.update(100);
+ state2.outgoingRate.update(100);
+ state3.incomingRate.update(100);
+ state3.outgoingRate.update(100);
+
+ // Move time ahead:
+ timeSource.sleep(windowSize, TimeUnit.MILLISECONDS);
+
+ // Verify the slow replica rate limiting has been applied:
+ Set<RateBasedBackPressureState> replicaGroup = Sets.newHashSet(state1, state2, state3);
+ strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
+ assertTrue(strategy.checkAcquired());
+ assertTrue(strategy.checkApplied());
+ assertEquals(0.5, strategy.getRateLimiterForReplicaGroup(replicaGroup).getRate(), 0.1);
+
+ // Make one more apply call to saturate the rate limit timeout (0.5 requests per second means 2 requests span
+ // 4 seconds, but we can only make one as we have to subtract the incoming response time):
+ strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
+
+ // Now verify another call to apply doesn't acquire the rate limit because of the max timeout of 4 seconds minus
+ // 2 seconds of response time, so the time source itself sleeps two seconds:
+ long start = timeSource.currentTimeMillis();
+ strategy.apply(replicaGroup, 4, TimeUnit.SECONDS);
+ assertFalse(strategy.checkAcquired());
+ assertTrue(strategy.checkApplied());
+ assertEquals(TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS),
+ strategy.timeout);
+ assertEquals(strategy.timeout,
+ TimeUnit.NANOSECONDS.convert(timeSource.currentTimeMillis() - start, TimeUnit.MILLISECONDS));
+ }
+
+ public static class TestableBackPressure extends RateBasedBackPressure
+ {
+ public volatile boolean acquired = false;
+ public volatile boolean applied = false;
+ public volatile long timeout;
+
+ public TestableBackPressure(Map<String, Object> args, TimeSource timeSource, long windowSize)
+ {
+ super(args, timeSource, windowSize);
+ }
+
+ @Override
+ public boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos)
+ {
+ acquired = super.doRateLimit(rateLimiter, timeoutInNanos);
+ applied = true;
+ timeout = timeoutInNanos;
+ return acquired;
+ }
+
+ public boolean checkAcquired()
+ {
+ boolean checked = acquired;
+ acquired = false;
+ return checked;
+ }
+
+ public boolean checkApplied()
+ {
+ boolean checked = applied;
+ applied = false;
+ return checked;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java b/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java
new file mode 100644
index 0000000..8c11f9d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/SlidingTimeRateTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SlidingTimeRateTest
+{
+ @Test
+ public void testUpdateAndGet()
+ {
+ SlidingTimeRate rate = new SlidingTimeRate(new TestTimeSource(), 10, 1, TimeUnit.SECONDS);
+ int updates = 100;
+ for (int i = 0; i < updates; i++)
+ {
+ rate.update(1);
+ }
+ Assert.assertEquals(updates, rate.get(TimeUnit.SECONDS), 0.0);
+ }
+
+ @Test
+ public void testUpdateAndGetBetweenWindows() throws InterruptedException
+ {
+ TestTimeSource time = new TestTimeSource();
+ SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+ int updates = 100;
+ for (int i = 0; i < updates; i++)
+ {
+ rate.update(1);
+ time.sleep(100, TimeUnit.MILLISECONDS);
+ }
+ Assert.assertEquals(10, rate.get(TimeUnit.SECONDS), 0.0);
+ }
+
+ @Test
+ public void testUpdateAndGetPastWindowSize() throws InterruptedException
+ {
+ TestTimeSource time = new TestTimeSource();
+ SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+ int updates = 100;
+ for (int i = 0; i < updates; i++)
+ {
+ rate.update(1);
+ }
+
+ time.sleep(6, TimeUnit.SECONDS);
+
+ Assert.assertEquals(0, rate.get(TimeUnit.SECONDS), 0.0);
+ }
+
+ @Test
+ public void testUpdateAndGetToPointInTime() throws InterruptedException
+ {
+ TestTimeSource time = new TestTimeSource();
+ SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+ int updates = 10;
+ for (int i = 0; i < updates; i++)
+ {
+ rate.update(1);
+ time.sleep(100, TimeUnit.MILLISECONDS);
+ }
+
+ time.sleep(1, TimeUnit.SECONDS);
+
+ Assert.assertEquals(5, rate.get(TimeUnit.SECONDS), 0.0);
+ Assert.assertEquals(10, rate.get(1, TimeUnit.SECONDS), 0.0);
+ }
+
+ @Test
+ public void testDecay() throws InterruptedException
+ {
+ TestTimeSource time = new TestTimeSource();
+ SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+ int updates = 10;
+ for (int i = 0; i < updates; i++)
+ {
+ rate.update(1);
+ time.sleep(100, TimeUnit.MILLISECONDS);
+ }
+ Assert.assertEquals(10, rate.get(TimeUnit.SECONDS), 0.0);
+
+ time.sleep(1, TimeUnit.SECONDS);
+
+ Assert.assertEquals(5, rate.get(TimeUnit.SECONDS), 0.0);
+
+ time.sleep(2, TimeUnit.SECONDS);
+
+ Assert.assertEquals(2.5, rate.get(TimeUnit.SECONDS), 0.0);
+ }
+
+ @Test
+ public void testPruning() throws InterruptedException
+ {
+ TestTimeSource time = new TestTimeSource();
+ SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+
+ rate.update(1);
+ Assert.assertEquals(1, rate.size());
+
+ time.sleep(6, TimeUnit.SECONDS);
+
+ rate.prune();
+ Assert.assertEquals(0, rate.size());
+ }
+
+ @Test
+ public void testConcurrentUpdateAndGet() throws InterruptedException
+ {
+ final ExecutorService executor = Executors.newFixedThreadPool(FBUtilities.getAvailableProcessors());
+ final TestTimeSource time = new TestTimeSource();
+ final SlidingTimeRate rate = new SlidingTimeRate(time, 5, 1, TimeUnit.SECONDS);
+ int updates = 100000;
+ for (int i = 0; i < updates; i++)
+ {
+ executor.submit(() -> {
+ time.sleep(1, TimeUnit.MILLISECONDS);
+ rate.update(1);
+ });
+ }
+
+ executor.shutdown();
+
+ Assert.assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
+ Assert.assertEquals(1000, rate.get(TimeUnit.SECONDS), 100.0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/utils/TestTimeSource.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/TestTimeSource.java b/test/unit/org/apache/cassandra/utils/TestTimeSource.java
new file mode 100644
index 0000000..4ecd086
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/TestTimeSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TestTimeSource implements TimeSource
+{
+ private final AtomicLong timeInMillis = new AtomicLong(System.currentTimeMillis());
+
+ @Override
+ public long currentTimeMillis()
+ {
+ return timeInMillis.get();
+ }
+
+ @Override
+ public long nanoTime()
+ {
+ return timeInMillis.get() * 1_000_000;
+ }
+
+ @Override
+ public TimeSource sleep(long sleepFor, TimeUnit unit)
+ {
+ long current = timeInMillis.get();
+ long sleepInMillis = TimeUnit.MILLISECONDS.convert(sleepFor, unit);
+ boolean elapsed;
+ do
+ {
+ long newTime = current + sleepInMillis;
+ elapsed = timeInMillis.compareAndSet(current, newTime);
+ if (!elapsed)
+ {
+ long updated = timeInMillis.get();
+ if (updated - current >= sleepInMillis)
+ {
+ elapsed = true;
+ }
+ else
+ {
+ sleepInMillis -= updated - current;
+ current = updated;
+ }
+ }
+ }
+ while (!elapsed);
+ return this;
+ }
+
+ @Override
+ public TimeSource sleepUninterruptibly(long sleepFor, TimeUnit unit)
+ {
+ return sleep(sleepFor, unit);
+ }
+}
[2/2] cassandra git commit: Support optional backpressure strategies
at the coordinator
Posted by st...@apache.org.
Support optional backpressure strategies at the coordinator
patch by Sergio Bossa; reviewed by Stefania Alborghetti for CASSANDRA-9318
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d43b9ce5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d43b9ce5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d43b9ce5
Branch: refs/heads/trunk
Commit: d43b9ce5092f8879a1a66afebab74d86e9e127fb
Parents: 560faba
Author: Sergio Bossa <se...@gmail.com>
Authored: Mon Sep 19 10:42:50 2016 +0800
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Tue Sep 20 09:07:36 2016 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 22 +
.../org/apache/cassandra/config/Config.java | 4 +-
.../cassandra/config/DatabaseDescriptor.java | 46 +++
.../apache/cassandra/hints/HintsDispatcher.java | 6 +
.../apache/cassandra/net/BackPressureState.java | 51 +++
.../cassandra/net/BackPressureStrategy.java | 42 ++
.../apache/cassandra/net/IAsyncCallback.java | 5 +
.../apache/cassandra/net/MessagingService.java | 124 +++++-
.../cassandra/net/MessagingServiceMBean.java | 18 +-
.../net/OutboundTcpConnectionPool.java | 12 +-
.../cassandra/net/RateBasedBackPressure.java | 296 ++++++++++++++
.../net/RateBasedBackPressureState.java | 133 ++++++
.../cassandra/net/ResponseVerbHandler.java | 5 +
.../service/AbstractWriteResponseHandler.java | 27 +-
.../apache/cassandra/service/StorageProxy.java | 37 +-
.../apache/cassandra/utils/SlidingTimeRate.java | 167 ++++++++
.../cassandra/utils/SystemTimeSource.java | 54 +++
.../apache/cassandra/utils/TestRateLimiter.java | 58 +++
.../org/apache/cassandra/utils/TimeSource.java | 58 +++
.../utils/concurrent/IntervalLock.java | 69 ++++
test/resources/byteman/mutation_limiter.btm | 8 +
.../config/DatabaseDescriptorRefTest.java | 1 +
.../cassandra/net/MessagingServiceTest.java | 247 ++++++++++-
.../net/RateBasedBackPressureTest.java | 409 +++++++++++++++++++
.../cassandra/utils/SlidingTimeRateTest.java | 146 +++++++
.../apache/cassandra/utils/TestTimeSource.java | 72 ++++
27 files changed, 2072 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b625a58..e9e8ccf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.10
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
* Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
* Fix cassandra-stress graphing (CASSANDRA-12237)
* Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index a25e084..8a0b3ee 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1172,3 +1172,25 @@ gc_warn_threshold_in_ms: 1000
# early. Any value size larger than this threshold will result into marking an SSTable
# as corrupted.
# max_value_size_in_mb: 256
+
+# Back-pressure settings #
+# If enabled, the coordinator will apply the back-pressure strategy specified below to each mutation
+# sent to replicas, with the aim of reducing pressure on overloaded replicas.
+back_pressure_enabled: false
+# The back-pressure strategy applied.
+# The default implementation, RateBasedBackPressure, takes three arguments:
+# high ratio, factor, and flow type, and uses the ratio between incoming mutation responses and outgoing mutation requests.
+# If below high ratio, outgoing mutations are rate limited according to the incoming rate decreased by the given factor;
+# if above high ratio, the rate limiting is increased by the given factor;
+# such factor is usually best configured between 1 and 10, use larger values for a faster recovery
+# at the expense of potentially more dropped mutations;
+# the rate limiting is applied according to the flow type: if FAST, it's rate limited at the speed of the fastest replica,
+# if SLOW at the speed of the slowest one.
+# New strategies can be added. Implementors need to implement org.apache.cassandra.net.BackPressureStrategy and
+# provide a public constructor accepting a Map<String, Object>.
+back_pressure_strategy:
+ - class_name: org.apache.cassandra.net.RateBasedBackPressure
+ parameters:
+ - high_ratio: 0.90
+ factor: 5
+ flow: FAST
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index f2f21ad..e6b3638 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -46,7 +46,6 @@ public class Config
*/
public static final String PROPERTY_PREFIX = "cassandra.";
-
public String cluster_name = "Test Cluster";
public String authenticator;
public String authorizer;
@@ -355,6 +354,9 @@ public class Config
*/
public UserFunctionTimeoutPolicy user_function_timeout_policy = UserFunctionTimeoutPolicy.die;
+ public volatile boolean back_pressure_enabled = false;
+ public volatile ParameterizedClass back_pressure_strategy;
+
public static boolean getOutboundBindAny()
{
return outboundBindAny;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index dc4cc36..ce889ff 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.config;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.net.*;
import java.nio.file.FileStore;
import java.nio.file.Files;
@@ -53,6 +54,8 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.locator.EndpointSnitchInfo;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.SeedProvider;
+import org.apache.cassandra.net.BackPressureStrategy;
+import org.apache.cassandra.net.RateBasedBackPressure;
import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.scheduler.NoScheduler;
import org.apache.cassandra.security.EncryptionContext;
@@ -110,6 +113,7 @@ public class DatabaseDescriptor
private static EncryptionContext encryptionContext;
private static boolean hasLoggedConfig;
+ private static BackPressureStrategy backPressureStrategy;
private static DiskOptimizationStrategy diskOptimizationStrategy;
private static boolean clientInitialized;
@@ -706,6 +710,27 @@ public class DatabaseDescriptor
diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
break;
}
+
+ try
+ {
+ ParameterizedClass strategy = conf.back_pressure_strategy != null ? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams();
+ Class<?> clazz = Class.forName(strategy.class_name);
+ if (!BackPressureStrategy.class.isAssignableFrom(clazz))
+ throw new ConfigurationException(strategy + " is not an instance of " + BackPressureStrategy.class.getCanonicalName(), false);
+
+ Constructor<?> ctor = clazz.getConstructor(Map.class);
+ BackPressureStrategy instance = (BackPressureStrategy) ctor.newInstance(strategy.parameters);
+ logger.info("Back-pressure is {} with strategy {}.", backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy);
+ backPressureStrategy = instance;
+ }
+ catch (ConfigurationException ex)
+ {
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ throw new ConfigurationException("Error configuring back-pressure strategy: " + conf.back_pressure_strategy, ex);
+ }
}
public static void applyAddressConfig() throws ConfigurationException
@@ -2342,4 +2367,25 @@ public class DatabaseDescriptor
{
return Integer.parseInt(System.getProperty("cassandra.search_concurrency_factor", "1"));
}
+
+ public static void setBackPressureEnabled(boolean backPressureEnabled)
+ {
+ conf.back_pressure_enabled = backPressureEnabled;
+ }
+
+ public static boolean backPressureEnabled()
+ {
+ return conf.back_pressure_enabled;
+ }
+
+ @VisibleForTesting
+ public static void setBackPressureStrategy(BackPressureStrategy strategy)
+ {
+ backPressureStrategy = strategy;
+ }
+
+ public static BackPressureStrategy getBackPressureStrategy()
+ {
+ return backPressureStrategy;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 29bab80..6940e1e 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -209,5 +209,11 @@ final class HintsDispatcher implements AutoCloseable
{
return false;
}
+
+ @Override
+ public boolean supportsBackPressure()
+ {
+ return true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/BackPressureState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/BackPressureState.java b/src/java/org/apache/cassandra/net/BackPressureState.java
new file mode 100644
index 0000000..34fd0dd
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/BackPressureState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+/**
+ * Interface meant to track the back-pressure state per replica host.
+ */
+public interface BackPressureState
+{
+ /**
+ * Called when a message is sent to a replica.
+ */
+ void onMessageSent(MessageOut<?> message);
+
+ /**
+ * Called when a response is received from a replica.
+ */
+ void onResponseReceived();
+
+ /**
+ * Called when no response is received from a replica.
+ */
+ void onResponseTimeout();
+
+ /**
+ * Gets the current back-pressure rate limit.
+ */
+ double getBackPressureRateLimit();
+
+ /**
+ * Returns the host this state refers to.
+ */
+ InetAddress getHost();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/BackPressureStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/BackPressureStrategy.java b/src/java/org/apache/cassandra/net/BackPressureStrategy.java
new file mode 100644
index 0000000..b61a0a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/BackPressureStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Back-pressure algorithm interface.
+ * <br/>
+ * For expert usage only. Implementors must provide a constructor accepting a single {@code Map<String, Object>} argument,
+ * representing any parameters required by the specific implementation.
+ */
+public interface BackPressureStrategy<S extends BackPressureState>
+{
+ /**
+ * Applies the back-pressure algorithm, based and acting on the given {@link BackPressureState}s, and up to the given
+ * timeout.
+ */
+ void apply(Set<S> states, long timeout, TimeUnit unit);
+
+ /**
+ * Creates a new {@link BackPressureState} initialized as needed by the specific implementation.
+ */
+ S newState(InetAddress host);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index d159e0c..7835079 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -49,4 +49,9 @@ public interface IAsyncCallback<T>
* given as input to the dynamic snitch.
*/
boolean isLatencyForSnitch();
+
+ default boolean supportsBackPressure()
+ {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e72a9a2..7632ebd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -39,8 +41,10 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
@@ -419,7 +423,6 @@ public final class MessagingService implements MessagingServiceMBean
Verb.BATCH_STORE,
Verb.BATCH_REMOVE);
-
private static final class DroppedMessages
{
final DroppedMessageMetrics metrics;
@@ -445,20 +448,8 @@ public final class MessagingService implements MessagingServiceMBean
// message sinks are a testing hook
private final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>();
- public void addMessageSink(IMessageSink sink)
- {
- messageSinks.add(sink);
- }
-
- public void removeMessageSink(IMessageSink sink)
- {
- messageSinks.remove(sink);
- }
-
- public void clearMessageSinks()
- {
- messageSinks.clear();
- }
+ // back-pressure implementation
+ private final BackPressureStrategy backPressure = DatabaseDescriptor.getBackPressureStrategy();
private static class MSHandle
{
@@ -504,9 +495,17 @@ public final class MessagingService implements MessagingServiceMBean
public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pair)
{
final CallbackInfo expiredCallbackInfo = pair.right.value;
+
maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
+
ConnectionMetrics.totalTimeouts.mark();
getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
+
+ if (expiredCallbackInfo.callback.supportsBackPressure())
+ {
+ updateBackPressureOnReceive(expiredCallbackInfo.target, expiredCallbackInfo.callback, true);
+ }
+
if (expiredCallbackInfo.isFailureCallback())
{
StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new Runnable()
@@ -545,6 +544,76 @@ public final class MessagingService implements MessagingServiceMBean
}
}
+ public void addMessageSink(IMessageSink sink)
+ {
+ messageSinks.add(sink);
+ }
+
+ public void removeMessageSink(IMessageSink sink)
+ {
+ messageSinks.remove(sink);
+ }
+
+ public void clearMessageSinks()
+ {
+ messageSinks.clear();
+ }
+
+ /**
+ * Updates the back-pressure state on sending to the given host if enabled and the given message callback supports it.
+ *
+ * @param host The replica host the back-pressure state refers to.
+ * @param callback The message callback.
+ * @param message The actual message.
+ */
+ public void updateBackPressureOnSend(InetAddress host, IAsyncCallback callback, MessageOut<?> message)
+ {
+ if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
+ {
+ BackPressureState backPressureState = getConnectionPool(host).getBackPressureState();
+ backPressureState.onMessageSent(message);
+ }
+ }
+
+ /**
+ * Updates the back-pressure state on reception from the given host if enabled and the given message callback supports it.
+ *
+ * @param host The replica host the back-pressure state refers to.
+ * @param callback The message callback.
+ * @param timeout True if updated following a timeout, false otherwise.
+ */
+ public void updateBackPressureOnReceive(InetAddress host, IAsyncCallback callback, boolean timeout)
+ {
+ if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
+ {
+ BackPressureState backPressureState = getConnectionPool(host).getBackPressureState();
+ if (!timeout)
+ backPressureState.onResponseReceived();
+ else
+ backPressureState.onResponseTimeout();
+ }
+ }
+
+ /**
+ * Applies back-pressure for the given hosts, according to the configured strategy.
+ *
+ * If the local host is present, it is removed from the pool, as back-pressure is only applied
+ * to remote hosts.
+ *
+ * @param hosts The hosts to apply back-pressure to.
+ * @param timeoutInNanos The max back-pressure timeout.
+ */
+ public void applyBackPressure(Iterable<InetAddress> hosts, long timeoutInNanos)
+ {
+ if (DatabaseDescriptor.backPressureEnabled())
+ {
+ backPressure.apply(StreamSupport.stream(hosts.spliterator(), false)
+ .filter(h -> !h.equals(FBUtilities.getBroadcastAddress()))
+ .map(h -> getConnectionPool(h).getBackPressureState())
+ .collect(Collectors.toSet()), timeoutInNanos, TimeUnit.NANOSECONDS);
+ }
+ }
+
/**
* Track latency information for the dynamic snitch
*
@@ -699,7 +768,7 @@ public final class MessagingService implements MessagingServiceMBean
OutboundTcpConnectionPool cp = connectionManagers.get(to);
if (cp == null)
{
- cp = new OutboundTcpConnectionPool(to);
+ cp = new OutboundTcpConnectionPool(to, backPressure.newState(to));
OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp);
if (existingPool != null)
cp = existingPool;
@@ -805,6 +874,7 @@ public final class MessagingService implements MessagingServiceMBean
public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, boolean failureCallback)
{
int id = addCallback(cb, message, to, timeout, failureCallback);
+ updateBackPressureOnSend(to, cb, message);
sendOneWay(failureCallback ? message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to);
return id;
}
@@ -827,6 +897,7 @@ public final class MessagingService implements MessagingServiceMBean
boolean allowHints)
{
int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel, allowHints);
+ updateBackPressureOnSend(to, handler, message);
sendOneWay(message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE), id, to);
return id;
}
@@ -1379,6 +1450,27 @@ public final class MessagingService implements MessagingServiceMBean
return result;
}
+ public Map<String, Double> getBackPressurePerHost()
+ {
+ Map<String, Double> map = new HashMap<>(connectionManagers.size());
+ for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
+ map.put(entry.getKey().getHostAddress(), entry.getValue().getBackPressureState().getBackPressureRateLimit());
+
+ return map;
+ }
+
+ @Override
+ public void setBackPressureEnabled(boolean enabled)
+ {
+ DatabaseDescriptor.setBackPressureEnabled(enabled);
+ }
+
+ @Override
+ public boolean isBackPressureEnabled()
+ {
+ return DatabaseDescriptor.backPressureEnabled();
+ }
+
public static IPartitioner globalPartitioner()
{
return StorageService.instance.getTokenMetadata().partitioner;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
index 3bcb0d5..b2e79e0 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
@@ -23,8 +23,7 @@ import java.net.UnknownHostException;
import java.util.Map;
/**
- * MBean exposing MessagingService metrics.
- * - OutboundConnectionPools - Command/Response - Pending/Completed Tasks
+ * MBean exposing MessagingService metrics and allowing back-pressure to be enabled/disabled.
*/
public interface MessagingServiceMBean
{
@@ -88,5 +87,20 @@ public interface MessagingServiceMBean
*/
public Map<String, Long> getTimeoutsPerHost();
+ /**
+ * Back-pressure rate limiting per host
+ */
+ public Map<String, Double> getBackPressurePerHost();
+
+ /**
+ * Enable/Disable back-pressure
+ */
+ public void setBackPressureEnabled(boolean enabled);
+
+ /**
+ * Get back-pressure enabled state
+ */
+ public boolean isBackPressureEnabled();
+
public int getVersion(String address) throws UnknownHostException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 0418ff6..b0391ba 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -50,7 +50,10 @@ public class OutboundTcpConnectionPool
private InetAddress resetEndpoint;
private ConnectionMetrics metrics;
- OutboundTcpConnectionPool(InetAddress remoteEp)
+ // back-pressure state linked to this connection:
+ private final BackPressureState backPressureState;
+
+ OutboundTcpConnectionPool(InetAddress remoteEp, BackPressureState backPressureState)
{
id = remoteEp;
resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
@@ -59,6 +62,8 @@ public class OutboundTcpConnectionPool
smallMessages = new OutboundTcpConnection(this, "Small");
largeMessages = new OutboundTcpConnection(this, "Large");
gossipMessages = new OutboundTcpConnection(this, "Gossip");
+
+ this.backPressureState = backPressureState;
}
/**
@@ -74,6 +79,11 @@ public class OutboundTcpConnectionPool
: smallMessages;
}
+ public BackPressureState getBackPressureState()
+ {
+ return backPressureState;
+ }
+
void reset()
{
for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages })
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
new file mode 100644
index 0000000..1dae243
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.SystemTimeSource;
+import org.apache.cassandra.utils.TimeSource;
+import org.apache.cassandra.utils.concurrent.IntervalLock;
+
+/**
+ * Back-pressure algorithm based on rate limiting according to the ratio between incoming and outgoing rates, computed
+ * over a sliding time window with size equal to write RPC timeout.
+ */
+public class RateBasedBackPressure implements BackPressureStrategy<RateBasedBackPressureState>
+{
+ static final String HIGH_RATIO = "high_ratio";
+ static final String FACTOR = "factor";
+ static final String FLOW = "flow";
+ private static final String BACK_PRESSURE_HIGH_RATIO = "0.90";
+ private static final String BACK_PRESSURE_FACTOR = "5";
+ private static final String BACK_PRESSURE_FLOW = "FAST";
+
+ private static final Logger logger = LoggerFactory.getLogger(RateBasedBackPressure.class);
+ private static final NoSpamLogger tenSecsNoSpamLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS);
+ private static final NoSpamLogger oneMinNoSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+ protected final TimeSource timeSource;
+ protected final double highRatio;
+ protected final int factor;
+ protected final Flow flow;
+ protected final long windowSize;
+
+ private final Cache<Set<RateBasedBackPressureState>, IntervalRateLimiter> rateLimiters =
+ CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).build();
+
+ enum Flow
+ {
+ FAST,
+ SLOW
+ }
+
+ public static ParameterizedClass withDefaultParams()
+ {
+ return new ParameterizedClass(RateBasedBackPressure.class.getName(),
+ ImmutableMap.of(HIGH_RATIO, BACK_PRESSURE_HIGH_RATIO,
+ FACTOR, BACK_PRESSURE_FACTOR,
+ FLOW, BACK_PRESSURE_FLOW));
+ }
+
+ public RateBasedBackPressure(Map<String, Object> args)
+ {
+ this(args, new SystemTimeSource(), DatabaseDescriptor.getWriteRpcTimeout());
+ }
+
+ @VisibleForTesting
+ public RateBasedBackPressure(Map<String, Object> args, TimeSource timeSource, long windowSize)
+ {
+ if (args.size() != 3)
+ {
+ throw new IllegalArgumentException(RateBasedBackPressure.class.getCanonicalName()
+ + " requires 3 arguments: high ratio, back-pressure factor and flow type.");
+ }
+
+ try
+ {
+ highRatio = Double.parseDouble(args.getOrDefault(HIGH_RATIO, "").toString().trim());
+ factor = Integer.parseInt(args.getOrDefault(FACTOR, "").toString().trim());
+ flow = Flow.valueOf(args.getOrDefault(FLOW, "").toString().trim().toUpperCase());
+ }
+ catch (Exception ex)
+ {
+ throw new IllegalArgumentException(ex.getMessage(), ex);
+ }
+
+ if (highRatio <= 0 || highRatio > 1)
+ {
+ throw new IllegalArgumentException("Back-pressure high ratio must be > 0 and <= 1");
+ }
+ if (factor < 1)
+ {
+ throw new IllegalArgumentException("Back-pressure factor must be >= 1");
+ }
+ if (windowSize < 10)
+ {
+ throw new IllegalArgumentException("Back-pressure window size must be >= 10");
+ }
+
+ this.timeSource = timeSource;
+ this.windowSize = windowSize;
+
+ logger.info("Initialized back-pressure with high ratio: {}, factor: {}, flow: {}, window size: {}.",
+ highRatio, factor, flow, windowSize);
+ }
+
+ @Override
+ public void apply(Set<RateBasedBackPressureState> states, long timeout, TimeUnit unit)
+ {
+ // Go through the back-pressure states, try updating each of them and collect min/max rates:
+ boolean isUpdated = false;
+ double minRateLimit = Double.POSITIVE_INFINITY;
+ double maxRateLimit = Double.NEGATIVE_INFINITY;
+ double minIncomingRate = Double.POSITIVE_INFINITY;
+ RateLimiter currentMin = null;
+ RateLimiter currentMax = null;
+ for (RateBasedBackPressureState backPressure : states)
+ {
+ // Get the incoming/outgoing rates:
+ double incomingRate = backPressure.incomingRate.get(TimeUnit.SECONDS);
+ double outgoingRate = backPressure.outgoingRate.get(TimeUnit.SECONDS);
+ // Compute the min incoming rate:
+ if (incomingRate < minIncomingRate)
+ minIncomingRate = incomingRate;
+
+ // Try acquiring the interval lock:
+ if (backPressure.tryIntervalLock(windowSize))
+ {
+ // If acquired, proceed updating this back-pressure state rate limit:
+ isUpdated = true;
+ try
+ {
+ RateLimiter limiter = backPressure.rateLimiter;
+
+ // If we have sent any outgoing requests during this time window, go ahead with rate limiting
+ // (this is safe against concurrent back-pressure state updates thanks to the rw-locking in
+ // RateBasedBackPressureState):
+ if (outgoingRate > 0)
+ {
+ // Compute the incoming/outgoing ratio:
+ double actualRatio = incomingRate / outgoingRate;
+
+ // If the ratio is above the high mark, try growing by the back-pressure factor:
+ if (actualRatio >= highRatio)
+ {
+ // Only if the outgoing rate is able to keep up with the rate increase:
+ if (limiter.getRate() <= outgoingRate)
+ {
+ double newRate = limiter.getRate() + ((limiter.getRate() * factor) / 100);
+ if (newRate > 0 && newRate != Double.POSITIVE_INFINITY)
+ {
+ limiter.setRate(newRate);
+ }
+ }
+ }
+ // If below, set the rate limiter at the incoming rate, decreased by factor:
+ else
+ {
+ // Only if the new rate is actually less than the actual rate:
+ double newRate = incomingRate - ((incomingRate * factor) / 100);
+ if (newRate > 0 && newRate < limiter.getRate())
+ {
+ limiter.setRate(newRate);
+ }
+ }
+
+ logger.trace("Back-pressure state for {}: incoming rate {}, outgoing rate {}, ratio {}, rate limiting {}",
+ backPressure.getHost(), incomingRate, outgoingRate, actualRatio, limiter.getRate());
+ }
+ // Otherwise reset the rate limiter:
+ else
+ {
+ limiter.setRate(Double.POSITIVE_INFINITY);
+ }
+
+ // Housekeeping: pruning windows and resetting the last check timestamp!
+ backPressure.incomingRate.prune();
+ backPressure.outgoingRate.prune();
+ }
+ finally
+ {
+ backPressure.releaseIntervalLock();
+ }
+ }
+ if (backPressure.rateLimiter.getRate() <= minRateLimit)
+ {
+ minRateLimit = backPressure.rateLimiter.getRate();
+ currentMin = backPressure.rateLimiter;
+ }
+ if (backPressure.rateLimiter.getRate() >= maxRateLimit)
+ {
+ maxRateLimit = backPressure.rateLimiter.getRate();
+ currentMax = backPressure.rateLimiter;
+ }
+ }
+
+ // Now find the rate limiter corresponding to the replica group represented by these back-pressure states:
+ if (!states.isEmpty())
+ {
+ try
+ {
+ // Get the rate limiter:
+ IntervalRateLimiter rateLimiter = rateLimiters.get(states, () -> new IntervalRateLimiter(timeSource));
+
+ // If the back-pressure was updated and we acquire the interval lock for the rate limiter of this group:
+ if (isUpdated && rateLimiter.tryIntervalLock(windowSize))
+ {
+ try
+ {
+ // Update the rate limiter value based on the configured flow:
+ if (flow.equals(Flow.FAST))
+ rateLimiter.limiter = currentMax;
+ else
+ rateLimiter.limiter = currentMin;
+
+ tenSecsNoSpamLogger.info("{} currently applied for remote replicas: {}", rateLimiter.limiter, states);
+ }
+ finally
+ {
+ rateLimiter.releaseIntervalLock();
+ }
+ }
+ // Assigning a single rate limiter per replica group once per window size allows the back-pressure rate
+ // limiting to be stable within the group itself.
+
+ // Finally apply the rate limit with a max pause time equal to the provided timeout minus the
+ // response time computed from the incoming rate, to reduce the number of client timeouts by taking into
+ // account how long it could take to process responses after back-pressure:
+ long responseTimeInNanos = (long) (TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) / minIncomingRate);
+ doRateLimit(rateLimiter.limiter, Math.max(0, TimeUnit.NANOSECONDS.convert(timeout, unit) - responseTimeInNanos));
+ }
+ catch (ExecutionException ex)
+ {
+ throw new IllegalStateException(ex);
+ }
+ }
+ }
+
+ @Override
+ public RateBasedBackPressureState newState(InetAddress host)
+ {
+ return new RateBasedBackPressureState(host, timeSource, windowSize);
+ }
+
+ @VisibleForTesting
+ RateLimiter getRateLimiterForReplicaGroup(Set<RateBasedBackPressureState> states)
+ {
+ IntervalRateLimiter rateLimiter = rateLimiters.getIfPresent(states);
+ return rateLimiter != null ? rateLimiter.limiter : RateLimiter.create(Double.POSITIVE_INFINITY);
+ }
+
+ @VisibleForTesting
+ boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos)
+ {
+ if (!rateLimiter.tryAcquire(1, timeoutInNanos, TimeUnit.NANOSECONDS))
+ {
+ timeSource.sleepUninterruptibly(timeoutInNanos, TimeUnit.NANOSECONDS);
+ oneMinNoSpamLogger.info("Cannot apply {} due to exceeding write timeout, pausing {} nanoseconds instead.",
+ rateLimiter, timeoutInNanos);
+
+ return false;
+ }
+
+ return true;
+ }
+
+ private static class IntervalRateLimiter extends IntervalLock
+ {
+ public volatile RateLimiter limiter = RateLimiter.create(Double.POSITIVE_INFINITY);
+
+ IntervalRateLimiter(TimeSource timeSource)
+ {
+ super(timeSource);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
new file mode 100644
index 0000000..c19f277
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.utils.SlidingTimeRate;
+import org.apache.cassandra.utils.TimeSource;
+import org.apache.cassandra.utils.concurrent.IntervalLock;
+
+/**
+ * The rate-based back-pressure state, tracked per replica host.
+ * <br/><br/>
+ *
+ * This back-pressure state is made up of the following attributes:
+ * <ul>
+ * <li>windowSize: the length of the back-pressure window in milliseconds.</li>
+ * <li>incomingRate: the rate of back-pressure supporting incoming messages.</li>
+ * <li>outgoingRate: the rate of back-pressure supporting outgoing messages.</li>
+ * <li>rateLimiter: the rate limiter to eventually apply to outgoing messages.</li>
+ * </ul>
+ * <br/>
+ * The incomingRate and outgoingRate are updated together when a response is received to guarantee consistency between
+ * the two.
+ * <br/>
+ * It also provides methods to exclusively lock/release back-pressure windows at given intervals;
+ * this allows back-pressure to be applied even under concurrent modifications. Please also note a read lock is acquired
+ * during response processing so that no concurrent rate updates can skew rate computations.
+ */
+class RateBasedBackPressureState extends IntervalLock implements BackPressureState
+{
+ private final InetAddress host;
+ private final long windowSize;
+ final SlidingTimeRate incomingRate;
+ final SlidingTimeRate outgoingRate;
+ final RateLimiter rateLimiter;
+
+ RateBasedBackPressureState(InetAddress host, TimeSource timeSource, long windowSize)
+ {
+ super(timeSource);
+ this.host = host;
+ this.windowSize = windowSize;
+ this.incomingRate = new SlidingTimeRate(timeSource, this.windowSize, this.windowSize / 10, TimeUnit.MILLISECONDS);
+ this.outgoingRate = new SlidingTimeRate(timeSource, this.windowSize, this.windowSize / 10, TimeUnit.MILLISECONDS);
+ this.rateLimiter = RateLimiter.create(Double.POSITIVE_INFINITY);
+ }
+
+ @Override
+ public void onMessageSent(MessageOut<?> message) {}
+
+ @Override
+ public void onResponseReceived()
+ {
+ readLock().lock();
+ try
+ {
+ incomingRate.update(1);
+ outgoingRate.update(1);
+ }
+ finally
+ {
+ readLock().unlock();
+ }
+ }
+
+ @Override
+ public void onResponseTimeout()
+ {
+ readLock().lock();
+ try
+ {
+ outgoingRate.update(1);
+ }
+ finally
+ {
+ readLock().unlock();
+ }
+ }
+
+ @Override
+ public double getBackPressureRateLimit()
+ {
+ return rateLimiter.getRate();
+ }
+
+ @Override
+ public InetAddress getHost()
+ {
+ return host;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj instanceof RateBasedBackPressureState)
+ {
+ RateBasedBackPressureState other = (RateBasedBackPressureState) obj;
+ return this.host.equals(other.host);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return this.host.hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("[host: %s, incoming rate: %.3f, outgoing rate: %.3f, rate limit: %.3f]",
+ host, incomingRate.get(TimeUnit.SECONDS), outgoingRate.get(TimeUnit.SECONDS), rateLimiter.getRate());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 89e1051..fe22e42 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -52,5 +52,10 @@ public class ResponseVerbHandler implements IVerbHandler
MessagingService.instance().maybeAddLatency(cb, message.from, latency);
cb.response(message);
}
+
+ if (callbackInfo.callback.supportsBackPressure())
+ {
+ MessagingService.instance().updateBackPressureOnReceive(message.from, cb, false);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 7cc854a..8c30b89 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import com.google.common.collect.Iterables;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
private volatile int failures = 0;
private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint;
private final long queryStartNanoTime;
+ private volatile boolean supportsBackPressure = true;
/**
* @param callback A callback to be called when the write is successful.
@@ -78,11 +80,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
public void get() throws WriteTimeoutException, WriteFailureException
{
- long requestTimeout = writeType == WriteType.COUNTER
- ? DatabaseDescriptor.getCounterWriteRpcTimeout()
- : DatabaseDescriptor.getWriteRpcTimeout();
-
- long timeout = TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - queryStartNanoTime);
+ long timeout = currentTimeout();
boolean success;
try
@@ -112,6 +110,14 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
}
}
+ public final long currentTimeout()
+ {
+ long requestTimeout = writeType == WriteType.COUNTER
+ ? DatabaseDescriptor.getCounterWriteRpcTimeout()
+ : DatabaseDescriptor.getWriteRpcTimeout();
+ return TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - queryStartNanoTime);
+ }
+
/**
* @return the minimum number of endpoints that must reply.
*/
@@ -172,4 +178,15 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
if (totalBlockFor() + n > totalEndpoints())
signal();
}
+
+ @Override
+ public boolean supportsBackPressure()
+ {
+ return supportsBackPressure;
+ }
+
+ public void setSupportsBackPressure(boolean supportsBackPressure)
+ {
+ this.supportsBackPressure = supportsBackPressure;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index ca8a9c6..241aa4f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -25,6 +25,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -33,7 +34,9 @@ import com.google.common.cache.CacheLoader;
import com.google.common.collect.*;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Uninterruptibles;
+
import org.apache.commons.lang3.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -527,6 +530,7 @@ public class StorageProxy implements StorageProxyMBean
{
AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE, queryStartNanoTime);
+ responseHandler.setSupportsBackPressure(false);
}
MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
@@ -1224,6 +1228,10 @@ public class StorageProxy implements StorageProxyMBean
Stage stage)
throws OverloadedException
{
+ int targetsSize = Iterables.size(targets);
+
+ // this dc replicas:
+ Collection<InetAddress> localDc = null;
// extra-datacenter replicas, grouped by dc
Map<String, Collection<InetAddress>> dcGroups = null;
// only need to create a Message for non-local writes
@@ -1232,6 +1240,8 @@ public class StorageProxy implements StorageProxyMBean
boolean insertLocal = false;
ArrayList<InetAddress> endpointsToHint = null;
+ List<InetAddress> backPressureHosts = null;
+
for (InetAddress destination : targets)
{
checkHintOverload(destination);
@@ -1247,12 +1257,17 @@ public class StorageProxy implements StorageProxyMBean
// belongs on a different server
if (message == null)
message = mutation.createMessage();
+
String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+
// direct writes to local DC or old Cassandra versions
// (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
if (localDataCenter.equals(dc))
{
- MessagingService.instance().sendRR(message, destination, responseHandler, true);
+ if (localDc == null)
+ localDc = new ArrayList<>(targetsSize);
+
+ localDc.add(destination);
}
else
{
@@ -1264,8 +1279,14 @@ public class StorageProxy implements StorageProxyMBean
dcGroups = new HashMap<>();
dcGroups.put(dc, messages);
}
+
messages.add(destination);
}
+
+ if (backPressureHosts == null)
+ backPressureHosts = new ArrayList<>(targetsSize);
+
+ backPressureHosts.add(destination);
}
}
else
@@ -1273,24 +1294,30 @@ public class StorageProxy implements StorageProxyMBean
if (shouldHint(destination))
{
if (endpointsToHint == null)
- endpointsToHint = new ArrayList<>(Iterables.size(targets));
+ endpointsToHint = new ArrayList<>(targetsSize);
+
endpointsToHint.add(destination);
}
}
}
+ if (backPressureHosts != null)
+ MessagingService.instance().applyBackPressure(backPressureHosts, responseHandler.currentTimeout());
+
if (endpointsToHint != null)
submitHint(mutation, endpointsToHint, responseHandler);
if (insertLocal)
performLocally(stage, Optional.of(mutation), mutation::apply, responseHandler);
+ if (localDc != null)
+ {
+ for (InetAddress destination : localDc)
+ MessagingService.instance().sendRR(message, destination, responseHandler, true);
+ }
if (dcGroups != null)
{
// for each datacenter, send the message to one node to relay the write to other replicas
- if (message == null)
- message = mutation.createMessage();
-
for (Collection<InetAddress> dcTargets : dcGroups.values())
sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/SlidingTimeRate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SlidingTimeRate.java b/src/java/org/apache/cassandra/utils/SlidingTimeRate.java
new file mode 100644
index 0000000..3053a05
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/SlidingTimeRate.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Concurrent rate computation over a sliding time window.
+ */
+public class SlidingTimeRate
+{
+ private final ConcurrentSkipListMap<Long, AtomicInteger> counters = new ConcurrentSkipListMap<>();
+ private final AtomicLong lastCounterTimestamp = new AtomicLong(0);
+ private final ReadWriteLock pruneLock = new ReentrantReadWriteLock();
+ private final long sizeInMillis;
+ private final long precisionInMillis;
+ private final TimeSource timeSource;
+
+ /**
+ * Creates a sliding rate whose time window is of the given size, with the given precision and time unit.
+ * <br/>
+ * The precision defines how accurate the rate computation is, as it will be computed over window size +/-
+ * precision.
+ */
+ public SlidingTimeRate(TimeSource timeSource, long size, long precision, TimeUnit unit)
+ {
+ Preconditions.checkArgument(size > precision, "Size should be greater than precision.");
+ Preconditions.checkArgument(TimeUnit.MILLISECONDS.convert(precision, unit) >= 1, "Precision must be greater than or equal to 1 millisecond.");
+ this.sizeInMillis = TimeUnit.MILLISECONDS.convert(size, unit);
+ this.precisionInMillis = TimeUnit.MILLISECONDS.convert(precision, unit);
+ this.timeSource = timeSource;
+ }
+
+ /**
+ * Updates the rate.
+ */
+ public void update(int delta)
+ {
+ pruneLock.readLock().lock();
+ try
+ {
+ while (true)
+ {
+ long now = timeSource.currentTimeMillis();
+ long lastTimestamp = lastCounterTimestamp.get();
+ boolean isWithinPrecisionRange = (now - lastTimestamp) < precisionInMillis;
+ AtomicInteger lastCounter = counters.get(lastTimestamp);
+ // If there's a valid counter for the current last timestamp, and we're in the precision range,
+ // update such counter:
+ if (lastCounter != null && isWithinPrecisionRange)
+ {
+ lastCounter.addAndGet(delta);
+
+ break;
+ }
+ // Else if there's no counter or we're past the precision range, try to create a new counter,
+ // but only the thread updating the last timestamp will create a new counter:
+ else if (lastCounterTimestamp.compareAndSet(lastTimestamp, now))
+ {
+ AtomicInteger existing = counters.putIfAbsent(now, new AtomicInteger(delta));
+ if (existing != null)
+ {
+ existing.addAndGet(delta);
+ }
+
+ break;
+ }
+ }
+ }
+ finally
+ {
+ pruneLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Gets the current rate in the given time unit from the beginning of the time window to the
+ * provided point in time ago. Returns 0 when no updates fall inside that range.
+ */
+ public double get(long toAgo, TimeUnit unit)
+ {
+ pruneLock.readLock().lock();
+ try
+ {
+ long toAgoInMillis = TimeUnit.MILLISECONDS.convert(toAgo, unit);
+ Preconditions.checkArgument(toAgoInMillis < sizeInMillis, "Cannot get rate in the past!");
+
+ long now = timeSource.currentTimeMillis();
+ long sum = 0;
+ ConcurrentNavigableMap<Long, AtomicInteger> tailCounters = counters
+ .tailMap(now - sizeInMillis, true)
+ .headMap(now - toAgoInMillis, true);
+ for (AtomicInteger i : tailCounters.values())
+ sum += i.get();
+
+ double rateInMillis = sum == 0
+ ? sum
+ : sum / (double) Math.max(1000, (now - toAgoInMillis) - tailCounters.firstKey());
+ // Milliseconds per requested unit as a double: an integral conversion truncates to 0
+ // for units finer than a millisecond, which would zero out the returned rate.
+ double multiplier = unit.toNanos(1) / (double) TimeUnit.MILLISECONDS.toNanos(1);
+ return rateInMillis * multiplier;
+ }
+ finally
+ {
+ pruneLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Gets the current rate in the given time unit.
+ */
+ public double get(TimeUnit unit)
+ {
+ return get(0, unit);
+ }
+
+ /**
+ * Prunes the time window of old unused updates.
+ */
+ public void prune()
+ {
+ pruneLock.writeLock().lock();
+ try
+ {
+ long now = timeSource.currentTimeMillis();
+ counters.headMap(now - sizeInMillis, false).clear();
+ }
+ finally
+ {
+ pruneLock.writeLock().unlock();
+ }
+ }
+
+ /** Returns the sum of all counters currently held, for test inspection. */
+ @VisibleForTesting
+ public int size()
+ {
+ // Sum primitives directly: Stream.reduce expects an identity that is never mutated,
+ // whereas the previous form accumulated into the identity AtomicInteger itself.
+ return counters.values().stream().mapToInt(AtomicInteger::get).sum();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/SystemTimeSource.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SystemTimeSource.java b/src/java/org/apache/cassandra/utils/SystemTimeSource.java
new file mode 100644
index 0000000..fef525e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/SystemTimeSource.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Time source backed by JVM clock.
+ */
+public class SystemTimeSource implements TimeSource
+{
+ @Override
+ public long currentTimeMillis()
+ {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public long nanoTime()
+ {
+ return System.nanoTime();
+ }
+
+ @Override
+ public TimeSource sleepUninterruptibly(long sleepFor, TimeUnit unit)
+ {
+ Uninterruptibles.sleepUninterruptibly(sleepFor, unit);
+ return this;
+ }
+
+ @Override
+ public TimeSource sleep(long sleepFor, TimeUnit unit) throws InterruptedException
+ {
+ TimeUnit.NANOSECONDS.sleep(TimeUnit.NANOSECONDS.convert(sleepFor, unit));
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/TestRateLimiter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/TestRateLimiter.java b/src/java/org/apache/cassandra/utils/TestRateLimiter.java
new file mode 100644
index 0000000..a9eb871
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/TestRateLimiter.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.jboss.byteman.rule.Rule;
+import org.jboss.byteman.rule.helper.Helper;
+
+/**
+ * Helper class to apply rate limiting during fault injection testing;
+ * for an example script, see test/resources/byteman/mutation_limiter.btm.
+ */
+@VisibleForTesting
+public class TestRateLimiter extends Helper
+{
+ private static final AtomicReference<RateLimiter> ref = new AtomicReference<>();
+
+ protected TestRateLimiter(Rule rule)
+ {
+ super(rule);
+ }
+
+ /**
+ * Acquires a single unit at the given rate. If the rate changes between calls, a new rate limiter is created
+ * and the old one is discarded.
+ */
+ public void acquire(double rate)
+ {
+ RateLimiter limiter = ref.get();
+ if (limiter == null || limiter.getRate() != rate)
+ {
+ ref.compareAndSet(limiter, RateLimiter.create(rate));
+ limiter = ref.get();
+ }
+ limiter.acquire(1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/TimeSource.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/TimeSource.java b/src/java/org/apache/cassandra/utils/TimeSource.java
new file mode 100644
index 0000000..5d8acec
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/TimeSource.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+public interface TimeSource
+{
+ /**
+ *
+ * @return the current time in milliseconds
+ */
+ long currentTimeMillis();
+
+ /**
+ *
+ * @return Returns the current time value in nanoseconds.
+ *
+ * <p>This method can only be used to measure elapsed time and is
+ * not related to any other notion of system or wall-clock time.
+ */
+ long nanoTime();
+
+ /**
+ * Sleep for the given amount of time uninterruptibly.
+ *
+ * @param sleepFor given amount.
+ * @param unit time unit
+ * @return The time source itself after the given sleep period.
+ */
+ TimeSource sleepUninterruptibly(long sleepFor, TimeUnit unit);
+
+ /**
+ * Sleep for the given amount of time. This operation could be interrupted.
+ * Hence after returning from this method, it is not guaranteed
+ * that the requested amount of time has passed.
+ *
+ * @param sleepFor given amount.
+ * @param unit time unit
+ * @return The time source itself after the given sleep period.
+ */
+ TimeSource sleep(long sleepFor, TimeUnit unit) throws InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java b/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java
new file mode 100644
index 0000000..382a2dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.utils.TimeSource;
+
+/**
+ * This class extends ReentrantReadWriteLock to provide a write lock that can only be acquired at provided intervals.
+ */
+public class IntervalLock extends ReentrantReadWriteLock
+{
+ private final AtomicLong lastAcquire = new AtomicLong();
+ private final TimeSource timeSource;
+
+ public IntervalLock(TimeSource timeSource)
+ {
+ this.timeSource = timeSource;
+ }
+
+ /**
+ * Try acquiring a write lock if the given interval is passed since the last call to this method.
+ *
+ * @param interval In millis.
+ * @return True if acquired and locked, false otherwise.
+ */
+ public boolean tryIntervalLock(long interval)
+ {
+ long now = timeSource.currentTimeMillis();
+ boolean acquired = (now - lastAcquire.get() >= interval) && writeLock().tryLock();
+ if (acquired)
+ lastAcquire.set(now);
+
+ return acquired;
+ }
+
+ /**
+ * Release the last acquired interval lock.
+ */
+ public void releaseIntervalLock()
+ {
+ writeLock().unlock();
+ }
+
+ @VisibleForTesting
+ public long getLastIntervalAcquire()
+ {
+ return lastAcquire.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/resources/byteman/mutation_limiter.btm
----------------------------------------------------------------------
diff --git a/test/resources/byteman/mutation_limiter.btm b/test/resources/byteman/mutation_limiter.btm
new file mode 100644
index 0000000..ca936fc
--- /dev/null
+++ b/test/resources/byteman/mutation_limiter.btm
@@ -0,0 +1,8 @@
+RULE mutation_limiter
+CLASS org.apache.cassandra.db.MutationVerbHandler
+METHOD doVerb
+HELPER org.apache.cassandra.utils.TestRateLimiter
+AT ENTRY
+IF TRUE
+DO acquire(1000.0)
+ENDRULE
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 63c44e0..2dcfbd1 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -101,6 +101,7 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.io.util.DiskOptimizationStrategy",
"org.apache.cassandra.locator.SimpleSeedProvider",
"org.apache.cassandra.locator.SeedProvider",
+ "org.apache.cassandra.net.BackPressureStrategy",
"org.apache.cassandra.scheduler.IRequestScheduler",
"org.apache.cassandra.security.EncryptionContext",
"org.apache.cassandra.service.CacheService$CacheType",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 0e0e4ba..2a3ecbe 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -24,34 +24,53 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Iterables;
+
import com.codahale.metrics.Timer;
import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.caffinitas.ohc.histo.EstimatedHistogram;
+
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.*;
-
import org.apache.cassandra.config.DatabaseDescriptor;
+import static org.junit.Assert.*;
+
public class MessagingServiceTest
{
+ private final static long ONE_SECOND = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
+ private final static long[] bucketOffsets = new EstimatedHistogram(160).getBucketOffsets();
+ private final MessagingService messagingService = MessagingService.test();
+
@BeforeClass
- public static void initDD()
+ public static void beforeClass() throws UnknownHostException
{
DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setBackPressureStrategy(new MockBackPressureStrategy(Collections.emptyMap()));
+ DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
}
- private final MessagingService messagingService = MessagingService.test();
- private final static long[] bucketOffsets = new EstimatedHistogram(160).getBucketOffsets();
+ @Before
+ public void before() throws UnknownHostException
+ {
+ MockBackPressureStrategy.applied = false;
+ messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.2"));
+ messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.3"));
+ }
@Test
public void testDroppedMessages()
@@ -77,17 +96,6 @@ public class MessagingServiceTest
assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString()));
}
- private static void addDCLatency(long sentAt, long now) throws IOException
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos))
- {
- out.writeInt((int) sentAt);
- }
- DataInputStreamPlus in = new DataInputStreamPlus(new ByteArrayInputStream(baos.toByteArray()));
- MessageIn.readTimestamp(InetAddress.getLocalHost(), in, now);
- }
-
@Test
public void testDCLatency() throws Exception
{
@@ -123,4 +131,211 @@ public class MessagingServiceTest
addDCLatency(sentAt, now);
assertNull(dcLatency.get("datacenter1"));
}
+
+ /**
+  * Verifies that a send is recorded on the peer's back-pressure state
+  * ({@code MockBackPressureState.onSend}) only when back-pressure is enabled and the callback's
+  * {@code supportsBackPressure()} returns true.
+  */
+ @Test
+ public void testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws UnknownHostException
+ {
+     InetAddress peer = InetAddress.getByName("127.0.0.2");
+     MockBackPressureStrategy.MockBackPressureState state =
+         (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(peer).getBackPressureState();
+     IAsyncCallback supported = new BackPressureCallback();
+     IAsyncCallback unsupported = new NoBackPressureCallback();
+     // The mock state ignores the message it is given, so a typed null is enough here.
+     MessageOut<?> noMessage = null;
+
+     // Enabled, but the callback opts out: nothing is recorded.
+     DatabaseDescriptor.setBackPressureEnabled(true);
+     messagingService.updateBackPressureOnSend(peer, unsupported, noMessage);
+     assertFalse(state.onSend);
+
+     // The callback opts in, but back-pressure is disabled: still nothing.
+     DatabaseDescriptor.setBackPressureEnabled(false);
+     messagingService.updateBackPressureOnSend(peer, supported, noMessage);
+     assertFalse(state.onSend);
+
+     // Enabled and supported: the send is recorded.
+     DatabaseDescriptor.setBackPressureEnabled(true);
+     messagingService.updateBackPressureOnSend(peer, supported, noMessage);
+     assertTrue(state.onSend);
+ }
+
+ /**
+  * Verifies that a non-timeout response is recorded as {@code onReceive} (and never as
+  * {@code onTimeout}) on the peer's back-pressure state, and only when back-pressure is enabled
+  * and the callback's {@code supportsBackPressure()} returns true.
+  */
+ @Test
+ public void testUpdatesBackPressureOnReceiveWhenEnabledAndWithSupportedCallback() throws UnknownHostException
+ {
+     InetAddress peer = InetAddress.getByName("127.0.0.2");
+     MockBackPressureStrategy.MockBackPressureState state =
+         (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(peer).getBackPressureState();
+     IAsyncCallback supported = new BackPressureCallback();
+     IAsyncCallback unsupported = new NoBackPressureCallback();
+     boolean timedOut = false;
+
+     // Enabled, but the callback opts out: neither flag is touched.
+     DatabaseDescriptor.setBackPressureEnabled(true);
+     messagingService.updateBackPressureOnReceive(peer, unsupported, timedOut);
+     assertFalse(state.onReceive);
+     assertFalse(state.onTimeout);
+
+     // The callback opts in, but back-pressure is disabled: neither flag is touched.
+     DatabaseDescriptor.setBackPressureEnabled(false);
+     messagingService.updateBackPressureOnReceive(peer, supported, timedOut);
+     assertFalse(state.onReceive);
+     assertFalse(state.onTimeout);
+
+     // Enabled and supported: only the receive flag is set.
+     DatabaseDescriptor.setBackPressureEnabled(true);
+     messagingService.updateBackPressureOnReceive(peer, supported, timedOut);
+     assertTrue(state.onReceive);
+     assertFalse(state.onTimeout);
+ }
+
+ /**
+  * Verifies that a timed-out response is recorded as {@code onTimeout} (and never as
+  * {@code onReceive}) on the peer's back-pressure state, and only when back-pressure is enabled
+  * and the callback's {@code supportsBackPressure()} returns true.
+  */
+ @Test
+ public void testUpdatesBackPressureOnTimeoutWhenEnabledAndWithSupportedCallback() throws UnknownHostException
+ {
+     InetAddress peer = InetAddress.getByName("127.0.0.2");
+     MockBackPressureStrategy.MockBackPressureState state =
+         (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(peer).getBackPressureState();
+     IAsyncCallback supported = new BackPressureCallback();
+     IAsyncCallback unsupported = new NoBackPressureCallback();
+     boolean timedOut = true;
+
+     // Enabled, but the callback opts out: neither flag is touched.
+     DatabaseDescriptor.setBackPressureEnabled(true);
+     messagingService.updateBackPressureOnReceive(peer, unsupported, timedOut);
+     assertFalse(state.onReceive);
+     assertFalse(state.onTimeout);
+
+     // The callback opts in, but back-pressure is disabled: neither flag is touched.
+     DatabaseDescriptor.setBackPressureEnabled(false);
+     messagingService.updateBackPressureOnReceive(peer, supported, timedOut);
+     assertFalse(state.onReceive);
+     assertFalse(state.onTimeout);
+
+     // Enabled and supported: only the timeout flag is set.
+     DatabaseDescriptor.setBackPressureEnabled(true);
+     messagingService.updateBackPressureOnReceive(peer, supported, timedOut);
+     assertFalse(state.onReceive);
+     assertTrue(state.onTimeout);
+ }
+
+ /**
+  * Verifies that {@code applyBackPressure} reaches the configured strategy (observed through
+  * {@code MockBackPressureStrategy.applied}) only while back-pressure is enabled.
+  */
+ @Test
+ public void testAppliesBackPressureWhenEnabled() throws UnknownHostException
+ {
+     InetAddress peer = InetAddress.getByName("127.0.0.2");
+
+     // Disabled: the strategy must not be invoked with any state.
+     DatabaseDescriptor.setBackPressureEnabled(false);
+     messagingService.applyBackPressure(Arrays.asList(peer), ONE_SECOND);
+     assertFalse(MockBackPressureStrategy.applied);
+
+     // Enabled: the same call now reaches the strategy.
+     DatabaseDescriptor.setBackPressureEnabled(true);
+     messagingService.applyBackPressure(Arrays.asList(peer), ONE_SECOND);
+     assertTrue(MockBackPressureStrategy.applied);
+ }
+
+ /**
+  * Verifies that, even with back-pressure enabled, {@code applyBackPressure} leaves
+  * {@code MockBackPressureStrategy.applied} false when the only target is 127.0.0.1.
+  */
+ @Test
+ public void testDoesntApplyBackPressureToBroadcastAddress() throws UnknownHostException
+ {
+     DatabaseDescriptor.setBackPressureEnabled(true);
+     InetAddress self = InetAddress.getByName("127.0.0.1");
+     messagingService.applyBackPressure(Arrays.asList(self), ONE_SECOND);
+     assertFalse(MockBackPressureStrategy.applied);
+ }
+
+ /**
+  * Writes {@code sentAt}, narrowed to an int, into an in-memory buffer and hands a stream over
+  * those bytes to {@code MessageIn.readTimestamp} together with the local host and {@code now}.
+  *
+  * @param sentAt timestamp to serialize; only its low 32 bits are written
+  * @param now    value forwarded unchanged to {@code MessageIn.readTimestamp}
+  * @throws IOException if writing the buffer or reading the timestamp fails
+  */
+ private static void addDCLatency(long sentAt, long now) throws IOException
+ {
+     ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+     try (DataOutputStreamPlus writer = new WrappedDataOutputStreamPlus(buffer))
+     {
+         writer.writeInt((int) sentAt);
+     }
+     byte[] payload = buffer.toByteArray();
+     DataInputStreamPlus reader = new DataInputStreamPlus(new ByteArrayInputStream(payload));
+     MessageIn.readTimestamp(InetAddress.getLocalHost(), reader, now);
+ }
+
+ /**
+  * Back-pressure strategy used by these tests. It performs no rate limiting; it only records,
+  * through plain flags, which hooks were invoked.
+  */
+ public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState>
+ {
+     /** Becomes true once {@link #apply} has been called with a non-empty set of states. */
+     public static volatile boolean applied = false;
+
+     /**
+      * @param args accepted but ignored
+      */
+     public MockBackPressureStrategy(Map<String, Object> args)
+     {
+     }
+
+     /** Records that back-pressure was applied, unless {@code states} is empty. */
+     @Override
+     public void apply(Set<MockBackPressureState> states, long timeout, TimeUnit unit)
+     {
+         if (states.isEmpty())
+             return;
+         applied = true;
+     }
+
+     /** Creates a fresh recording state bound to {@code host}. */
+     @Override
+     public MockBackPressureState newState(InetAddress host)
+     {
+         return new MockBackPressureState(host);
+     }
+
+     /**
+      * Per-host state that remembers whether each notification hook has been called at least once.
+      */
+     public static class MockBackPressureState implements BackPressureState
+     {
+         private final InetAddress host;
+         /** Set by {@link #onMessageSent}. */
+         public volatile boolean onSend = false;
+         /** Set by {@link #onResponseReceived}. */
+         public volatile boolean onReceive = false;
+         /** Set by {@link #onResponseTimeout}. */
+         public volatile boolean onTimeout = false;
+
+         private MockBackPressureState(InetAddress host)
+         {
+             this.host = host;
+         }
+
+         /** Flags that a message was sent; the message itself is not inspected. */
+         @Override
+         public void onMessageSent(MessageOut<?> message)
+         {
+             onSend = true;
+         }
+
+         /** Flags that a response was received. */
+         @Override
+         public void onResponseReceived()
+         {
+             onReceive = true;
+         }
+
+         /** Flags that a response timed out. */
+         @Override
+         public void onResponseTimeout()
+         {
+             onTimeout = true;
+         }
+
+         /** Not implemented by the mock; always throws. */
+         @Override
+         public double getBackPressureRateLimit()
+         {
+             throw new UnsupportedOperationException("Not supported yet.");
+         }
+
+         /** Returns the host this state was created for. */
+         @Override
+         public InetAddress getHost()
+         {
+             return host;
+         }
+     }
+ }
+
+ /**
+  * Callback stub that reports back-pressure support. It is only meant to be passed to the
+  * back-pressure update methods; delivering a response to it is an error.
+  */
+ private static class BackPressureCallback implements IAsyncCallback
+ {
+     /** Always throws: the tests never deliver a response to this stub. */
+     @Override
+     public void response(MessageIn msg)
+     {
+         throw new UnsupportedOperationException("Not supported.");
+     }
+
+     /** Always false. */
+     @Override
+     public boolean isLatencyForSnitch()
+     {
+         return false;
+     }
+
+     /** Always true: this stub opts in to back-pressure. */
+     @Override
+     public boolean supportsBackPressure()
+     {
+         return true;
+     }
+ }
+
+ /**
+  * Callback stub that reports no back-pressure support. It is only meant to be passed to the
+  * back-pressure update methods; delivering a response to it is an error.
+  */
+ private static class NoBackPressureCallback implements IAsyncCallback
+ {
+     /** Always throws: the tests never deliver a response to this stub. */
+     @Override
+     public void response(MessageIn msg)
+     {
+         throw new UnsupportedOperationException("Not supported.");
+     }
+
+     /** Always false. */
+     @Override
+     public boolean isLatencyForSnitch()
+     {
+         return false;
+     }
+
+     /** Always false: this stub opts out of back-pressure. */
+     @Override
+     public boolean supportsBackPressure()
+     {
+         return false;
+     }
+ }
}