You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2017/02/13 18:59:49 UTC
[03/10] cassandra git commit: Coalescing strategies improvements
CASSANDRA-13090
Coalescing strategies improvements CASSANDRA-13090
With the previous code, the TIMEHORIZON and MOVINGAVERAGE
coalescing strategies would wait even when the backlog
still contained data, which would make it grow even more.
Also:
- clean up parkLoop()
- add otc_coalescing_max_coalesced_messages
- add otc_coalescing_enough_coalesced_messages
- add other otc_* settings to cassandra.yaml
patch by Corentin Chary <c....@criteo.com> reviewed by Ariel Weisberg <aw...@apple.com> for CASSANDRA-13090
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5725e2c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5725e2c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5725e2c4
Branch: refs/heads/cassandra-3.11
Commit: 5725e2c422d21d8efe5ae3bc4389842939553650
Parents: 7e05f39
Author: Corentin Chary <c....@criteo.com>
Authored: Mon Jan 9 12:06:56 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 12:58:29 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 32 ++++++++++++++++
.../org/apache/cassandra/config/Config.java | 3 +-
.../cassandra/config/DatabaseDescriptor.java | 16 ++++++++
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++----
.../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++
7 files changed, 119 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 214fe97..4052b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
* Fix negative mean latency metric (CASSANDRA-12876)
* Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
* Fix speculative retry bugs (CASSANDRA-13009)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ab2aa14..ddb7927 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -884,3 +884,35 @@ enable_user_defined_functions: false
# below their system default. The sysinternals 'clockres' tool can confirm your system's default
# setting.
windows_timer_interval: 1
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
+# early. Any value size larger than this threshold will result in marking an SSTable
+# as corrupted.
+# max_value_size_in_mb: 256
+
+# Coalescing Strategies #
+# Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more).
+# On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
+# virtualized environments, the point at which an application can be bound by network packet processing can be
+# surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal
+# doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process
+# is sufficient for many applications such that no load starvation is experienced even without coalescing.
+# There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+# per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+# trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+# and increasing cache friendliness of network message processing.
+# See CASSANDRA-8692 for details.
+
+# Strategy to use for coalescing messages in OutboundTcpConnection.
+# Can be fixed, movingaverage, timehorizon (default), disabled.
+# You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+# otc_coalescing_strategy: TIMEHORIZON
+
+# How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+# message is received before it will be sent with any accompanying messages. For moving average this is the
+# maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+# for coalescing to be enabled.
+# otc_coalescing_window_us: 200
+
+# Do not try to coalesce messages if we already got that many messages. This should be greater than 2 and at most 128.
+# otc_coalescing_enough_coalesced_messages: 8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9f5a22f..64b36a0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -281,12 +281,13 @@ public class Config
/*
* How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
- * messgae is received before it will be sent with any accompanying messages. For moving average this is the
+ * message is received before it will be sent with any accompanying messages. For moving average this is the
* maximum amount of time that will be waited as well as the interval at which messages must arrive on average
* for coalescing to be enabled.
*/
public static final int otc_coalescing_window_us_default = 200;
public int otc_coalescing_window_us = otc_coalescing_window_us_default;
+ public int otc_coalescing_enough_coalesced_messages = 8;
public int windows_timer_interval = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 35debd0..981026d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -689,6 +689,12 @@ public class DatabaseDescriptor
}
if (seedProvider.getSeeds().size() == 0)
throw new ConfigurationException("The seed provider lists no seeds.", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages > 128)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than or equal to 128", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
}
private static FileStore guessFileStore(String dir) throws IOException
@@ -1810,6 +1816,16 @@ public class DatabaseDescriptor
return conf.otc_coalescing_window_us;
}
+ public static int getOtcCoalescingEnoughCoalescedMessages()
+ {
+ return conf.otc_coalescing_enough_coalesced_messages;
+ }
+
+ public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+ {
+ conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+ }
+
public static boolean enableUserDefinedFunctions()
{
return conf.enable_user_defined_functions;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1bf3ea3..6ec78a4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -81,6 +81,8 @@ public class OutboundTcpConnection extends Thread
private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
+ public static final int MAX_COALESCED_MESSAGES = 128;
+
private static CoalescingStrategy newCoalescingStrategy(String displayName)
{
return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
@@ -186,7 +188,7 @@ public class OutboundTcpConnection extends Thread
public void run()
{
- final int drainedMessageSize = 128;
+ final int drainedMessageSize = MAX_COALESCED_MESSAGES;
// keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 826bd64..52d4240 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -18,8 +18,11 @@
package org.apache.cassandra.utils;
import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
+
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.RandomAccessFile;
@@ -38,6 +41,7 @@ import com.google.common.base.Preconditions;
public class CoalescingStrategies
{
+ protected static final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class);
/*
* Log debug information at info level about what the average is and when coalescing is enabled/disabled
@@ -85,15 +89,23 @@ public class CoalescingStrategies
{
long now = System.nanoTime();
final long timer = now + nanos;
+ // We shouldn't loop if it's within a few % of the target sleep time if on a second iteration.
+ // See CASSANDRA-8692. Compare via subtraction: System.nanoTime() values may overflow.
+ final long limit = timer - nanos / 16;
do
{
LockSupport.parkNanos(timer - now);
+ now = System.nanoTime();
}
- while (timer - (now = System.nanoTime()) > nanos / 16);
+ while (now - limit < 0);
}
private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
{
+ // Do not sleep if there are still items in the backlog (CASSANDRA-13090).
+ if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
+ return false;
+
// only sleep if we can expect to double the number of messages we're sending in the time interval
long sleep = messages * averageGap;
if (sleep <= 0 || sleep > maxCoalesceWindow)
@@ -329,7 +341,7 @@ public class CoalescingStrategies
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
- input.drainTo(out, maxItems - 1);
+ input.drainTo(out, maxItems - out.size());
}
for (Coalescable qm : out)
@@ -411,15 +423,16 @@ public class CoalescingStrategies
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
+ input.drainTo(out, maxItems - out.size());
}
long average = notifyOfSample(out.get(0).timestampNanos());
-
debugGap(average);
- maybeSleep(out.size(), average, maxCoalesceWindow, parker);
+ if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) {
+ input.drainTo(out, maxItems - out.size());
+ }
- input.drainTo(out, maxItems - out.size());
for (int ii = 1; ii < out.size(); ii++)
notifyOfSample(out.get(ii).timestampNanos());
}
@@ -447,11 +460,16 @@ public class CoalescingStrategies
@Override
protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
{
+ int enough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
- parker.park(coalesceWindow);
- input.drainTo(out, maxItems - 1);
+ input.drainTo(out, maxItems - out.size());
+ if (out.size() < enough) {
+ parker.park(coalesceWindow);
+ input.drainTo(out, maxItems - out.size());
+ }
}
debugTimestamps(out);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe..26b6b3a 100644
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@ -17,10 +17,12 @@
*/
package org.apache.cassandra.utils;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.CoalescingStrategies.Clock;
import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
import org.apache.cassandra.utils.CoalescingStrategies.Parker;
+import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -101,6 +103,12 @@ public class CoalescingStrategiesTest
Semaphore queueParked = new Semaphore(0);
Semaphore queueRelease = new Semaphore(0);
+ @BeforeClass
+ public static void initDD()
+ {
+ DatabaseDescriptor.forceStaticInitialization();
+ }
+
@SuppressWarnings({ "serial" })
@Before
public void setUp() throws Exception
@@ -207,6 +215,38 @@ public class CoalescingStrategiesTest
}
@Test
+ public void testFixedCoalescingStrategyEnough() throws Exception
+ {
+ int oldValue = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+ DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(1);
+ try {
+ cs = newStrategy("FIXED", 200);
+
+ //Messages are already queued, so the first drainTo() succeeds and FIXED sends them without parking.
+ //The enough threshold only matters after blocking in take() (presumably what runBlocker() exercises below).
+ add(42);
+ add(42);
+ cs.coalesce(input, output, 128);
+ assertEquals(2, output.size());
+ assertNull(parker.parks.poll());
+
+ clear();
+
+ runBlocker(queueParked);
+ add(42);
+ add(42);
+ add(42);
+ release();
+ assertEquals(3, output.size());
+ assertNull(parker.parks.poll());
+ }
+ finally {
+ DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(oldValue);
+ }
+
+ }
+
+ @Test
public void testDisabledCoalescingStrateg() throws Exception
{
cs = newStrategy("DISABLED", 200);