You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2017/02/13 18:59:47 UTC
[01/10] cassandra git commit: Coalescing strategies improvements
CASSANDRA-13090
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 7e05f393f -> 5725e2c42
refs/heads/cassandra-3.0 3a570d744 -> 3d01e9061
refs/heads/cassandra-3.11 01b91cce2 -> 702ec088f
refs/heads/trunk ffc82b1ee -> edcbef3e3
Coalescing strategies improvements CASSANDRA-13090
With the previous code TIMEHORIZON and MOVINGAVERAGE
coalescing strategies would wait even when the backlog
still contains data, which would make it grow even more.
Also:
- clean up parkLoop()
- add otc_coalescing_max_coalesced_messages
- add otc_coalescing_enough_coalesced_messages
- add other otc_* settings to cassandra.yaml
patch by Corentin Chary <c....@criteo.com> reviewed by Ariel Weisberg <aw...@apple.com> for CASSANDRA-13090
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5725e2c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5725e2c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5725e2c4
Branch: refs/heads/cassandra-2.2
Commit: 5725e2c422d21d8efe5ae3bc4389842939553650
Parents: 7e05f39
Author: Corentin Chary <c....@criteo.com>
Authored: Mon Jan 9 12:06:56 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 12:58:29 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 32 ++++++++++++++++
.../org/apache/cassandra/config/Config.java | 3 +-
.../cassandra/config/DatabaseDescriptor.java | 16 ++++++++
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++----
.../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++
7 files changed, 119 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 214fe97..4052b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
* Fix negative mean latency metric (CASSANDRA-12876)
* Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
* Fix speculative retry bugs (CASSANDRA-13009)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ab2aa14..ddb7927 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -884,3 +884,35 @@ enable_user_defined_functions: false
# below their system default. The sysinternals 'clockres' tool can confirm your system's default
# setting.
windows_timer_interval: 1
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
+# early. Any value size larger than this threshold will result into marking an SSTable
+# as corrupted.
+# max_value_size_in_mb: 256
+
+# Coalescing Strategies #
+# Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more).
+# On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
+# virtualized environments, the point at which an application can be bound by network packet processing can be
+# surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal
+# doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process
+# is sufficient for many applications such that no load starvation is experienced even without coalescing.
+# There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+# per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+# trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+# and increasing cache friendliness of network message processing.
+# See CASSANDRA-8692 for details.
+
+# Strategy to use for coalescing messages in OutboundTcpConnection.
+# Can be fixed, movingaverage, timehorizon (default), disabled.
+# You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+# otc_coalescing_strategy: TIMEHORIZON
+
+# How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+# message is received before it will be sent with any accompanying messages. For moving average this is the
+# maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+# for coalescing to be enabled.
+# otc_coalescing_window_us: 200
+
+# Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128.
+# otc_coalescing_enough_coalesced_messages: 8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9f5a22f..64b36a0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -281,12 +281,13 @@ public class Config
/*
* How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
- * messgae is received before it will be sent with any accompanying messages. For moving average this is the
+ * message is received before it will be sent with any accompanying messages. For moving average this is the
* maximum amount of time that will be waited as well as the interval at which messages must arrive on average
* for coalescing to be enabled.
*/
public static final int otc_coalescing_window_us_default = 200;
public int otc_coalescing_window_us = otc_coalescing_window_us_default;
+ public int otc_coalescing_enough_coalesced_messages = 8;
public int windows_timer_interval = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 35debd0..981026d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -689,6 +689,12 @@ public class DatabaseDescriptor
}
if (seedProvider.getSeeds().size() == 0)
throw new ConfigurationException("The seed provider lists no seeds.", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages > 128)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
}
private static FileStore guessFileStore(String dir) throws IOException
@@ -1810,6 +1816,16 @@ public class DatabaseDescriptor
return conf.otc_coalescing_window_us;
}
+ public static int getOtcCoalescingEnoughCoalescedMessages()
+ {
+ return conf.otc_coalescing_enough_coalesced_messages;
+ }
+
+ public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+ {
+ conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+ }
+
public static boolean enableUserDefinedFunctions()
{
return conf.enable_user_defined_functions;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1bf3ea3..6ec78a4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -81,6 +81,8 @@ public class OutboundTcpConnection extends Thread
private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
+ public static final int MAX_COALESCED_MESSAGES = 128;
+
private static CoalescingStrategy newCoalescingStrategy(String displayName)
{
return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
@@ -186,7 +188,7 @@ public class OutboundTcpConnection extends Thread
public void run()
{
- final int drainedMessageSize = 128;
+ final int drainedMessageSize = MAX_COALESCED_MESSAGES;
// keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 826bd64..52d4240 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -18,8 +18,11 @@
package org.apache.cassandra.utils;
import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
+
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.RandomAccessFile;
@@ -38,6 +41,7 @@ import com.google.common.base.Preconditions;
public class CoalescingStrategies
{
+ static protected final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class);
/*
* Log debug information at info level about what the average is and when coalescing is enabled/disabled
@@ -85,15 +89,23 @@ public class CoalescingStrategies
{
long now = System.nanoTime();
final long timer = now + nanos;
+ // We shouldn't loop if it's within a few % of the target sleep time if on a second iteration.
+ // See CASSANDRA-8692.
+ final long limit = timer - nanos / 16;
do
{
LockSupport.parkNanos(timer - now);
+ now = System.nanoTime();
}
- while (timer - (now = System.nanoTime()) > nanos / 16);
+ while (now < limit);
}
private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
{
+ // Do not sleep if there are still items in the backlog (CASSANDRA-13090).
+ if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
+ return false;
+
// only sleep if we can expect to double the number of messages we're sending in the time interval
long sleep = messages * averageGap;
if (sleep <= 0 || sleep > maxCoalesceWindow)
@@ -329,7 +341,7 @@ public class CoalescingStrategies
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
- input.drainTo(out, maxItems - 1);
+ input.drainTo(out, maxItems - out.size());
}
for (Coalescable qm : out)
@@ -411,15 +423,16 @@ public class CoalescingStrategies
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
+ input.drainTo(out, maxItems - out.size());
}
long average = notifyOfSample(out.get(0).timestampNanos());
-
debugGap(average);
- maybeSleep(out.size(), average, maxCoalesceWindow, parker);
+ if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) {
+ input.drainTo(out, maxItems - out.size());
+ }
- input.drainTo(out, maxItems - out.size());
for (int ii = 1; ii < out.size(); ii++)
notifyOfSample(out.get(ii).timestampNanos());
}
@@ -447,11 +460,16 @@ public class CoalescingStrategies
@Override
protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
{
+ int enough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
- parker.park(coalesceWindow);
- input.drainTo(out, maxItems - 1);
+ input.drainTo(out, maxItems - out.size());
+ if (out.size() < enough) {
+ parker.park(coalesceWindow);
+ input.drainTo(out, maxItems - out.size());
+ }
}
debugTimestamps(out);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe..26b6b3a 100644
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@ -17,10 +17,12 @@
*/
package org.apache.cassandra.utils;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.CoalescingStrategies.Clock;
import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
import org.apache.cassandra.utils.CoalescingStrategies.Parker;
+import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -101,6 +103,12 @@ public class CoalescingStrategiesTest
Semaphore queueParked = new Semaphore(0);
Semaphore queueRelease = new Semaphore(0);
+ @BeforeClass
+ public static void initDD()
+ {
+ DatabaseDescriptor.forceStaticInitialization();
+ }
+
@SuppressWarnings({ "serial" })
@Before
public void setUp() throws Exception
@@ -207,6 +215,38 @@ public class CoalescingStrategiesTest
}
@Test
+ public void testFixedCoalescingStrategyEnough() throws Exception
+ {
+ int oldValue = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+ DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(1);
+ try {
+ cs = newStrategy("FIXED", 200);
+
+ //Test that when a stream of messages continues arriving it keeps sending until all are drained
+ //It does this because it is already awake and sending messages
+ add(42);
+ add(42);
+ cs.coalesce(input, output, 128);
+ assertEquals(2, output.size());
+ assertNull(parker.parks.poll());
+
+ clear();
+
+ runBlocker(queueParked);
+ add(42);
+ add(42);
+ add(42);
+ release();
+ assertEquals(3, output.size());
+ assertNull(parker.parks.poll());
+ }
+ finally {
+ DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(oldValue);
+ }
+
+ }
+
+ @Test
public void testDisabledCoalescingStrateg() throws Exception
{
cs = newStrategy("DISABLED", 200);
[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by aw...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/702ec088
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/702ec088
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/702ec088
Branch: refs/heads/trunk
Commit: 702ec088f5f61106b41e128f3fb8f109da8cbe1c
Parents: 01b91cc 3d01e90
Author: Ariel Weisberg <aw...@apple.com>
Authored: Mon Feb 13 13:31:23 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 13:32:30 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 27 +++++++++++++
.../org/apache/cassandra/config/Config.java | 3 +-
.../cassandra/config/DatabaseDescriptor.java | 16 ++++++++
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++----
.../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++
7 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 67c45e8,b19550a..8164a52
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -215,20 -100,6 +215,21 @@@ Merged from 3.0
* Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
* Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
Merged from 2.2:
++ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
+ * Fix speculative retry bugs (CASSANDRA-13009)
+ * Fix handling of nulls and unsets in IN conditions (CASSANDRA-12981)
+ * Fix race causing infinite loop if Thrift server is stopped before it starts listening (CASSANDRA-12856)
+ * CompactionTasks now correctly drops sstables out of compaction when not enough disk space is available (CASSANDRA-12979)
+ * Remove support for non-JavaScript UDFs (CASSANDRA-12883)
+ * Fix DynamicEndpointSnitch noop in multi-datacenter situations (CASSANDRA-13074)
+ * cqlsh copy-from: encode column names to avoid primary key parsing errors (CASSANDRA-12909)
+ * Temporarily fix bug that creates commit log when running offline tools (CASSANDRA-8616)
+ * Reduce granularity of OpOrder.Group during index build (CASSANDRA-12796)
+ * Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980)
+ * Use saved tokens when setting local tokens on StorageService.joinRing (CASSANDRA-12935)
+ * cqlsh: fix DESC TYPES errors (CASSANDRA-12914)
+ * Fix leak on skipped SSTables in sstableupgrade (CASSANDRA-12899)
+ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
* Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
* Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/conf/cassandra.yaml
----------------------------------------------------------------------
diff --cc conf/cassandra.yaml
index 4436a02,a9d4c01..063a0b7
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -1173,24 -959,29 +1173,51 @@@ gc_warn_threshold_in_ms: 100
# as corrupted.
# max_value_size_in_mb: 256
+# Back-pressure settings #
+# If enabled, the coordinator will apply the back-pressure strategy specified below to each mutation
+# sent to replicas, with the aim of reducing pressure on overloaded replicas.
+back_pressure_enabled: false
+# The back-pressure strategy applied.
+# The default implementation, RateBasedBackPressure, takes three arguments:
+# high ratio, factor, and flow type, and uses the ratio between incoming mutation responses and outgoing mutation requests.
+# If below high ratio, outgoing mutations are rate limited according to the incoming rate decreased by the given factor;
+# if above high ratio, the rate limiting is increased by the given factor;
+# such factor is usually best configured between 1 and 10, use larger values for a faster recovery
+# at the expense of potentially more dropped mutations;
+# the rate limiting is applied according to the flow type: if FAST, it's rate limited at the speed of the fastest replica,
+# if SLOW at the speed of the slowest one.
+# New strategies can be added. Implementors need to implement org.apache.cassandra.net.BackPressureStrategy and
+# provide a public constructor accepting a Map<String, Object>.
+back_pressure_strategy:
+ - class_name: org.apache.cassandra.net.RateBasedBackPressure
+ parameters:
+ - high_ratio: 0.90
+ factor: 5
+ flow: FAST
++
+ # Coalescing Strategies #
+ # Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more).
+ # On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
+ # virtualized environments, the point at which an application can be bound by network packet processing can be
+ # surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal
+ # doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process
+ # is sufficient for many applications such that no load starvation is experienced even without coalescing.
+ # There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+ # per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+ # trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+ # and increasing cache friendliness of network message processing.
+ # See CASSANDRA-8692 for details.
+
+ # Strategy to use for coalescing messages in OutboundTcpConnection.
+ # Can be fixed, movingaverage, timehorizon (default), disabled.
+ # You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+ # otc_coalescing_strategy: TIMEHORIZON
+
+ # How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+ # message is received before it will be sent with any accompanying messages. For moving average this is the
+ # maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+ # for coalescing to be enabled.
+ # otc_coalescing_window_us: 200
+
+ # Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128.
+ # otc_coalescing_enough_coalesced_messages: 8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4261674,602214f..ce3adfe
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -626,341 -655,45 +626,347 @@@ public class DatabaseDescripto
catch (NumberFormatException e)
{
throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
- + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+ + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+ }
+
+ try
+ {
+ // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
+ counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
+ ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
+ : conf.counter_cache_size_in_mb;
+
+ if (counterCacheSizeInMB < 0)
+ throw new NumberFormatException(); // to escape duplicating error message
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
+ + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+ }
+
+ // if set to empty/"auto" then use 5% of Heap size
+ indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
+ ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
+ : conf.index_summary_capacity_in_mb;
+
+ if (indexSummaryCapacityInMB < 0)
+ throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
+ + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
+
+ if (conf.index_interval != null)
+ logger.warn("index_interval has been deprecated and should be removed from cassandra.yaml");
+
+ if(conf.encryption_options != null)
+ {
+ logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
+ //operate under the assumption that server_encryption_options is not set in yaml rather than both
+ conf.server_encryption_options = conf.encryption_options;
+ }
+
+ if (conf.user_defined_function_fail_timeout < 0)
+ throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
+ if (conf.user_defined_function_warn_timeout < 0)
+ throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
+
+ if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
+ throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
+
+ if (conf.max_mutation_size_in_kb == null)
+ conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
+ else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
+ throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
+
+ // native transport encryption options
+ if (conf.native_transport_port_ssl != null
+ && conf.native_transport_port_ssl != conf.native_transport_port
+ && !conf.client_encryption_options.enabled)
+ {
+ throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
+ }
+
+ if (conf.max_value_size_in_mb <= 0)
+ throw new ConfigurationException("max_value_size_in_mb must be positive", false);
+
+ switch (conf.disk_optimization_strategy)
+ {
+ case ssd:
+ diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance);
+ break;
+ case spinning:
+ diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
+ break;
+ }
+
+ try
+ {
+ ParameterizedClass strategy = conf.back_pressure_strategy != null ? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams();
+ Class<?> clazz = Class.forName(strategy.class_name);
+ if (!BackPressureStrategy.class.isAssignableFrom(clazz))
+ throw new ConfigurationException(strategy + " is not an instance of " + BackPressureStrategy.class.getCanonicalName(), false);
+
+ Constructor<?> ctor = clazz.getConstructor(Map.class);
+ BackPressureStrategy instance = (BackPressureStrategy) ctor.newInstance(strategy.parameters);
+ logger.info("Back-pressure is {} with strategy {}.", backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy);
+ backPressureStrategy = instance;
+ }
+ catch (ConfigurationException ex)
+ {
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ throw new ConfigurationException("Error configuring back-pressure strategy: " + conf.back_pressure_strategy, ex);
+ }
++
++ if (conf.otc_coalescing_enough_coalesced_messages > 128)
++ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
++
++ if (conf.otc_coalescing_enough_coalesced_messages <= 0)
++ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
+ }
+
+ private static String storagedirFor(String type)
+ {
+ return storagedir(type + "_directory") + File.separator + type;
+ }
+
+ private static String storagedir(String errMsgType)
+ {
+ String storagedir = System.getProperty(Config.PROPERTY_PREFIX + "storagedir", null);
+ if (storagedir == null)
+ throw new ConfigurationException(errMsgType + " is missing and -Dcassandra.storagedir is not set", false);
+ return storagedir;
+ }
+
+ public static void applyAddressConfig() throws ConfigurationException
+ {
+ applyAddressConfig(conf);
+ }
+
+ public static void applyAddressConfig(Config config) throws ConfigurationException
+ {
+ listenAddress = null;
+ rpcAddress = null;
+ broadcastAddress = null;
+ broadcastRpcAddress = null;
+
+ /* Local IP, hostname or interface to bind services to */
+ if (config.listen_address != null && config.listen_interface != null)
+ {
+ throw new ConfigurationException("Set listen_address OR listen_interface, not both", false);
+ }
+ else if (config.listen_address != null)
+ {
+ try
+ {
+ listenAddress = InetAddress.getByName(config.listen_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false);
+ }
+
+ if (listenAddress.isAnyLocalAddress())
+ throw new ConfigurationException("listen_address cannot be a wildcard address (" + config.listen_address + ")!", false);
+ }
+ else if (config.listen_interface != null)
+ {
+ listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6);
+ }
+
+ /* Gossip Address to broadcast */
+ if (config.broadcast_address != null)
+ {
+ try
+ {
+ broadcastAddress = InetAddress.getByName(config.broadcast_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false);
+ }
+
+ if (broadcastAddress.isAnyLocalAddress())
+ throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false);
+ }
+
+ /* Local IP, hostname or interface to bind RPC server to */
+ if (config.rpc_address != null && config.rpc_interface != null)
+ {
+ throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false);
+ }
+ else if (config.rpc_address != null)
+ {
+ try
+ {
+ rpcAddress = InetAddress.getByName(config.rpc_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false);
+ }
+ }
+ else if (config.rpc_interface != null)
+ {
+ rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6);
+ }
+ else
+ {
+ rpcAddress = FBUtilities.getLocalAddress();
+ }
+
+ /* RPC address to broadcast */
+ if (config.broadcast_rpc_address != null)
+ {
+ try
+ {
+ broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
+ }
+
+ if (broadcastRpcAddress.isAnyLocalAddress())
+ throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false);
+ }
+ else
+ {
+ if (rpcAddress.isAnyLocalAddress())
+ throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " +
+ "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false);
+ }
+ }
+
+ public static void applyThriftHSHA()
+ {
+ // fail early instead of OOMing (see CASSANDRA-8116)
+ if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE)
+ throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " +
+ "setting of 'unlimited'. Please see the comments in cassandra.yaml " +
+ "for rpc_server_type and rpc_max_threads.",
+ false);
+ if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024))
+ logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads);
+ }
+
+ public static void applyEncryptionContext()
+ {
+ // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption,
+ // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read)
+ encryptionContext = new EncryptionContext(conf.transparent_data_encryption_options);
+ }
+
+ public static void applySeedProvider()
+ {
+ // load the seeds for node contact points
+ if (conf.seed_provider == null)
+ {
+ throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.", false);
+ }
+ try
+ {
+ Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name);
+ seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
+ }
+ // there are about 5 checked exceptions that could be thrown here.
+ catch (Exception e)
+ {
+ throw new ConfigurationException(e.getMessage() + "\nFatal configuration error; unable to start server. See log for stacktrace.", true);
+ }
+ if (seedProvider.getSeeds().size() == 0)
+ throw new ConfigurationException("The seed provider lists no seeds.", false);
+ }
+
+ public static void applyInitialTokens()
+ {
+ if (conf.initial_token != null)
+ {
+ Collection<String> tokens = tokensFromString(conf.initial_token);
+ if (tokens.size() != conf.num_tokens)
+ throw new ConfigurationException("The number of initial tokens (by initial_token) specified is different from num_tokens value", false);
+
+ for (String token : tokens)
+ partitioner.getTokenFactory().validate(token);
+ }
+ }
+
+ // Maybe safe for clients + tools
+ public static void applyRequestScheduler()
+ {
+ /* Request Scheduler setup */
+ requestSchedulerOptions = conf.request_scheduler_options;
+ if (conf.request_scheduler != null)
+ {
+ try
+ {
+ if (requestSchedulerOptions == null)
+ {
+ requestSchedulerOptions = new RequestSchedulerOptions();
+ }
+ Class<?> cls = Class.forName(conf.request_scheduler);
+ requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false);
+ }
+ catch (Exception e)
+ {
+ throw new ConfigurationException("Unable to instantiate request scheduler", e);
+ }
+ }
+ else
+ {
+ requestScheduler = new NoScheduler();
}
- try
+ if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
{
- // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
- counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
- ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
- : conf.counter_cache_size_in_mb;
-
- if (counterCacheSizeInMB < 0)
- throw new NumberFormatException(); // to escape duplicating error message
+ requestSchedulerId = conf.request_scheduler_id;
}
- catch (NumberFormatException e)
+ else
{
- throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
- + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+ // Default to Keyspace
+ requestSchedulerId = RequestSchedulerId.keyspace;
}
+ }
- // if set to empty/"auto" then use 5% of Heap size
- indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
- ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
- : conf.index_summary_capacity_in_mb;
-
- if (indexSummaryCapacityInMB < 0)
- throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
- + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
-
- if(conf.encryption_options != null)
+ // definitely not safe for tools + clients - implicitly instantiates StorageService
+ public static void applySnitch()
+ {
+ /* end point snitch */
+ if (conf.endpoint_snitch == null)
{
- logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
- //operate under the assumption that server_encryption_options is not set in yaml rather than both
- conf.server_encryption_options = conf.encryption_options;
+ throw new ConfigurationException("Missing endpoint_snitch directive", false);
}
+ snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
+ EndpointSnitchInfo.create();
- // load the seeds for node contact points
- if (conf.seed_provider == null)
+ localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
+ localComparator = new Comparator<InetAddress>()
{
- throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.", false);
+ public int compare(InetAddress endpoint1, InetAddress endpoint2)
+ {
+ boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
+ boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
+ if (local1 && !local2)
+ return -1;
+ if (local2 && !local1)
+ return 1;
+ return 0;
+ }
+ };
+ }
+
+ // definitely not safe for tools + clients - implicitly instantiates schema
+ public static void applyPartitioner()
+ {
+ /* Hashing strategy */
+ if (conf.partitioner == null)
+ {
+ throw new ConfigurationException("Missing directive: partitioner", false);
}
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 1a3c13d,d79fa15..9f3b118
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@@ -17,10 -17,12 +17,13 @@@
*/
package org.apache.cassandra.utils;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
+
import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.RandomAccessFile;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe,26b6b3a..b10d70b
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@@ -101,6 -103,12 +103,12 @@@ public class CoalescingStrategiesTes
Semaphore queueParked = new Semaphore(0);
Semaphore queueRelease = new Semaphore(0);
+ @BeforeClass
+ public static void initDD()
+ {
- DatabaseDescriptor.forceStaticInitialization();
++ DatabaseDescriptor.daemonInitialization();
+ }
+
@SuppressWarnings({ "serial" })
@Before
public void setUp() throws Exception
[02/10] cassandra git commit: Coalescing strategies improvements
CASSANDRA-13090
Posted by aw...@apache.org.
Coalescing strategies improvements CASSANDRA-13090
With the previous code TIMEHORIZON and MOVINGAVERAGE
coalescing strategies would wait even when the backlog
still contains data which would make it grow even more.
Also:
- cleanups parkLoop()
- add otc_coalescing_max_coalesced_messages
- add otc_coalescing_enough_coalesced_messages
- add other otc_* settings to cassandra.yaml
patch by Corentin Chary <c....@criteo.com> reviewed by Ariel Weisberg <aw...@apple.com> for CASSANDRA-13090
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5725e2c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5725e2c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5725e2c4
Branch: refs/heads/cassandra-3.0
Commit: 5725e2c422d21d8efe5ae3bc4389842939553650
Parents: 7e05f39
Author: Corentin Chary <c....@criteo.com>
Authored: Mon Jan 9 12:06:56 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 12:58:29 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 32 ++++++++++++++++
.../org/apache/cassandra/config/Config.java | 3 +-
.../cassandra/config/DatabaseDescriptor.java | 16 ++++++++
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++----
.../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++
7 files changed, 119 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 214fe97..4052b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
* Fix negative mean latency metric (CASSANDRA-12876)
* Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
* Fix speculative retry bugs (CASSANDRA-13009)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ab2aa14..ddb7927 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -884,3 +884,35 @@ enable_user_defined_functions: false
# below their system default. The sysinternals 'clockres' tool can confirm your system's default
# setting.
windows_timer_interval: 1
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
+# early. Any value size larger than this threshold will result into marking an SSTable
+# as corrupted.
+# max_value_size_in_mb: 256
+
+# Coalescing Strategies #
+# Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more).
+# On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
+# virtualized environments, the point at which an application can be bound by network packet processing can be
+# surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal
+# doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process
+# is sufficient for many applications such that no load starvation is experienced even without coalescing.
+# There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+# per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+# trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+# and increasing cache friendliness of network message processing.
+# See CASSANDRA-8692 for details.
+
+# Strategy to use for coalescing messages in OutboundTcpConnection.
+# Can be fixed, movingaverage, timehorizon (default), disabled.
+# You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+# otc_coalescing_strategy: TIMEHORIZON
+
+# How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+# message is received before it will be sent with any accompanying messages. For moving average this is the
+# maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+# for coalescing to be enabled.
+# otc_coalescing_window_us: 200
+
+# Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128.
+# otc_coalescing_enough_coalesced_messages: 8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9f5a22f..64b36a0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -281,12 +281,13 @@ public class Config
/*
* How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
- * messgae is received before it will be sent with any accompanying messages. For moving average this is the
+ * message is received before it will be sent with any accompanying messages. For moving average this is the
* maximum amount of time that will be waited as well as the interval at which messages must arrive on average
* for coalescing to be enabled.
*/
public static final int otc_coalescing_window_us_default = 200;
public int otc_coalescing_window_us = otc_coalescing_window_us_default;
+ public int otc_coalescing_enough_coalesced_messages = 8;
public int windows_timer_interval = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 35debd0..981026d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -689,6 +689,12 @@ public class DatabaseDescriptor
}
if (seedProvider.getSeeds().size() == 0)
throw new ConfigurationException("The seed provider lists no seeds.", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages > 128)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
}
private static FileStore guessFileStore(String dir) throws IOException
@@ -1810,6 +1816,16 @@ public class DatabaseDescriptor
return conf.otc_coalescing_window_us;
}
+ public static int getOtcCoalescingEnoughCoalescedMessages()
+ {
+ return conf.otc_coalescing_enough_coalesced_messages;
+ }
+
+ public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+ {
+ conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+ }
+
public static boolean enableUserDefinedFunctions()
{
return conf.enable_user_defined_functions;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1bf3ea3..6ec78a4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -81,6 +81,8 @@ public class OutboundTcpConnection extends Thread
private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
+ public static final int MAX_COALESCED_MESSAGES = 128;
+
private static CoalescingStrategy newCoalescingStrategy(String displayName)
{
return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
@@ -186,7 +188,7 @@ public class OutboundTcpConnection extends Thread
public void run()
{
- final int drainedMessageSize = 128;
+ final int drainedMessageSize = MAX_COALESCED_MESSAGES;
// keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 826bd64..52d4240 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -18,8 +18,11 @@
package org.apache.cassandra.utils;
import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
+
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.RandomAccessFile;
@@ -38,6 +41,7 @@ import com.google.common.base.Preconditions;
public class CoalescingStrategies
{
+ static protected final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class);
/*
* Log debug information at info level about what the average is and when coalescing is enabled/disabled
@@ -85,15 +89,23 @@ public class CoalescingStrategies
{
long now = System.nanoTime();
final long timer = now + nanos;
+ // We shouldn't loop if it's within a few % of the target sleep time if on a second iteration.
+ // See CASSANDRA-8692.
+ final long limit = timer - nanos / 16;
do
{
LockSupport.parkNanos(timer - now);
+ now = System.nanoTime();
}
- while (timer - (now = System.nanoTime()) > nanos / 16);
+ while (now < limit);
}
private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
{
+ // Do not sleep if there are still items in the backlog (CASSANDRA-13090).
+ if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
+ return false;
+
// only sleep if we can expect to double the number of messages we're sending in the time interval
long sleep = messages * averageGap;
if (sleep <= 0 || sleep > maxCoalesceWindow)
@@ -329,7 +341,7 @@ public class CoalescingStrategies
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
- input.drainTo(out, maxItems - 1);
+ input.drainTo(out, maxItems - out.size());
}
for (Coalescable qm : out)
@@ -411,15 +423,16 @@ public class CoalescingStrategies
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
+ input.drainTo(out, maxItems - out.size());
}
long average = notifyOfSample(out.get(0).timestampNanos());
-
debugGap(average);
- maybeSleep(out.size(), average, maxCoalesceWindow, parker);
+ if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) {
+ input.drainTo(out, maxItems - out.size());
+ }
- input.drainTo(out, maxItems - out.size());
for (int ii = 1; ii < out.size(); ii++)
notifyOfSample(out.get(ii).timestampNanos());
}
@@ -447,11 +460,16 @@ public class CoalescingStrategies
@Override
protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
{
+ int enough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
- parker.park(coalesceWindow);
- input.drainTo(out, maxItems - 1);
+ input.drainTo(out, maxItems - out.size());
+ if (out.size() < enough) {
+ parker.park(coalesceWindow);
+ input.drainTo(out, maxItems - out.size());
+ }
}
debugTimestamps(out);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe..26b6b3a 100644
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@ -17,10 +17,12 @@
*/
package org.apache.cassandra.utils;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.CoalescingStrategies.Clock;
import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
import org.apache.cassandra.utils.CoalescingStrategies.Parker;
+import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -101,6 +103,12 @@ public class CoalescingStrategiesTest
Semaphore queueParked = new Semaphore(0);
Semaphore queueRelease = new Semaphore(0);
+ @BeforeClass
+ public static void initDD()
+ {
+ DatabaseDescriptor.forceStaticInitialization();
+ }
+
@SuppressWarnings({ "serial" })
@Before
public void setUp() throws Exception
@@ -207,6 +215,38 @@ public class CoalescingStrategiesTest
}
@Test
+ public void testFixedCoalescingStrategyEnough() throws Exception
+ {
+ int oldValue = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+ DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(1);
+ try {
+ cs = newStrategy("FIXED", 200);
+
+ //Test that when a stream of messages continues arriving it keeps sending until all are drained
+ //It does this because it is already awake and sending messages
+ add(42);
+ add(42);
+ cs.coalesce(input, output, 128);
+ assertEquals(2, output.size());
+ assertNull(parker.parks.poll());
+
+ clear();
+
+ runBlocker(queueParked);
+ add(42);
+ add(42);
+ add(42);
+ release();
+ assertEquals(3, output.size());
+ assertNull(parker.parks.poll());
+ }
+ finally {
+ DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(oldValue);
+ }
+
+ }
+
+ @Test
public void testDisabledCoalescingStrateg() throws Exception
{
cs = newStrategy("DISABLED", 200);
[10/10] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by aw...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/edcbef3e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/edcbef3e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/edcbef3e
Branch: refs/heads/trunk
Commit: edcbef3e343778b4d5affe019f64c89da2a13aa2
Parents: ffc82b1 702ec08
Author: Ariel Weisberg <aw...@apple.com>
Authored: Mon Feb 13 13:33:11 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 13:34:31 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 27 +++++++++++++
.../org/apache/cassandra/config/Config.java | 3 +-
.../cassandra/config/DatabaseDescriptor.java | 16 ++++++++
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++----
.../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++
7 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcbef3e/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcbef3e/conf/cassandra.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcbef3e/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcbef3e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcbef3e/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 574e9e7,ea64ef4..69480df
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -82,9 -83,8 +82,11 @@@ public class OutboundTcpConnection exte
private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
+ //Size of 3 elements added to every message
+ private static final int PROTOCOL_MAGIC_ID_TIMESTAMP_SIZE = 12;
+
+ public static final int MAX_COALESCED_MESSAGES = 128;
+
private static CoalescingStrategy newCoalescingStrategy(String displayName)
{
return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by aw...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3d01e906
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3d01e906
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3d01e906
Branch: refs/heads/trunk
Commit: 3d01e906152f02f826a9663afb4a1232060ad584
Parents: 3a570d7 5725e2c
Author: Ariel Weisberg <aw...@apple.com>
Authored: Mon Feb 13 13:00:20 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 13:23:39 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 27 +++++++++++++
.../org/apache/cassandra/config/Config.java | 3 +-
.../cassandra/config/DatabaseDescriptor.java | 16 ++++++++
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++----
.../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++
7 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac3d1ed,4052b0f..b19550a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,43 -1,5 +1,44 @@@
-2.2.9
+3.0.11
+ * Add vm.max_map_count StartupCheck (CASSANDRA-13008)
+ * Hint related logging should include the IP address of the destination in addition to
+ host ID (CASSANDRA-13205)
+ * Reloading logback.xml does not work (CASSANDRA-13173)
+ * Lightweight transactions temporarily fail after upgrade from 2.1 to 3.0 (CASSANDRA-13109)
+ * Duplicate rows after upgrading from 2.1.16 to 3.0.10/3.9 (CASSANDRA-13125)
+ * Fix UPDATE queries with empty IN restrictions (CASSANDRA-13152)
+ * Abort or retry on failed hints delivery (CASSANDRA-13124)
+ * Fix handling of partition with partition-level deletion plus
+ live rows in sstabledump (CASSANDRA-13177)
+ * Provide user workaround when system_schema.columns does not contain entries
+ for a table that's in system_schema.tables (CASSANDRA-13180)
+ * Dump threads when unit tests time out (CASSANDRA-13117)
+ * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
+ * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
+ * Stress daemon help is incorrect (CASSANDRA-12563)
+ * Remove ALTER TYPE support (CASSANDRA-12443)
+ * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
+ * Set javac encoding to utf-8 (CASSANDRA-11077)
+ * Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
+ * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
+ * Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
+ * Add parent repair session id to anticompaction log message (CASSANDRA-12186)
+ * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
+ * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
+ * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
+ * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)
+ * Nodetool compactionstats fails with NullPointerException (CASSANDRA-13021)
+ * Thread local pools never cleaned up (CASSANDRA-13033)
+ * Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
+ * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
+ * Nodetool should use a more sane max heap size (CASSANDRA-12739)
+ * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
+ * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934)
+ * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
+ * Reenable HeapPool (CASSANDRA-12900)
+Merged from 2.2:
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
* Fix negative mean latency metric (CASSANDRA-12876)
* Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
* Fix speculative retry bugs (CASSANDRA-13009)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/conf/cassandra.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 91b75ed,981026d..602214f
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -708,29 -690,11 +708,35 @@@ public class DatabaseDescripto
if (seedProvider.getSeeds().size() == 0)
throw new ConfigurationException("The seed provider lists no seeds.", false);
+ if (conf.user_defined_function_fail_timeout < 0)
+ throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
+ if (conf.user_defined_function_warn_timeout < 0)
+ throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
+
+ if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
+ throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
+
+ if (conf.max_mutation_size_in_kb == null)
+ conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
+ else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
+ throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
+
+ // native transport encryption options
+ if (conf.native_transport_port_ssl != null
+ && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue()
+ && !conf.client_encryption_options.enabled)
+ {
+ throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
+ }
+
+ if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
+ throw new ConfigurationException("max_value_size_in_mb must be positive", false);
++
+ if (conf.otc_coalescing_enough_coalesced_messages > 128)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
}
private static FileStore guessFileStore(String dir) throws IOException
@@@ -1951,11 -1816,16 +1957,21 @@@
return conf.otc_coalescing_window_us;
}
+ public static int getOtcCoalescingEnoughCoalescedMessages()
+ {
+ return conf.otc_coalescing_enough_coalesced_messages;
+ }
+
+ public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+ {
+ conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+ }
+
+ public static int getWindowsTimerInterval()
+ {
+ return conf.windows_timer_interval;
+ }
+
public static boolean enableUserDefinedFunctions()
{
return conf.enable_user_defined_functions;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by aw...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/702ec088
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/702ec088
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/702ec088
Branch: refs/heads/cassandra-3.11
Commit: 702ec088f5f61106b41e128f3fb8f109da8cbe1c
Parents: 01b91cc 3d01e90
Author: Ariel Weisberg <aw...@apple.com>
Authored: Mon Feb 13 13:31:23 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 13:32:30 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 27 +++++++++++++
.../org/apache/cassandra/config/Config.java | 3 +-
.../cassandra/config/DatabaseDescriptor.java | 16 ++++++++
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++----
.../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++
7 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 67c45e8,b19550a..8164a52
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -215,20 -100,6 +215,21 @@@ Merged from 3.0
* Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
* Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
Merged from 2.2:
++ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
+ * Fix speculative retry bugs (CASSANDRA-13009)
+ * Fix handling of nulls and unsets in IN conditions (CASSANDRA-12981)
+ * Fix race causing infinite loop if Thrift server is stopped before it starts listening (CASSANDRA-12856)
+ * CompactionTasks now correctly drops sstables out of compaction when not enough disk space is available (CASSANDRA-12979)
+ * Remove support for non-JavaScript UDFs (CASSANDRA-12883)
+ * Fix DynamicEndpointSnitch noop in multi-datacenter situations (CASSANDRA-13074)
+ * cqlsh copy-from: encode column names to avoid primary key parsing errors (CASSANDRA-12909)
+ * Temporarily fix bug that creates commit log when running offline tools (CASSANDRA-8616)
+ * Reduce granularity of OpOrder.Group during index build (CASSANDRA-12796)
+ * Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980)
+ * Use saved tokens when setting local tokens on StorageService.joinRing (CASSANDRA-12935)
+ * cqlsh: fix DESC TYPES errors (CASSANDRA-12914)
+ * Fix leak on skipped SSTables in sstableupgrade (CASSANDRA-12899)
+ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
* Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
* Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/conf/cassandra.yaml
----------------------------------------------------------------------
diff --cc conf/cassandra.yaml
index 4436a02,a9d4c01..063a0b7
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -1173,24 -959,29 +1173,51 @@@ gc_warn_threshold_in_ms: 100
# as corrupted.
# max_value_size_in_mb: 256
+# Back-pressure settings #
+# If enabled, the coordinator will apply the back-pressure strategy specified below to each mutation
+# sent to replicas, with the aim of reducing pressure on overloaded replicas.
+back_pressure_enabled: false
+# The back-pressure strategy applied.
+# The default implementation, RateBasedBackPressure, takes three arguments:
+# high ratio, factor, and flow type, and uses the ratio between incoming mutation responses and outgoing mutation requests.
+# If below high ratio, outgoing mutations are rate limited according to the incoming rate decreased by the given factor;
+# if above high ratio, the rate limiting is increased by the given factor;
+# such factor is usually best configured between 1 and 10, use larger values for a faster recovery
+# at the expense of potentially more dropped mutations;
+# the rate limiting is applied according to the flow type: if FAST, it's rate limited at the speed of the fastest replica,
+# if SLOW at the speed of the slowest one.
+# New strategies can be added. Implementors need to implement org.apache.cassandra.net.BackPressureStrategy and
+# provide a public constructor accepting a Map<String, Object>.
+back_pressure_strategy:
+ - class_name: org.apache.cassandra.net.RateBasedBackPressure
+ parameters:
+ - high_ratio: 0.90
+ factor: 5
+ flow: FAST
++
+ # Coalescing Strategies #
+ # Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more).
+ # On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
+ # virtualized environments, the point at which an application can be bound by network packet processing can be
+ # surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal
+ # doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process
+ # is sufficient for many applications such that no load starvation is experienced even without coalescing.
+ # There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+ # per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+ # trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+ # and increasing cache friendliness of network message processing.
+ # See CASSANDRA-8692 for details.
+
+ # Strategy to use for coalescing messages in OutboundTcpConnection.
+ # Can be fixed, movingaverage, timehorizon (default), disabled.
+ # You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+ # otc_coalescing_strategy: TIMEHORIZON
+
+ # How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+ # message is received before it will be sent with any accompanying messages. For moving average this is the
+ # maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+ # for coalescing to be enabled.
+ # otc_coalescing_window_us: 200
+
+ # Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128.
+ # otc_coalescing_enough_coalesced_messages: 8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4261674,602214f..ce3adfe
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -626,341 -655,45 +626,347 @@@ public class DatabaseDescripto
catch (NumberFormatException e)
{
throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
- + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+ + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+ }
+
+ try
+ {
+ // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
+ counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
+ ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
+ : conf.counter_cache_size_in_mb;
+
+ if (counterCacheSizeInMB < 0)
+ throw new NumberFormatException(); // to escape duplicating error message
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
+ + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+ }
+
+ // if set to empty/"auto" then use 5% of Heap size
+ indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
+ ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
+ : conf.index_summary_capacity_in_mb;
+
+ if (indexSummaryCapacityInMB < 0)
+ throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
+ + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
+
+ if (conf.index_interval != null)
+ logger.warn("index_interval has been deprecated and should be removed from cassandra.yaml");
+
+ if(conf.encryption_options != null)
+ {
+ logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
+ //operate under the assumption that server_encryption_options is not set in yaml rather than both
+ conf.server_encryption_options = conf.encryption_options;
+ }
+
+ if (conf.user_defined_function_fail_timeout < 0)
+ throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
+ if (conf.user_defined_function_warn_timeout < 0)
+ throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
+
+ if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
+ throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
+
+ if (conf.max_mutation_size_in_kb == null)
+ conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
+ else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
+ throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
+
+ // native transport encryption options
+ if (conf.native_transport_port_ssl != null
+ && conf.native_transport_port_ssl != conf.native_transport_port
+ && !conf.client_encryption_options.enabled)
+ {
+ throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
+ }
+
+ if (conf.max_value_size_in_mb <= 0)
+ throw new ConfigurationException("max_value_size_in_mb must be positive", false);
+
+ switch (conf.disk_optimization_strategy)
+ {
+ case ssd:
+ diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance);
+ break;
+ case spinning:
+ diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
+ break;
+ }
+
+ try
+ {
+ ParameterizedClass strategy = conf.back_pressure_strategy != null ? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams();
+ Class<?> clazz = Class.forName(strategy.class_name);
+ if (!BackPressureStrategy.class.isAssignableFrom(clazz))
+ throw new ConfigurationException(strategy + " is not an instance of " + BackPressureStrategy.class.getCanonicalName(), false);
+
+ Constructor<?> ctor = clazz.getConstructor(Map.class);
+ BackPressureStrategy instance = (BackPressureStrategy) ctor.newInstance(strategy.parameters);
+ logger.info("Back-pressure is {} with strategy {}.", backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy);
+ backPressureStrategy = instance;
+ }
+ catch (ConfigurationException ex)
+ {
+ throw ex;
+ }
+ catch (Exception ex)
+ {
+ throw new ConfigurationException("Error configuring back-pressure strategy: " + conf.back_pressure_strategy, ex);
+ }
++
++ if (conf.otc_coalescing_enough_coalesced_messages > 128)
++ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
++
++ if (conf.otc_coalescing_enough_coalesced_messages <= 0)
++ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
+ }
+
+ private static String storagedirFor(String type)
+ {
+ return storagedir(type + "_directory") + File.separator + type;
+ }
+
+ private static String storagedir(String errMsgType)
+ {
+ String storagedir = System.getProperty(Config.PROPERTY_PREFIX + "storagedir", null);
+ if (storagedir == null)
+ throw new ConfigurationException(errMsgType + " is missing and -Dcassandra.storagedir is not set", false);
+ return storagedir;
+ }
+
+ public static void applyAddressConfig() throws ConfigurationException
+ {
+ applyAddressConfig(conf);
+ }
+
+ public static void applyAddressConfig(Config config) throws ConfigurationException
+ {
+ listenAddress = null;
+ rpcAddress = null;
+ broadcastAddress = null;
+ broadcastRpcAddress = null;
+
+ /* Local IP, hostname or interface to bind services to */
+ if (config.listen_address != null && config.listen_interface != null)
+ {
+ throw new ConfigurationException("Set listen_address OR listen_interface, not both", false);
+ }
+ else if (config.listen_address != null)
+ {
+ try
+ {
+ listenAddress = InetAddress.getByName(config.listen_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false);
+ }
+
+ if (listenAddress.isAnyLocalAddress())
+ throw new ConfigurationException("listen_address cannot be a wildcard address (" + config.listen_address + ")!", false);
+ }
+ else if (config.listen_interface != null)
+ {
+ listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6);
+ }
+
+ /* Gossip Address to broadcast */
+ if (config.broadcast_address != null)
+ {
+ try
+ {
+ broadcastAddress = InetAddress.getByName(config.broadcast_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false);
+ }
+
+ if (broadcastAddress.isAnyLocalAddress())
+ throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false);
+ }
+
+ /* Local IP, hostname or interface to bind RPC server to */
+ if (config.rpc_address != null && config.rpc_interface != null)
+ {
+ throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false);
+ }
+ else if (config.rpc_address != null)
+ {
+ try
+ {
+ rpcAddress = InetAddress.getByName(config.rpc_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false);
+ }
+ }
+ else if (config.rpc_interface != null)
+ {
+ rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6);
+ }
+ else
+ {
+ rpcAddress = FBUtilities.getLocalAddress();
+ }
+
+ /* RPC address to broadcast */
+ if (config.broadcast_rpc_address != null)
+ {
+ try
+ {
+ broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
+ }
+
+ if (broadcastRpcAddress.isAnyLocalAddress())
+ throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false);
+ }
+ else
+ {
+ if (rpcAddress.isAnyLocalAddress())
+ throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " +
+ "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false);
+ }
+ }
+
+ public static void applyThriftHSHA()
+ {
+ // fail early instead of OOMing (see CASSANDRA-8116)
+ if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE)
+ throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " +
+ "setting of 'unlimited'. Please see the comments in cassandra.yaml " +
+ "for rpc_server_type and rpc_max_threads.",
+ false);
+ if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024))
+ logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads);
+ }
+
+ public static void applyEncryptionContext()
+ {
+ // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption,
+ // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read)
+ encryptionContext = new EncryptionContext(conf.transparent_data_encryption_options);
+ }
+
+ public static void applySeedProvider()
+ {
+ // load the seeds for node contact points
+ if (conf.seed_provider == null)
+ {
+ throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.", false);
+ }
+ try
+ {
+ Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name);
+ seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
+ }
+ // there are about 5 checked exceptions that could be thrown here.
+ catch (Exception e)
+ {
+ throw new ConfigurationException(e.getMessage() + "\nFatal configuration error; unable to start server. See log for stacktrace.", true);
+ }
+ if (seedProvider.getSeeds().size() == 0)
+ throw new ConfigurationException("The seed provider lists no seeds.", false);
+ }
+
+ public static void applyInitialTokens()
+ {
+ if (conf.initial_token != null)
+ {
+ Collection<String> tokens = tokensFromString(conf.initial_token);
+ if (tokens.size() != conf.num_tokens)
+ throw new ConfigurationException("The number of initial tokens (by initial_token) specified is different from num_tokens value", false);
+
+ for (String token : tokens)
+ partitioner.getTokenFactory().validate(token);
+ }
+ }
+
+ // Maybe safe for clients + tools
+ public static void applyRequestScheduler()
+ {
+ /* Request Scheduler setup */
+ requestSchedulerOptions = conf.request_scheduler_options;
+ if (conf.request_scheduler != null)
+ {
+ try
+ {
+ if (requestSchedulerOptions == null)
+ {
+ requestSchedulerOptions = new RequestSchedulerOptions();
+ }
+ Class<?> cls = Class.forName(conf.request_scheduler);
+ requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false);
+ }
+ catch (Exception e)
+ {
+ throw new ConfigurationException("Unable to instantiate request scheduler", e);
+ }
+ }
+ else
+ {
+ requestScheduler = new NoScheduler();
}
- try
+ if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
{
- // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
- counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
- ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
- : conf.counter_cache_size_in_mb;
-
- if (counterCacheSizeInMB < 0)
- throw new NumberFormatException(); // to escape duplicating error message
+ requestSchedulerId = conf.request_scheduler_id;
}
- catch (NumberFormatException e)
+ else
{
- throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
- + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
+ // Default to Keyspace
+ requestSchedulerId = RequestSchedulerId.keyspace;
}
+ }
- // if set to empty/"auto" then use 5% of Heap size
- indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
- ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
- : conf.index_summary_capacity_in_mb;
-
- if (indexSummaryCapacityInMB < 0)
- throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
- + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
-
- if(conf.encryption_options != null)
+ // definitely not safe for tools + clients - implicitly instantiates StorageService
+ public static void applySnitch()
+ {
+ /* end point snitch */
+ if (conf.endpoint_snitch == null)
{
- logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
- //operate under the assumption that server_encryption_options is not set in yaml rather than both
- conf.server_encryption_options = conf.encryption_options;
+ throw new ConfigurationException("Missing endpoint_snitch directive", false);
}
+ snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
+ EndpointSnitchInfo.create();
- // load the seeds for node contact points
- if (conf.seed_provider == null)
+ localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
+ localComparator = new Comparator<InetAddress>()
{
- throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.", false);
+ public int compare(InetAddress endpoint1, InetAddress endpoint2)
+ {
+ boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
+ boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
+ if (local1 && !local2)
+ return -1;
+ if (local2 && !local1)
+ return 1;
+ return 0;
+ }
+ };
+ }
+
+ // definitely not safe for tools + clients - implicitly instantiates schema
+ public static void applyPartitioner()
+ {
+ /* Hashing strategy */
+ if (conf.partitioner == null)
+ {
+ throw new ConfigurationException("Missing directive: partitioner", false);
}
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 1a3c13d,d79fa15..9f3b118
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@@ -17,10 -17,12 +17,13 @@@
*/
package org.apache.cassandra.utils;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
+
import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.RandomAccessFile;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe,26b6b3a..b10d70b
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@@ -101,6 -103,12 +103,12 @@@ public class CoalescingStrategiesTes
Semaphore queueParked = new Semaphore(0);
Semaphore queueRelease = new Semaphore(0);
+ @BeforeClass
+ public static void initDD()
+ {
- DatabaseDescriptor.forceStaticInitialization();
++ DatabaseDescriptor.daemonInitialization();
+ }
+
@SuppressWarnings({ "serial" })
@Before
public void setUp() throws Exception
[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by aw...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3d01e906
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3d01e906
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3d01e906
Branch: refs/heads/cassandra-3.11
Commit: 3d01e906152f02f826a9663afb4a1232060ad584
Parents: 3a570d7 5725e2c
Author: Ariel Weisberg <aw...@apple.com>
Authored: Mon Feb 13 13:00:20 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 13:23:39 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 27 +++++++++++++
.../org/apache/cassandra/config/Config.java | 3 +-
.../cassandra/config/DatabaseDescriptor.java | 16 ++++++++
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++----
.../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++
7 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac3d1ed,4052b0f..b19550a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,43 -1,5 +1,44 @@@
-2.2.9
+3.0.11
+ * Add vm.max_map_count StartupCheck (CASSANDRA-13008)
+ * Hint related logging should include the IP address of the destination in addition to
+ host ID (CASSANDRA-13205)
+ * Reloading logback.xml does not work (CASSANDRA-13173)
+ * Lightweight transactions temporarily fail after upgrade from 2.1 to 3.0 (CASSANDRA-13109)
+ * Duplicate rows after upgrading from 2.1.16 to 3.0.10/3.9 (CASSANDRA-13125)
+ * Fix UPDATE queries with empty IN restrictions (CASSANDRA-13152)
+ * Abort or retry on failed hints delivery (CASSANDRA-13124)
+ * Fix handling of partition with partition-level deletion plus
+ live rows in sstabledump (CASSANDRA-13177)
+ * Provide user workaround when system_schema.columns does not contain entries
+ for a table that's in system_schema.tables (CASSANDRA-13180)
+ * Dump threads when unit tests time out (CASSANDRA-13117)
+ * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
+ * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
+ * Stress daemon help is incorrect (CASSANDRA-12563)
+ * Remove ALTER TYPE support (CASSANDRA-12443)
+ * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
+ * Set javac encoding to utf-8 (CASSANDRA-11077)
+ * Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
+ * Fixed flaky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
+ * Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
+ * Add parent repair session id to anticompaction log message (CASSANDRA-12186)
+ * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
+ * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
+ * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
+ * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)
+ * Nodetool compactionstats fails with NullPointerException (CASSANDRA-13021)
+ * Thread local pools never cleaned up (CASSANDRA-13033)
+ * Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
+ * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
+ * Nodetool should use a more sane max heap size (CASSANDRA-12739)
+ * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
+ * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934)
+ * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
+ * Reenable HeapPool (CASSANDRA-12900)
+Merged from 2.2:
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
* Fix negative mean latency metric (CASSANDRA-12876)
* Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
* Fix speculative retry bugs (CASSANDRA-13009)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/conf/cassandra.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 91b75ed,981026d..602214f
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -708,29 -690,11 +708,35 @@@ public class DatabaseDescripto
if (seedProvider.getSeeds().size() == 0)
throw new ConfigurationException("The seed provider lists no seeds.", false);
+ if (conf.user_defined_function_fail_timeout < 0)
+ throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
+ if (conf.user_defined_function_warn_timeout < 0)
+ throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
+
+ if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
+ throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
+
+ if (conf.max_mutation_size_in_kb == null)
+ conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
+ else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
+ throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
+
+ // native transport encryption options
+ if (conf.native_transport_port_ssl != null
+ && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue()
+ && !conf.client_encryption_options.enabled)
+ {
+ throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
+ }
+
+ if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
+ throw new ConfigurationException("max_value_size_in_mb must be positive", false);
++
+ if (conf.otc_coalescing_enough_coalesced_messages > 128)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
}
private static FileStore guessFileStore(String dir) throws IOException
@@@ -1951,11 -1816,16 +1957,21 @@@
return conf.otc_coalescing_window_us;
}
+ public static int getOtcCoalescingEnoughCoalescedMessages()
+ {
+ return conf.otc_coalescing_enough_coalesced_messages;
+ }
+
+ public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+ {
+ conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+ }
+
+ public static int getWindowsTimerInterval()
+ {
+ return conf.windows_timer_interval;
+ }
+
public static boolean enableUserDefinedFunctions()
{
return conf.enable_user_defined_functions;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by aw...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3d01e906
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3d01e906
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3d01e906
Branch: refs/heads/cassandra-3.0
Commit: 3d01e906152f02f826a9663afb4a1232060ad584
Parents: 3a570d7 5725e2c
Author: Ariel Weisberg <aw...@apple.com>
Authored: Mon Feb 13 13:00:20 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 13:23:39 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 27 +++++++++++++
.../org/apache/cassandra/config/Config.java | 3 +-
.../cassandra/config/DatabaseDescriptor.java | 16 ++++++++
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++----
.../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++
7 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac3d1ed,4052b0f..b19550a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,43 -1,5 +1,44 @@@
-2.2.9
+3.0.11
+ * Add vm.max_map_count StartupCheck (CASSANDRA-13008)
+ * Hint related logging should include the IP address of the destination in addition to
+ host ID (CASSANDRA-13205)
+ * Reloading logback.xml does not work (CASSANDRA-13173)
+ * Lightweight transactions temporarily fail after upgrade from 2.1 to 3.0 (CASSANDRA-13109)
+ * Duplicate rows after upgrading from 2.1.16 to 3.0.10/3.9 (CASSANDRA-13125)
+ * Fix UPDATE queries with empty IN restrictions (CASSANDRA-13152)
+ * Abort or retry on failed hints delivery (CASSANDRA-13124)
+ * Fix handling of partition with partition-level deletion plus
+ live rows in sstabledump (CASSANDRA-13177)
+ * Provide user workaround when system_schema.columns does not contain entries
+ for a table that's in system_schema.tables (CASSANDRA-13180)
+ * Dump threads when unit tests time out (CASSANDRA-13117)
+ * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
+ * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
+ * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
+ * Stress daemon help is incorrect (CASSANDRA-12563)
+ * Remove ALTER TYPE support (CASSANDRA-12443)
+ * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
+ * Set javac encoding to utf-8 (CASSANDRA-11077)
+ * Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
+ * Fixed flaky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
+ * Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
+ * Add parent repair session id to anticompaction log message (CASSANDRA-12186)
+ * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
+ * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
+ * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
+ * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)
+ * Nodetool compactionstats fails with NullPointerException (CASSANDRA-13021)
+ * Thread local pools never cleaned up (CASSANDRA-13033)
+ * Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
+ * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
+ * Nodetool should use a more sane max heap size (CASSANDRA-12739)
+ * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
+ * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934)
+ * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
+ * Reenable HeapPool (CASSANDRA-12900)
+Merged from 2.2:
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
* Fix negative mean latency metric (CASSANDRA-12876)
* Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
* Fix speculative retry bugs (CASSANDRA-13009)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/conf/cassandra.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 91b75ed,981026d..602214f
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -708,29 -690,11 +708,35 @@@ public class DatabaseDescripto
if (seedProvider.getSeeds().size() == 0)
throw new ConfigurationException("The seed provider lists no seeds.", false);
+ if (conf.user_defined_function_fail_timeout < 0)
+ throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
+ if (conf.user_defined_function_warn_timeout < 0)
+ throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
+
+ if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
+ throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
+
+ if (conf.max_mutation_size_in_kb == null)
+ conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
+ else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
+ throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
+
+ // native transport encryption options
+ if (conf.native_transport_port_ssl != null
+ && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue()
+ && !conf.client_encryption_options.enabled)
+ {
+ throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
+ }
+
+ if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
+ throw new ConfigurationException("max_value_size_in_mb must be positive", false);
++
+ if (conf.otc_coalescing_enough_coalesced_messages > 128)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
}
private static FileStore guessFileStore(String dir) throws IOException
@@@ -1951,11 -1816,16 +1957,21 @@@
return conf.otc_coalescing_window_us;
}
+ public static int getOtcCoalescingEnoughCoalescedMessages()
+ {
+ return conf.otc_coalescing_enough_coalesced_messages;
+ }
+
+ public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+ {
+ conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+ }
+
+ public static int getWindowsTimerInterval()
+ {
+ return conf.windows_timer_interval;
+ }
+
public static boolean enableUserDefinedFunctions()
{
return conf.enable_user_defined_functions;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
[04/10] cassandra git commit: Coalescing strategies improvements
CASSANDRA-13090
Posted by aw...@apache.org.
Coalescing strategies improvements CASSANDRA-13090
With the previous code, the TIMEHORIZON and MOVINGAVERAGE
coalescing strategies would wait even when the backlog
still contained data, which would make it grow even more.
Also:
- clean up parkLoop()
- add otc_coalescing_max_coalesced_messages
- add otc_coalescing_enough_coalesced_messages
- add other otc_* settings to cassandra.yaml
patch by Corentin Chary <c....@criteo.com> reviewed by Ariel Weisberg <aw...@apple.com> for CASSANDRA-13090
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5725e2c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5725e2c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5725e2c4
Branch: refs/heads/trunk
Commit: 5725e2c422d21d8efe5ae3bc4389842939553650
Parents: 7e05f39
Author: Corentin Chary <c....@criteo.com>
Authored: Mon Jan 9 12:06:56 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 12:58:29 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 32 ++++++++++++++++
.../org/apache/cassandra/config/Config.java | 3 +-
.../cassandra/config/DatabaseDescriptor.java | 16 ++++++++
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++----
.../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++
7 files changed, 119 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 214fe97..4052b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
* Fix negative mean latency metric (CASSANDRA-12876)
* Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
* Fix speculative retry bugs (CASSANDRA-13009)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ab2aa14..ddb7927 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -884,3 +884,35 @@ enable_user_defined_functions: false
# below their system default. The sysinternals 'clockres' tool can confirm your system's default
# setting.
windows_timer_interval: 1
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
+# early. Any value size larger than this threshold will result in marking an SSTable
+# as corrupted.
+# max_value_size_in_mb: 256
+
+# Coalescing Strategies #
+# Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more).
+# On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
+# virtualized environments, the point at which an application can be bound by network packet processing can be
+# surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal
+# doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process
+# is sufficient for many applications such that no load starvation is experienced even without coalescing.
+# There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+# per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+# trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+# and increasing cache friendliness of network message processing.
+# See CASSANDRA-8692 for details.
+
+# Strategy to use for coalescing messages in OutboundTcpConnection.
+# Can be fixed, movingaverage, timehorizon (default), disabled.
+# You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+# otc_coalescing_strategy: TIMEHORIZON
+
+# How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+# message is received before it will be sent with any accompanying messages. For moving average this is the
+# maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+# for coalescing to be enabled.
+# otc_coalescing_window_us: 200
+
+# Do not try to coalesce messages if we already have at least that many messages. This should be more than 2 and at
+# most 128; values that are not positive or are greater than 128 are rejected at startup.
+# otc_coalescing_enough_coalesced_messages: 8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9f5a22f..64b36a0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -281,12 +281,13 @@ public class Config
/*
* How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
- * messgae is received before it will be sent with any accompanying messages. For moving average this is the
+ * message is received before it will be sent with any accompanying messages. For moving average this is the
* maximum amount of time that will be waited as well as the interval at which messages must arrive on average
* for coalescing to be enabled.
*/
public static final int otc_coalescing_window_us_default = 200;
public int otc_coalescing_window_us = otc_coalescing_window_us_default;
+ public int otc_coalescing_enough_coalesced_messages = 8;
public int windows_timer_interval = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 35debd0..981026d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -689,6 +689,12 @@ public class DatabaseDescriptor
}
if (seedProvider.getSeeds().size() == 0)
throw new ConfigurationException("The seed provider lists no seeds.", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages > 128)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
}
private static FileStore guessFileStore(String dir) throws IOException
@@ -1810,6 +1816,16 @@ public class DatabaseDescriptor
return conf.otc_coalescing_window_us;
}
+ public static int getOtcCoalescingEnoughCoalescedMessages()
+ {
+ return conf.otc_coalescing_enough_coalesced_messages;
+ }
+
+ public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+ {
+ conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+ }
+
public static boolean enableUserDefinedFunctions()
{
return conf.enable_user_defined_functions;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1bf3ea3..6ec78a4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -81,6 +81,8 @@ public class OutboundTcpConnection extends Thread
private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
+ public static final int MAX_COALESCED_MESSAGES = 128;
+
private static CoalescingStrategy newCoalescingStrategy(String displayName)
{
return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
@@ -186,7 +188,7 @@ public class OutboundTcpConnection extends Thread
public void run()
{
- final int drainedMessageSize = 128;
+ final int drainedMessageSize = MAX_COALESCED_MESSAGES;
// keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 826bd64..52d4240 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -18,8 +18,11 @@
package org.apache.cassandra.utils;
import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
+
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.RandomAccessFile;
@@ -38,6 +41,7 @@ import com.google.common.base.Preconditions;
public class CoalescingStrategies
{
+ static protected final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class);
/*
* Log debug information at info level about what the average is and when coalescing is enabled/disabled
@@ -85,15 +89,23 @@ public class CoalescingStrategies
{
long now = System.nanoTime();
final long timer = now + nanos;
+ // We shouldn't loop if it's within a few % of the target sleep time if on a second iteration.
+ // See CASSANDRA-8692.
+ final long limit = timer - nanos / 16;
do
{
LockSupport.parkNanos(timer - now);
+ now = System.nanoTime();
}
- while (timer - (now = System.nanoTime()) > nanos / 16);
+ while (now < limit);
}
private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
{
+ // Do not sleep if there are still items in the backlog (CASSANDRA-13090).
+ if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
+ return false;
+
// only sleep if we can expect to double the number of messages we're sending in the time interval
long sleep = messages * averageGap;
if (sleep <= 0 || sleep > maxCoalesceWindow)
@@ -329,7 +341,7 @@ public class CoalescingStrategies
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
- input.drainTo(out, maxItems - 1);
+ input.drainTo(out, maxItems - out.size());
}
for (Coalescable qm : out)
@@ -411,15 +423,16 @@ public class CoalescingStrategies
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
+ input.drainTo(out, maxItems - out.size());
}
long average = notifyOfSample(out.get(0).timestampNanos());
-
debugGap(average);
- maybeSleep(out.size(), average, maxCoalesceWindow, parker);
+ if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) {
+ input.drainTo(out, maxItems - out.size());
+ }
- input.drainTo(out, maxItems - out.size());
for (int ii = 1; ii < out.size(); ii++)
notifyOfSample(out.get(ii).timestampNanos());
}
@@ -447,11 +460,16 @@ public class CoalescingStrategies
@Override
protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
{
+ int enough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
- parker.park(coalesceWindow);
- input.drainTo(out, maxItems - 1);
+ input.drainTo(out, maxItems - out.size());
+ if (out.size() < enough) {
+ parker.park(coalesceWindow);
+ input.drainTo(out, maxItems - out.size());
+ }
}
debugTimestamps(out);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe..26b6b3a 100644
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@ -17,10 +17,12 @@
*/
package org.apache.cassandra.utils;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.CoalescingStrategies.Clock;
import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
import org.apache.cassandra.utils.CoalescingStrategies.Parker;
+import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -101,6 +103,12 @@ public class CoalescingStrategiesTest
Semaphore queueParked = new Semaphore(0);
Semaphore queueRelease = new Semaphore(0);
+ @BeforeClass
+ public static void initDD()
+ {
+ DatabaseDescriptor.forceStaticInitialization();
+ }
+
@SuppressWarnings({ "serial" })
@Before
public void setUp() throws Exception
@@ -207,6 +215,38 @@ public class CoalescingStrategiesTest
}
@Test
+ public void testFixedCoalescingStrategyEnough() throws Exception
+ {
+ int oldValue = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+ DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(1);
+ try {
+ cs = newStrategy("FIXED", 200);
+
+ //Test that when a stream of messages continues arriving it keeps sending until all are drained
+ //It does this because it is already awake and sending messages
+ add(42);
+ add(42);
+ cs.coalesce(input, output, 128);
+ assertEquals(2, output.size());
+ assertNull(parker.parks.poll());
+
+ clear();
+
+ runBlocker(queueParked);
+ add(42);
+ add(42);
+ add(42);
+ release();
+ assertEquals(3, output.size());
+ assertNull(parker.parks.poll());
+ }
+ finally {
+ DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(oldValue);
+ }
+
+ }
+
+ @Test
public void testDisabledCoalescingStrateg() throws Exception
{
cs = newStrategy("DISABLED", 200);
[03/10] cassandra git commit: Coalescing strategies improvements
CASSANDRA-13090
Posted by aw...@apache.org.
Coalescing strategies improvements CASSANDRA-13090
With the previous code, the TIMEHORIZON and MOVINGAVERAGE
coalescing strategies would wait even when the backlog
still contained data, which would make it grow even more.
Also:
- clean up parkLoop()
- add otc_coalescing_max_coalesced_messages
- add otc_coalescing_enough_coalesced_messages
- add other otc_* settings to cassandra.yaml
patch by Corentin Chary <c....@criteo.com> reviewed by Ariel Weisberg <aw...@apple.com> for CASSANDRA-13090
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5725e2c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5725e2c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5725e2c4
Branch: refs/heads/cassandra-3.11
Commit: 5725e2c422d21d8efe5ae3bc4389842939553650
Parents: 7e05f39
Author: Corentin Chary <c....@criteo.com>
Authored: Mon Jan 9 12:06:56 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 12:58:29 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 32 ++++++++++++++++
.../org/apache/cassandra/config/Config.java | 3 +-
.../cassandra/config/DatabaseDescriptor.java | 16 ++++++++
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../cassandra/utils/CoalescingStrategies.java | 32 ++++++++++++----
.../utils/CoalescingStrategiesTest.java | 40 ++++++++++++++++++++
7 files changed, 119 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 214fe97..4052b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
* Fix negative mean latency metric (CASSANDRA-12876)
* Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
* Fix speculative retry bugs (CASSANDRA-13009)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ab2aa14..ddb7927 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -884,3 +884,35 @@ enable_user_defined_functions: false
# below their system default. The sysinternals 'clockres' tool can confirm your system's default
# setting.
windows_timer_interval: 1
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
+# early. Any value size larger than this threshold will result in marking an SSTable
+# as corrupted.
+# max_value_size_in_mb: 256
+
+# Coalescing Strategies #
+# Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more).
+# On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
+# virtualized environments, the point at which an application can be bound by network packet processing can be
+# surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal
+# doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process
+# is sufficient for many applications such that no load starvation is experienced even without coalescing.
+# There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+# per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+# trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+# and increasing cache friendliness of network message processing.
+# See CASSANDRA-8692 for details.
+
+# Strategy to use for coalescing messages in OutboundTcpConnection.
+# Can be fixed, movingaverage, timehorizon (default), disabled.
+# You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+# otc_coalescing_strategy: TIMEHORIZON
+
+# How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+# message is received before it will be sent with any accompanying messages. For moving average this is the
+# maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+# for coalescing to be enabled.
+# otc_coalescing_window_us: 200
+
+# Do not try to coalesce messages if we already have at least that many messages. This should be more than 2 and at
+# most 128; values that are not positive or are greater than 128 are rejected at startup.
+# otc_coalescing_enough_coalesced_messages: 8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9f5a22f..64b36a0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -281,12 +281,13 @@ public class Config
/*
* How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
- * messgae is received before it will be sent with any accompanying messages. For moving average this is the
+ * message is received before it will be sent with any accompanying messages. For moving average this is the
* maximum amount of time that will be waited as well as the interval at which messages must arrive on average
* for coalescing to be enabled.
*/
public static final int otc_coalescing_window_us_default = 200;
public int otc_coalescing_window_us = otc_coalescing_window_us_default;
+ public int otc_coalescing_enough_coalesced_messages = 8;
public int windows_timer_interval = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 35debd0..981026d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -689,6 +689,12 @@ public class DatabaseDescriptor
}
if (seedProvider.getSeeds().size() == 0)
throw new ConfigurationException("The seed provider lists no seeds.", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages > 128)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+
+ if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+ throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
}
private static FileStore guessFileStore(String dir) throws IOException
@@ -1810,6 +1816,16 @@ public class DatabaseDescriptor
return conf.otc_coalescing_window_us;
}
+ public static int getOtcCoalescingEnoughCoalescedMessages()
+ {
+ return conf.otc_coalescing_enough_coalesced_messages;
+ }
+
+ public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+ {
+ conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+ }
+
public static boolean enableUserDefinedFunctions()
{
return conf.enable_user_defined_functions;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1bf3ea3..6ec78a4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -81,6 +81,8 @@ public class OutboundTcpConnection extends Thread
private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
+ // Capacity of the batch list that run() drains queued messages into. The startup check on
+ // otc_coalescing_enough_coalesced_messages in DatabaseDescriptor uses the same limit as a literal 128.
+ public static final int MAX_COALESCED_MESSAGES = 128;
+
private static CoalescingStrategy newCoalescingStrategy(String displayName)
{
return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
@@ -186,7 +188,7 @@ public class OutboundTcpConnection extends Thread
public void run()
{
- final int drainedMessageSize = 128;
+ final int drainedMessageSize = MAX_COALESCED_MESSAGES;
// keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 826bd64..52d4240 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -18,8 +18,11 @@
package org.apache.cassandra.utils;
import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
+
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.RandomAccessFile;
@@ -38,6 +41,7 @@ import com.google.common.base.Preconditions;
public class CoalescingStrategies
{
+ // Class-level logger for the coalescing strategies.
+ protected static final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class);
/*
* Log debug information at info level about what the average is and when coalescing is enabled/disabled
@@ -85,15 +89,23 @@ public class CoalescingStrategies
{
long now = System.nanoTime();
final long timer = now + nanos;
+ // We shouldn't loop if it's within a few % of the target sleep time if on a second iteration.
+ // See CASSANDRA-8692.
+ final long limit = timer - nanos / 16;
do
{
LockSupport.parkNanos(timer - now);
+ now = System.nanoTime();
}
- while (timer - (now = System.nanoTime()) > nanos / 16);
+ while (now < limit);
}
private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
{
+ // Do not sleep if there are still items in the backlog (CASSANDRA-13090).
+ if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
+ return false;
+
// only sleep if we can expect to double the number of messages we're sending in the time interval
long sleep = messages * averageGap;
if (sleep <= 0 || sleep > maxCoalesceWindow)
@@ -329,7 +341,7 @@ public class CoalescingStrategies
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
- input.drainTo(out, maxItems - 1);
+ input.drainTo(out, maxItems - out.size());
}
for (Coalescable qm : out)
@@ -411,15 +423,16 @@ public class CoalescingStrategies
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
+ input.drainTo(out, maxItems - out.size());
}
long average = notifyOfSample(out.get(0).timestampNanos());
-
debugGap(average);
- maybeSleep(out.size(), average, maxCoalesceWindow, parker);
+ if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) {
+ input.drainTo(out, maxItems - out.size());
+ }
- input.drainTo(out, maxItems - out.size());
for (int ii = 1; ii < out.size(); ii++)
notifyOfSample(out.get(ii).timestampNanos());
}
@@ -447,11 +460,16 @@ public class CoalescingStrategies
@Override
protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
{
+ // Once a batch holds at least this many messages it is sent without parking (CASSANDRA-13090).
+ final int enoughMessages = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+
if (input.drainTo(out, maxItems) == 0)
{
out.add(input.take());
- parker.park(coalesceWindow);
- input.drainTo(out, maxItems - 1);
+ // Pick up whatever else is already queued before deciding whether to wait.
+ input.drainTo(out, maxItems - out.size());
+ final boolean batchTooSmall = out.size() < enoughMessages;
+ if (batchTooSmall)
+ {
+ // Still too few messages: park for the fixed window, then collect what arrived meanwhile.
+ parker.park(coalesceWindow);
+ input.drainTo(out, maxItems - out.size());
+ }
}
debugTimestamps(out);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe..26b6b3a 100644
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@ -17,10 +17,12 @@
*/
package org.apache.cassandra.utils;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.CoalescingStrategies.Clock;
import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
import org.apache.cassandra.utils.CoalescingStrategies.Parker;
+import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -101,6 +103,12 @@ public class CoalescingStrategiesTest
Semaphore queueParked = new Semaphore(0);
Semaphore queueRelease = new Semaphore(0);
+ /**
+ * Initializes DatabaseDescriptor once for the whole class. The strategies under test read
+ * otc_coalescing_enough_coalesced_messages through DatabaseDescriptor while coalescing.
+ */
+ @BeforeClass
+ public static void initDD()
+ {
+ DatabaseDescriptor.forceStaticInitialization();
+ }
+
@SuppressWarnings({ "serial" })
@Before
public void setUp() throws Exception
@@ -207,6 +215,38 @@ public class CoalescingStrategiesTest
}
@Test
+ public void testFixedCoalescingStrategyEnough() throws Exception
+ {
+ // Lower the "enough" threshold to 1 so any non-empty batch counts as enough; the previous
+ // value is restored in the finally block so other tests see the configured default.
+ final int previousEnough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+ DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(1);
+ try
+ {
+ cs = newStrategy("FIXED", 200);
+
+ // Messages already queued: both are drained in a single call and the strategy never parks.
+ add(42);
+ add(42);
+ cs.coalesce(input, output, 128);
+ assertEquals(2, output.size());
+ assertNull(parker.parks.poll());
+
+ clear();
+
+ // Consumer started on an empty queue, then messages arrive. With a threshold of 1 the batch is
+ // already "enough", so no park is expected. NOTE(review): relies on runBlocker/release semantics
+ // defined elsewhere in this test class.
+ runBlocker(queueParked);
+ add(42);
+ add(42);
+ add(42);
+ release();
+ assertEquals(3, output.size());
+ assertNull(parker.parks.poll());
+ }
+ finally
+ {
+ DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(previousEnough);
+ }
+ }
+
+ @Test
public void testDisabledCoalescingStrateg() throws Exception
{
cs = newStrategy("DISABLED", 200);