You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2017/02/13 18:59:47 UTC

[01/10] cassandra git commit: Coalescing strategies improvements CASSANDRA-13090

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 7e05f393f -> 5725e2c42
  refs/heads/cassandra-3.0 3a570d744 -> 3d01e9061
  refs/heads/cassandra-3.11 01b91cce2 -> 702ec088f
  refs/heads/trunk ffc82b1ee -> edcbef3e3


Coalescing strategies improvements CASSANDRA-13090

With the previous code, the TIMEHORIZON and MOVINGAVERAGE
coalescing strategies would wait even when the backlog
still contained data, which would make it grow even more.

Also:
- clean up parkLoop()
- add otc_coalescing_max_coalesced_messages
- add otc_coalescing_enough_coalesced_messages
- add other otc_* settings to cassandra.yaml

patch by Corentin Chary <c....@criteo.com> reviewed by Ariel Weisberg <aw...@apple.com> for CASSANDRA-13090


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5725e2c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5725e2c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5725e2c4

Branch: refs/heads/cassandra-2.2
Commit: 5725e2c422d21d8efe5ae3bc4389842939553650
Parents: 7e05f39
Author: Corentin Chary <c....@criteo.com>
Authored: Mon Jan 9 12:06:56 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 12:58:29 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 32 ++++++++++++++++
 .../org/apache/cassandra/config/Config.java     |  3 +-
 .../cassandra/config/DatabaseDescriptor.java    | 16 ++++++++
 .../cassandra/net/OutboundTcpConnection.java    |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 32 ++++++++++++----
 .../utils/CoalescingStrategiesTest.java         | 40 ++++++++++++++++++++
 7 files changed, 119 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 214fe97..4052b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
  * Fix negative mean latency metric (CASSANDRA-12876)
  * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
  * Fix speculative retry bugs (CASSANDRA-13009)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ab2aa14..ddb7927 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -884,3 +884,35 @@ enable_user_defined_functions: false
 # below their system default. The sysinternals 'clockres' tool can confirm your system's default
 # setting.
 windows_timer_interval: 1
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
+# early. Any value size larger than this threshold will result into marking an SSTable
+# as corrupted.
+# max_value_size_in_mb: 256
+
+# Coalescing Strategies #
+# Coalescing multiples messages turns out to significantly boost message processing throughput (think doubling or more).
+# On bare metal, the floor for packet processing throughput is high enough that many applications won’t notice, but in
+# virtualized environments, the point at which an application can be bound by network packet processing can be
+# surprisingly low compared to the throughput of task processing that is possible inside a VM. It’s not that bare metal
+# doesn’t benefit from coalescing messages, it’s that the number of packets a bare metal network interface can process
+# is sufficient for many applications such that no load starvation is experienced even without coalescing.
+# There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+# per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+# trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+# and increasing cache friendliness of network message processing.
+# See CASSANDRA-8692 for details.
+
+# Strategy to use for coalescing messages in OutboundTcpConnection.
+# Can be fixed, movingaverage, timehorizon (default), disabled.
+# You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+# otc_coalescing_strategy: TIMEHORIZON
+
+# How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+# message is received before it will be sent with any accompanying messages. For moving average this is the
+# maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+# for coalescing to be enabled.
+# otc_coalescing_window_us: 200
+
+# Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128.
+# otc_coalescing_enough_coalesced_messages: 8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9f5a22f..64b36a0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -281,12 +281,13 @@ public class Config
 
     /*
      * How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
-     * messgae is received before it will be sent with any accompanying messages. For moving average this is the
+     * message is received before it will be sent with any accompanying messages. For moving average this is the
      * maximum amount of time that will be waited as well as the interval at which messages must arrive on average
      * for coalescing to be enabled.
      */
     public static final int otc_coalescing_window_us_default = 200;
     public int otc_coalescing_window_us = otc_coalescing_window_us_default;
+    public int otc_coalescing_enough_coalesced_messages = 8;
 
     public int windows_timer_interval = 0;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 35debd0..981026d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -689,6 +689,12 @@ public class DatabaseDescriptor
         }
         if (seedProvider.getSeeds().size() == 0)
             throw new ConfigurationException("The seed provider lists no seeds.", false);
+
+        if (conf.otc_coalescing_enough_coalesced_messages > 128)
+            throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+
+        if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+            throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
     }
 
     private static FileStore guessFileStore(String dir) throws IOException
@@ -1810,6 +1816,16 @@ public class DatabaseDescriptor
         return conf.otc_coalescing_window_us;
     }
 
+    public static int getOtcCoalescingEnoughCoalescedMessages()
+    {
+        return conf.otc_coalescing_enough_coalesced_messages;
+    }
+
+    public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+    {
+        conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+    }
+
     public static boolean enableUserDefinedFunctions()
     {
         return conf.enable_user_defined_functions;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1bf3ea3..6ec78a4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -81,6 +81,8 @@ public class OutboundTcpConnection extends Thread
     private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
     private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
 
+    public static final int MAX_COALESCED_MESSAGES = 128;
+
     private static CoalescingStrategy newCoalescingStrategy(String displayName)
     {
         return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
@@ -186,7 +188,7 @@ public class OutboundTcpConnection extends Thread
 
     public void run()
     {
-        final int drainedMessageSize = 128;
+        final int drainedMessageSize = MAX_COALESCED_MESSAGES;
         // keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
         final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 826bd64..52d4240 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -18,8 +18,11 @@
 package org.apache.cassandra.utils;
 
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.FileUtils;
+
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.RandomAccessFile;
@@ -38,6 +41,7 @@ import com.google.common.base.Preconditions;
 
 public class CoalescingStrategies
 {
+    static protected final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class);
 
     /*
      * Log debug information at info level about what the average is and when coalescing is enabled/disabled
@@ -85,15 +89,23 @@ public class CoalescingStrategies
     {
         long now = System.nanoTime();
         final long timer = now + nanos;
+        // We shouldn't loop if it's within a few % of the target sleep time if on a second iteration.
+        // See CASSANDRA-8692.
+        final long limit = timer - nanos / 16;
         do
         {
             LockSupport.parkNanos(timer - now);
+            now = System.nanoTime();
         }
-        while (timer - (now = System.nanoTime()) > nanos / 16);
+        while (now < limit);
     }
 
     private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
     {
+        // Do not sleep if there are still items in the backlog (CASSANDRA-13090).
+        if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
+            return false;
+
         // only sleep if we can expect to double the number of messages we're sending in the time interval
         long sleep = messages * averageGap;
         if (sleep <= 0 || sleep > maxCoalesceWindow)
@@ -329,7 +341,7 @@ public class CoalescingStrategies
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
-                input.drainTo(out, maxItems - 1);
+                input.drainTo(out, maxItems - out.size());
             }
 
             for (Coalescable qm : out)
@@ -411,15 +423,16 @@ public class CoalescingStrategies
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
+                input.drainTo(out, maxItems - out.size());
             }
 
             long average = notifyOfSample(out.get(0).timestampNanos());
-
             debugGap(average);
 
-            maybeSleep(out.size(), average, maxCoalesceWindow, parker);
+            if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) {
+                input.drainTo(out, maxItems - out.size());
+            }
 
-            input.drainTo(out, maxItems - out.size());
             for (int ii = 1; ii < out.size(); ii++)
                 notifyOfSample(out.get(ii).timestampNanos());
         }
@@ -447,11 +460,16 @@ public class CoalescingStrategies
         @Override
         protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws InterruptedException
         {
+            int enough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
-                parker.park(coalesceWindow);
-                input.drainTo(out, maxItems - 1);
+                input.drainTo(out, maxItems - out.size());
+                if (out.size() < enough) {
+                    parker.park(coalesceWindow);
+                    input.drainTo(out, maxItems - out.size());
+                }
             }
             debugTimestamps(out);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe..26b6b3a 100644
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@ -17,10 +17,12 @@
  */
 package org.apache.cassandra.utils;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.CoalescingStrategies.Clock;
 import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
 import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
 import org.apache.cassandra.utils.CoalescingStrategies.Parker;
+import org.junit.BeforeClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -101,6 +103,12 @@ public class CoalescingStrategiesTest
     Semaphore queueParked = new Semaphore(0);
     Semaphore queueRelease = new Semaphore(0);
 
+    @BeforeClass
+    public static void initDD()
+    {
+        DatabaseDescriptor.forceStaticInitialization();
+    }
+
     @SuppressWarnings({ "serial" })
     @Before
     public void setUp() throws Exception
@@ -207,6 +215,38 @@ public class CoalescingStrategiesTest
     }
 
     @Test
+    public void testFixedCoalescingStrategyEnough() throws Exception
+    {
+        int oldValue = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+        DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(1);
+        try {
+            cs = newStrategy("FIXED", 200);
+
+            //Test that when a stream of messages continues arriving it keeps sending until all are drained
+            //It does this because it is already awake and sending messages
+            add(42);
+            add(42);
+            cs.coalesce(input, output, 128);
+            assertEquals(2, output.size());
+            assertNull(parker.parks.poll());
+
+            clear();
+
+            runBlocker(queueParked);
+            add(42);
+            add(42);
+            add(42);
+            release();
+            assertEquals(3, output.size());
+            assertNull(parker.parks.poll());
+        }
+        finally {
+            DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(oldValue);
+        }
+
+    }
+
+    @Test
     public void testDisabledCoalescingStrateg() throws Exception
     {
         cs = newStrategy("DISABLED", 200);


[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by aw...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/702ec088
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/702ec088
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/702ec088

Branch: refs/heads/trunk
Commit: 702ec088f5f61106b41e128f3fb8f109da8cbe1c
Parents: 01b91cc 3d01e90
Author: Ariel Weisberg <aw...@apple.com>
Authored: Mon Feb 13 13:31:23 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 13:32:30 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 27 +++++++++++++
 .../org/apache/cassandra/config/Config.java     |  3 +-
 .../cassandra/config/DatabaseDescriptor.java    | 16 ++++++++
 .../cassandra/net/OutboundTcpConnection.java    |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 32 ++++++++++++----
 .../utils/CoalescingStrategiesTest.java         | 40 ++++++++++++++++++++
 7 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 67c45e8,b19550a..8164a52
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -215,20 -100,6 +215,21 @@@ Merged from 3.0
   * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
   * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
  Merged from 2.2:
++ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
 + * Fix speculative retry bugs (CASSANDRA-13009)
 + * Fix handling of nulls and unsets in IN conditions (CASSANDRA-12981)
 + * Fix race causing infinite loop if Thrift server is stopped before it starts listening (CASSANDRA-12856)
 + * CompactionTasks now correctly drops sstables out of compaction when not enough disk space is available (CASSANDRA-12979)
 + * Remove support for non-JavaScript UDFs (CASSANDRA-12883)
 + * Fix DynamicEndpointSnitch noop in multi-datacenter situations (CASSANDRA-13074)
 + * cqlsh copy-from: encode column names to avoid primary key parsing errors (CASSANDRA-12909)
 + * Temporarily fix bug that creates commit log when running offline tools (CASSANDRA-8616)
 + * Reduce granuality of OpOrder.Group during index build (CASSANDRA-12796)
 + * Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980)
 + * Use saved tokens when setting local tokens on StorageService.joinRing (CASSANDRA-12935)
 + * cqlsh: fix DESC TYPES errors (CASSANDRA-12914)
 + * Fix leak on skipped SSTables in sstableupgrade (CASSANDRA-12899)
 + * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
   * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
   * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/conf/cassandra.yaml
----------------------------------------------------------------------
diff --cc conf/cassandra.yaml
index 4436a02,a9d4c01..063a0b7
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -1173,24 -959,29 +1173,51 @@@ gc_warn_threshold_in_ms: 100
  # as corrupted.
  # max_value_size_in_mb: 256
  
 +# Back-pressure settings #
 +# If enabled, the coordinator will apply the back-pressure strategy specified below to each mutation
 +# sent to replicas, with the aim of reducing pressure on overloaded replicas.
 +back_pressure_enabled: false
 +# The back-pressure strategy applied.
 +# The default implementation, RateBasedBackPressure, takes three arguments:
 +# high ratio, factor, and flow type, and uses the ratio between incoming mutation responses and outgoing mutation requests.
 +# If below high ratio, outgoing mutations are rate limited according to the incoming rate decreased by the given factor;
 +# if above high ratio, the rate limiting is increased by the given factor;
 +# such factor is usually best configured between 1 and 10, use larger values for a faster recovery
 +# at the expense of potentially more dropped mutations;
 +# the rate limiting is applied according to the flow type: if FAST, it's rate limited at the speed of the fastest replica,
 +# if SLOW at the speed of the slowest one.
 +# New strategies can be added. Implementors need to implement org.apache.cassandra.net.BackpressureStrategy and
 +# provide a public constructor accepting a Map<String, Object>.
 +back_pressure_strategy:
 +    - class_name: org.apache.cassandra.net.RateBasedBackPressure
 +      parameters:
 +        - high_ratio: 0.90
 +          factor: 5
 +          flow: FAST
++
+ # Coalescing Strategies #
+ # Coalescing multiples messages turns out to significantly boost message processing throughput (think doubling or more).
+ # On bare metal, the floor for packet processing throughput is high enough that many applications won’t notice, but in
+ # virtualized environments, the point at which an application can be bound by network packet processing can be
+ # surprisingly low compared to the throughput of task processing that is possible inside a VM. It’s not that bare metal
+ # doesn’t benefit from coalescing messages, it’s that the number of packets a bare metal network interface can process
+ # is sufficient for many applications such that no load starvation is experienced even without coalescing.
+ # There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+ # per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+ # trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+ # and increasing cache friendliness of network message processing.
+ # See CASSANDRA-8692 for details.
+ 
+ # Strategy to use for coalescing messages in OutboundTcpConnection.
+ # Can be fixed, movingaverage, timehorizon (default), disabled.
+ # You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+ # otc_coalescing_strategy: TIMEHORIZON
+ 
+ # How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+ # message is received before it will be sent with any accompanying messages. For moving average this is the
+ # maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+ # for coalescing to be enabled.
+ # otc_coalescing_window_us: 200
+ 
+ # Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128.
+ # otc_coalescing_enough_coalesced_messages: 8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4261674,602214f..ce3adfe
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -626,341 -655,45 +626,347 @@@ public class DatabaseDescripto
          catch (NumberFormatException e)
          {
              throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
 -                    + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
 +                                             + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
 +        }
 +
 +        try
 +        {
 +            // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
 +            counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
 +                                   ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
 +                                   : conf.counter_cache_size_in_mb;
 +
 +            if (counterCacheSizeInMB < 0)
 +                throw new NumberFormatException(); // to escape duplicating error message
 +        }
 +        catch (NumberFormatException e)
 +        {
 +            throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
 +                                             + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
 +        }
 +
 +        // if set to empty/"auto" then use 5% of Heap size
 +        indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
 +                                   ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
 +                                   : conf.index_summary_capacity_in_mb;
 +
 +        if (indexSummaryCapacityInMB < 0)
 +            throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
 +                                             + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
 +
 +        if (conf.index_interval != null)
 +            logger.warn("index_interval has been deprecated and should be removed from cassandra.yaml");
 +
 +        if(conf.encryption_options != null)
 +        {
 +            logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
 +            //operate under the assumption that server_encryption_options is not set in yaml rather than both
 +            conf.server_encryption_options = conf.encryption_options;
 +        }
 +
 +        if (conf.user_defined_function_fail_timeout < 0)
 +            throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
 +        if (conf.user_defined_function_warn_timeout < 0)
 +            throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
 +
 +        if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
 +            throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
 +
 +        if (conf.max_mutation_size_in_kb == null)
 +            conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
 +        else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
 +            throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
 +
 +        // native transport encryption options
 +        if (conf.native_transport_port_ssl != null
 +            && conf.native_transport_port_ssl != conf.native_transport_port
 +            && !conf.client_encryption_options.enabled)
 +        {
 +            throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
 +        }
 +
 +        if (conf.max_value_size_in_mb <= 0)
 +            throw new ConfigurationException("max_value_size_in_mb must be positive", false);
 +
 +        switch (conf.disk_optimization_strategy)
 +        {
 +            case ssd:
 +                diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance);
 +                break;
 +            case spinning:
 +                diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
 +                break;
 +        }
 +
 +        try
 +        {
 +            ParameterizedClass strategy = conf.back_pressure_strategy != null ? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams();
 +            Class<?> clazz = Class.forName(strategy.class_name);
 +            if (!BackPressureStrategy.class.isAssignableFrom(clazz))
 +                throw new ConfigurationException(strategy + " is not an instance of " + BackPressureStrategy.class.getCanonicalName(), false);
 +
 +            Constructor<?> ctor = clazz.getConstructor(Map.class);
 +            BackPressureStrategy instance = (BackPressureStrategy) ctor.newInstance(strategy.parameters);
 +            logger.info("Back-pressure is {} with strategy {}.", backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy);
 +            backPressureStrategy = instance;
 +        }
 +        catch (ConfigurationException ex)
 +        {
 +            throw ex;
 +        }
 +        catch (Exception ex)
 +        {
 +            throw new ConfigurationException("Error configuring back-pressure strategy: " + conf.back_pressure_strategy, ex);
 +        }
++
++        if (conf.otc_coalescing_enough_coalesced_messages > 128)
++            throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
++
++        if (conf.otc_coalescing_enough_coalesced_messages <= 0)
++            throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
 +    }
 +
 +    private static String storagedirFor(String type)
 +    {
 +        return storagedir(type + "_directory") + File.separator + type;
 +    }
 +
 +    private static String storagedir(String errMsgType)
 +    {
 +        String storagedir = System.getProperty(Config.PROPERTY_PREFIX + "storagedir", null);
 +        if (storagedir == null)
 +            throw new ConfigurationException(errMsgType + " is missing and -Dcassandra.storagedir is not set", false);
 +        return storagedir;
 +    }
 +
 +    public static void applyAddressConfig() throws ConfigurationException
 +    {
 +        applyAddressConfig(conf);
 +    }
 +
 +    public static void applyAddressConfig(Config config) throws ConfigurationException
 +    {
 +        listenAddress = null;
 +        rpcAddress = null;
 +        broadcastAddress = null;
 +        broadcastRpcAddress = null;
 +
 +        /* Local IP, hostname or interface to bind services to */
 +        if (config.listen_address != null && config.listen_interface != null)
 +        {
 +            throw new ConfigurationException("Set listen_address OR listen_interface, not both", false);
 +        }
 +        else if (config.listen_address != null)
 +        {
 +            try
 +            {
 +                listenAddress = InetAddress.getByName(config.listen_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false);
 +            }
 +
 +            if (listenAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("listen_address cannot be a wildcard address (" + config.listen_address + ")!", false);
 +        }
 +        else if (config.listen_interface != null)
 +        {
 +            listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6);
 +        }
 +
 +        /* Gossip Address to broadcast */
 +        if (config.broadcast_address != null)
 +        {
 +            try
 +            {
 +                broadcastAddress = InetAddress.getByName(config.broadcast_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false);
 +            }
 +
 +            if (broadcastAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false);
 +        }
 +
 +        /* Local IP, hostname or interface to bind RPC server to */
 +        if (config.rpc_address != null && config.rpc_interface != null)
 +        {
 +            throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false);
 +        }
 +        else if (config.rpc_address != null)
 +        {
 +            try
 +            {
 +                rpcAddress = InetAddress.getByName(config.rpc_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false);
 +            }
 +        }
 +        else if (config.rpc_interface != null)
 +        {
 +            rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6);
 +        }
 +        else
 +        {
 +            rpcAddress = FBUtilities.getLocalAddress();
 +        }
 +
 +        /* RPC address to broadcast */
 +        if (config.broadcast_rpc_address != null)
 +        {
 +            try
 +            {
 +                broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
 +            }
 +
 +            if (broadcastRpcAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false);
 +        }
 +        else
 +        {
 +            if (rpcAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " +
 +                                                 "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false);
 +        }
 +    }
 +
 +    public static void applyThriftHSHA()
 +    {
 +        // fail early instead of OOMing (see CASSANDRA-8116)
 +        if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE)
 +            throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " +
 +                                             "setting of 'unlimited'.  Please see the comments in cassandra.yaml " +
 +                                             "for rpc_server_type and rpc_max_threads.",
 +                                             false);
 +        if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024))
 +            logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads);
 +    }
 +
 +    public static void applyEncryptionContext()
 +    {
 +        // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption,
 +        // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read)
 +        encryptionContext = new EncryptionContext(conf.transparent_data_encryption_options);
 +    }
 +
 +    public static void applySeedProvider()
 +    {
 +        // load the seeds for node contact points
 +        if (conf.seed_provider == null)
 +        {
 +            throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.", false);
 +        }
 +        try
 +        {
 +            Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name);
 +            seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
 +        }
 +        // there are about 5 checked exceptions that could be thrown here.
 +        catch (Exception e)
 +        {
 +            throw new ConfigurationException(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.", true);
 +        }
 +        if (seedProvider.getSeeds().size() == 0)
 +            throw new ConfigurationException("The seed provider lists no seeds.", false);
 +    }
 +
 +    public static void applyInitialTokens()
 +    {
 +        if (conf.initial_token != null)
 +        {
 +            Collection<String> tokens = tokensFromString(conf.initial_token);
 +            if (tokens.size() != conf.num_tokens)
 +                throw new ConfigurationException("The number of initial tokens (by initial_token) specified is different from num_tokens value", false);
 +
 +            for (String token : tokens)
 +                partitioner.getTokenFactory().validate(token);
 +        }
 +    }
 +
 +    // Maybe safe for clients + tools
 +    public static void applyRequestScheduler()
 +    {
 +        /* Request Scheduler setup */
 +        requestSchedulerOptions = conf.request_scheduler_options;
 +        if (conf.request_scheduler != null)
 +        {
 +            try
 +            {
 +                if (requestSchedulerOptions == null)
 +                {
 +                    requestSchedulerOptions = new RequestSchedulerOptions();
 +                }
 +                Class<?> cls = Class.forName(conf.request_scheduler);
 +                requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
 +            }
 +            catch (ClassNotFoundException e)
 +            {
 +                throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false);
 +            }
 +            catch (Exception e)
 +            {
 +                throw new ConfigurationException("Unable to instantiate request scheduler", e);
 +            }
 +        }
 +        else
 +        {
 +            requestScheduler = new NoScheduler();
          }
  
 -        try
 +        if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
          {
 -            // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
 -            counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
 -                    ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
 -                    : conf.counter_cache_size_in_mb;
 -
 -            if (counterCacheSizeInMB < 0)
 -                throw new NumberFormatException(); // to escape duplicating error message
 +            requestSchedulerId = conf.request_scheduler_id;
          }
 -        catch (NumberFormatException e)
 +        else
          {
 -            throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
 -                    + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
 +            // Default to Keyspace
 +            requestSchedulerId = RequestSchedulerId.keyspace;
          }
 +    }
  
 -        // if set to empty/"auto" then use 5% of Heap size
 -        indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
 -            ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
 -            : conf.index_summary_capacity_in_mb;
 -
 -        if (indexSummaryCapacityInMB < 0)
 -            throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
 -                    + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
 -
 -        if(conf.encryption_options != null)
 +    // definitely not safe for tools + clients - implicitly instantiates StorageService
 +    public static void applySnitch()
 +    {
 +        /* end point snitch */
 +        if (conf.endpoint_snitch == null)
          {
 -            logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
 -            //operate under the assumption that server_encryption_options is not set in yaml rather than both
 -            conf.server_encryption_options = conf.encryption_options;
 +            throw new ConfigurationException("Missing endpoint_snitch directive", false);
          }
 +        snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
 +        EndpointSnitchInfo.create();
  
 -        // load the seeds for node contact points
 -        if (conf.seed_provider == null)
 +        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
 +        localComparator = new Comparator<InetAddress>()
          {
 -            throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.", false);
 +            public int compare(InetAddress endpoint1, InetAddress endpoint2)
 +            {
 +                boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
 +                boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
 +                if (local1 && !local2)
 +                    return -1;
 +                if (local2 && !local1)
 +                    return 1;
 +                return 0;
 +            }
 +        };
 +    }
 +
 +    // definitely not safe for tools + clients - implicitly instantiates schema
 +    public static void applyPartitioner()
 +    {
 +        /* Hashing strategy */
 +        if (conf.partitioner == null)
 +        {
 +            throw new ConfigurationException("Missing directive: partitioner", false);
          }
          try
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 1a3c13d,d79fa15..9f3b118
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@@ -17,10 -17,12 +17,13 @@@
   */
  package org.apache.cassandra.utils;
  
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
  import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.io.util.FileUtils;
+ 
  import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
  import java.io.File;
  import java.io.RandomAccessFile;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe,26b6b3a..b10d70b
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@@ -101,6 -103,12 +103,12 @@@ public class CoalescingStrategiesTes
      Semaphore queueParked = new Semaphore(0);
      Semaphore queueRelease = new Semaphore(0);
  
+     @BeforeClass
+     public static void initDD()
+     {
 -        DatabaseDescriptor.forceStaticInitialization();
++        DatabaseDescriptor.daemonInitialization();
+     }
+ 
      @SuppressWarnings({ "serial" })
      @Before
      public void setUp() throws Exception


[02/10] cassandra git commit: Coalescing strategies improvements CASSANDRA-13090

Posted by aw...@apache.org.
Coalescing strategies improvements CASSANDRA-13090

With the previous code TIMEHORIZON and MOVINGAVERAGE
coalescing strategies would wait even when the backlog
still contained data, which would make it grow even more.

Also:
- clean up parkLoop()
- add otc_coalescing_max_coalesced_messages
- add otc_coalescing_enough_coalesced_messages
- add other otc_* settings to cassandra.yaml

patch by Corentin Chary <c....@criteo.com> reviewed by Ariel Weisberg <aw...@apple.com> for CASSANDRA-13090


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5725e2c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5725e2c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5725e2c4

Branch: refs/heads/cassandra-3.0
Commit: 5725e2c422d21d8efe5ae3bc4389842939553650
Parents: 7e05f39
Author: Corentin Chary <c....@criteo.com>
Authored: Mon Jan 9 12:06:56 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 12:58:29 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 32 ++++++++++++++++
 .../org/apache/cassandra/config/Config.java     |  3 +-
 .../cassandra/config/DatabaseDescriptor.java    | 16 ++++++++
 .../cassandra/net/OutboundTcpConnection.java    |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 32 ++++++++++++----
 .../utils/CoalescingStrategiesTest.java         | 40 ++++++++++++++++++++
 7 files changed, 119 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 214fe97..4052b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
  * Fix negative mean latency metric (CASSANDRA-12876)
  * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
  * Fix speculative retry bugs (CASSANDRA-13009)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ab2aa14..ddb7927 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -884,3 +884,35 @@ enable_user_defined_functions: false
 # below their system default. The sysinternals 'clockres' tool can confirm your system's default
 # setting.
 windows_timer_interval: 1
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
+# early. Any value size larger than this threshold will result in marking an SSTable
+# as corrupted.
+# max_value_size_in_mb: 256
+
+# Coalescing Strategies #
+# Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more).
+# On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
+# virtualized environments, the point at which an application can be bound by network packet processing can be
+# surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal
+# doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process
+# is sufficient for many applications such that no load starvation is experienced even without coalescing.
+# There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+# per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+# trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+# and increasing cache friendliness of network message processing.
+# See CASSANDRA-8692 for details.
+
+# Strategy to use for coalescing messages in OutboundTcpConnection.
+# Can be fixed, movingaverage, timehorizon (default), disabled.
+# You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+# otc_coalescing_strategy: TIMEHORIZON
+
+# How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+# message is received before it will be sent with any accompanying messages. For moving average this is the
+# maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+# for coalescing to be enabled.
+# otc_coalescing_window_us: 200
+
+# Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128.
+# otc_coalescing_enough_coalesced_messages: 8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9f5a22f..64b36a0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -281,12 +281,13 @@ public class Config
 
     /*
      * How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
-     * messgae is received before it will be sent with any accompanying messages. For moving average this is the
+     * message is received before it will be sent with any accompanying messages. For moving average this is the
      * maximum amount of time that will be waited as well as the interval at which messages must arrive on average
      * for coalescing to be enabled.
      */
     public static final int otc_coalescing_window_us_default = 200;
     public int otc_coalescing_window_us = otc_coalescing_window_us_default;
+    public int otc_coalescing_enough_coalesced_messages = 8;
 
     public int windows_timer_interval = 0;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 35debd0..981026d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -689,6 +689,12 @@ public class DatabaseDescriptor
         }
         if (seedProvider.getSeeds().size() == 0)
             throw new ConfigurationException("The seed provider lists no seeds.", false);
+
+        if (conf.otc_coalescing_enough_coalesced_messages > 128)
+            throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+
+        if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+            throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
     }
 
     private static FileStore guessFileStore(String dir) throws IOException
@@ -1810,6 +1816,16 @@ public class DatabaseDescriptor
         return conf.otc_coalescing_window_us;
     }
 
+    public static int getOtcCoalescingEnoughCoalescedMessages()
+    {
+        return conf.otc_coalescing_enough_coalesced_messages;
+    }
+
+    public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+    {
+        conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+    }
+
     public static boolean enableUserDefinedFunctions()
     {
         return conf.enable_user_defined_functions;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1bf3ea3..6ec78a4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -81,6 +81,8 @@ public class OutboundTcpConnection extends Thread
     private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
     private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
 
+    public static final int MAX_COALESCED_MESSAGES = 128;
+
     private static CoalescingStrategy newCoalescingStrategy(String displayName)
     {
         return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
@@ -186,7 +188,7 @@ public class OutboundTcpConnection extends Thread
 
     public void run()
     {
-        final int drainedMessageSize = 128;
+        final int drainedMessageSize = MAX_COALESCED_MESSAGES;
         // keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
         final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 826bd64..52d4240 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -18,8 +18,11 @@
 package org.apache.cassandra.utils;
 
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.FileUtils;
+
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.RandomAccessFile;
@@ -38,6 +41,7 @@ import com.google.common.base.Preconditions;
 
 public class CoalescingStrategies
 {
+    static protected final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class);
 
     /*
      * Log debug information at info level about what the average is and when coalescing is enabled/disabled
@@ -85,15 +89,23 @@ public class CoalescingStrategies
     {
         long now = System.nanoTime();
         final long timer = now + nanos;
+        // We shouldn't loop if it's within a few % of the target sleep time if on a second iteration.
+        // See CASSANDRA-8692.
+        final long limit = timer - nanos / 16;
         do
         {
             LockSupport.parkNanos(timer - now);
+            now = System.nanoTime();
         }
-        while (timer - (now = System.nanoTime()) > nanos / 16);
+        while (now < limit);
     }
 
     private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
     {
+        // Do not sleep if there are still items in the backlog (CASSANDRA-13090).
+        if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
+            return false;
+
         // only sleep if we can expect to double the number of messages we're sending in the time interval
         long sleep = messages * averageGap;
         if (sleep <= 0 || sleep > maxCoalesceWindow)
@@ -329,7 +341,7 @@ public class CoalescingStrategies
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
-                input.drainTo(out, maxItems - 1);
+                input.drainTo(out, maxItems - out.size());
             }
 
             for (Coalescable qm : out)
@@ -411,15 +423,16 @@ public class CoalescingStrategies
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
+                input.drainTo(out, maxItems - out.size());
             }
 
             long average = notifyOfSample(out.get(0).timestampNanos());
-
             debugGap(average);
 
-            maybeSleep(out.size(), average, maxCoalesceWindow, parker);
+            if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) {
+                input.drainTo(out, maxItems - out.size());
+            }
 
-            input.drainTo(out, maxItems - out.size());
             for (int ii = 1; ii < out.size(); ii++)
                 notifyOfSample(out.get(ii).timestampNanos());
         }
@@ -447,11 +460,16 @@ public class CoalescingStrategies
         @Override
         protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws InterruptedException
         {
+            int enough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
-                parker.park(coalesceWindow);
-                input.drainTo(out, maxItems - 1);
+                input.drainTo(out, maxItems - out.size());
+                if (out.size() < enough) {
+                    parker.park(coalesceWindow);
+                    input.drainTo(out, maxItems - out.size());
+                }
             }
             debugTimestamps(out);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe..26b6b3a 100644
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@ -17,10 +17,12 @@
  */
 package org.apache.cassandra.utils;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.CoalescingStrategies.Clock;
 import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
 import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
 import org.apache.cassandra.utils.CoalescingStrategies.Parker;
+import org.junit.BeforeClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -101,6 +103,12 @@ public class CoalescingStrategiesTest
     Semaphore queueParked = new Semaphore(0);
     Semaphore queueRelease = new Semaphore(0);
 
+    @BeforeClass
+    public static void initDD()
+    {
+        DatabaseDescriptor.forceStaticInitialization();
+    }
+
     @SuppressWarnings({ "serial" })
     @Before
     public void setUp() throws Exception
@@ -207,6 +215,38 @@ public class CoalescingStrategiesTest
     }
 
     @Test
+    public void testFixedCoalescingStrategyEnough() throws Exception
+    {
+        int oldValue = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+        DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(1);
+        try {
+            cs = newStrategy("FIXED", 200);
+
+            //Test that when a stream of messages continues arriving it keeps sending until all are drained
+            //It does this because it is already awake and sending messages
+            add(42);
+            add(42);
+            cs.coalesce(input, output, 128);
+            assertEquals(2, output.size());
+            assertNull(parker.parks.poll());
+
+            clear();
+
+            runBlocker(queueParked);
+            add(42);
+            add(42);
+            add(42);
+            release();
+            assertEquals(3, output.size());
+            assertNull(parker.parks.poll());
+        }
+        finally {
+            DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(oldValue);
+        }
+
+    }
+
+    @Test
     public void testDisabledCoalescingStrateg() throws Exception
     {
         cs = newStrategy("DISABLED", 200);


[10/10] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by aw...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/edcbef3e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/edcbef3e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/edcbef3e

Branch: refs/heads/trunk
Commit: edcbef3e343778b4d5affe019f64c89da2a13aa2
Parents: ffc82b1 702ec08
Author: Ariel Weisberg <aw...@apple.com>
Authored: Mon Feb 13 13:33:11 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 13:34:31 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 27 +++++++++++++
 .../org/apache/cassandra/config/Config.java     |  3 +-
 .../cassandra/config/DatabaseDescriptor.java    | 16 ++++++++
 .../cassandra/net/OutboundTcpConnection.java    |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 32 ++++++++++++----
 .../utils/CoalescingStrategiesTest.java         | 40 ++++++++++++++++++++
 7 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcbef3e/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcbef3e/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcbef3e/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcbef3e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/edcbef3e/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 574e9e7,ea64ef4..69480df
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -82,9 -83,8 +82,11 @@@ public class OutboundTcpConnection exte
      private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
      private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
  
 +    //Size of 3 elements added to every message
 +    private static final int PROTOCOL_MAGIC_ID_TIMESTAMP_SIZE = 12;
 +
+     public static final int MAX_COALESCED_MESSAGES = 128;
+ 
      private static CoalescingStrategy newCoalescingStrategy(String displayName)
      {
          return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),


[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by aw...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3d01e906
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3d01e906
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3d01e906

Branch: refs/heads/trunk
Commit: 3d01e906152f02f826a9663afb4a1232060ad584
Parents: 3a570d7 5725e2c
Author: Ariel Weisberg <aw...@apple.com>
Authored: Mon Feb 13 13:00:20 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 13:23:39 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 27 +++++++++++++
 .../org/apache/cassandra/config/Config.java     |  3 +-
 .../cassandra/config/DatabaseDescriptor.java    | 16 ++++++++
 .../cassandra/net/OutboundTcpConnection.java    |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 32 ++++++++++++----
 .../utils/CoalescingStrategiesTest.java         | 40 ++++++++++++++++++++
 7 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac3d1ed,4052b0f..b19550a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,43 -1,5 +1,44 @@@
 -2.2.9
 +3.0.11
 + * Add vm.max_map_count StartupCheck (CASSANDRA-13008)
 + * Hint related logging should include the IP address of the destination in addition to 
 +   host ID (CASSANDRA-13205)
 + * Reloading logback.xml does not work (CASSANDRA-13173)
 + * Lightweight transactions temporarily fail after upgrade from 2.1 to 3.0 (CASSANDRA-13109)
 + * Duplicate rows after upgrading from 2.1.16 to 3.0.10/3.9 (CASSANDRA-13125)
 + * Fix UPDATE queries with empty IN restrictions (CASSANDRA-13152)
 + * Abort or retry on failed hints delivery (CASSANDRA-13124)
 + * Fix handling of partition with partition-level deletion plus
 +   live rows in sstabledump (CASSANDRA-13177)
 + * Provide user workaround when system_schema.columns does not contain entries
 +   for a table that's in system_schema.tables (CASSANDRA-13180)
 + * Dump threads when unit tests time out (CASSANDRA-13117)
 + * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
 + * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
 + * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
 + * Stress daemon help is incorrect (CASSANDRA-12563)
 + * Remove ALTER TYPE support (CASSANDRA-12443)
 + * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
 + * Set javac encoding to utf-8 (CASSANDRA-11077)
 + * Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
 + * Fixed flaky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
 + * Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
 + * Add parent repair session id to anticompaction log message (CASSANDRA-12186)
 + * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
 + * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
 + * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
 + * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)
 + * Nodetool compactionstats fails with NullPointerException (CASSANDRA-13021)
 + * Thread local pools never cleaned up (CASSANDRA-13033)
 + * Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
 + * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
 + * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
 + * Nodetool should use a more sane max heap size (CASSANDRA-12739)
 + * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
 + * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934)
 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
 + * Reenable HeapPool (CASSANDRA-12900)
 +Merged from 2.2:
+  * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
   * Fix negative mean latency metric (CASSANDRA-12876)
   * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
   * Fix speculative retry bugs (CASSANDRA-13009)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 91b75ed,981026d..602214f
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -708,29 -690,11 +708,35 @@@ public class DatabaseDescripto
          if (seedProvider.getSeeds().size() == 0)
              throw new ConfigurationException("The seed provider lists no seeds.", false);
  
 +        if (conf.user_defined_function_fail_timeout < 0)
 +            throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
 +        if (conf.user_defined_function_warn_timeout < 0)
 +            throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
 +
 +        if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
 +            throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
 +
 +        if (conf.max_mutation_size_in_kb == null)
 +            conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
 +        else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
 +            throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
 +
 +        // native transport encryption options
 +        if (conf.native_transport_port_ssl != null
 +            && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue()
 +            && !conf.client_encryption_options.enabled)
 +        {
 +            throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
 +        }
 +
 +        if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
 +            throw new ConfigurationException("max_value_size_in_mb must be positive", false);
++
+         if (conf.otc_coalescing_enough_coalesced_messages > 128)
+             throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+ 
+         if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+             throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
      }
  
      private static FileStore guessFileStore(String dir) throws IOException
@@@ -1951,11 -1816,16 +1957,21 @@@
          return conf.otc_coalescing_window_us;
      }
  
+     public static int getOtcCoalescingEnoughCoalescedMessages()
+     {
+         return conf.otc_coalescing_enough_coalesced_messages;
+     }
+ 
+     public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+     {
+         conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+     }
+ 
 +    public static int getWindowsTimerInterval()
 +    {
 +        return conf.windows_timer_interval;
 +    }
 +
      public static boolean enableUserDefinedFunctions()
      {
          return conf.enable_user_defined_functions;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------


[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by aw...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/702ec088
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/702ec088
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/702ec088

Branch: refs/heads/cassandra-3.11
Commit: 702ec088f5f61106b41e128f3fb8f109da8cbe1c
Parents: 01b91cc 3d01e90
Author: Ariel Weisberg <aw...@apple.com>
Authored: Mon Feb 13 13:31:23 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 13:32:30 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 27 +++++++++++++
 .../org/apache/cassandra/config/Config.java     |  3 +-
 .../cassandra/config/DatabaseDescriptor.java    | 16 ++++++++
 .../cassandra/net/OutboundTcpConnection.java    |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 32 ++++++++++++----
 .../utils/CoalescingStrategiesTest.java         | 40 ++++++++++++++++++++
 7 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 67c45e8,b19550a..8164a52
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -215,20 -100,6 +215,21 @@@ Merged from 3.0
   * Correct log message for statistics of offheap memtable flush (CASSANDRA-12776)
   * Explicitly set locale for string validation (CASSANDRA-12541,CASSANDRA-12542,CASSANDRA-12543,CASSANDRA-12545)
  Merged from 2.2:
++ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
 + * Fix speculative retry bugs (CASSANDRA-13009)
 + * Fix handling of nulls and unsets in IN conditions (CASSANDRA-12981)
 + * Fix race causing infinite loop if Thrift server is stopped before it starts listening (CASSANDRA-12856)
 + * CompactionTasks now correctly drops sstables out of compaction when not enough disk space is available (CASSANDRA-12979)
 + * Remove support for non-JavaScript UDFs (CASSANDRA-12883)
 + * Fix DynamicEndpointSnitch noop in multi-datacenter situations (CASSANDRA-13074)
 + * cqlsh copy-from: encode column names to avoid primary key parsing errors (CASSANDRA-12909)
 + * Temporarily fix bug that creates commit log when running offline tools (CASSANDRA-8616)
 + * Reduce granularity of OpOrder.Group during index build (CASSANDRA-12796)
 + * Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980)
 + * Use saved tokens when setting local tokens on StorageService.joinRing (CASSANDRA-12935)
 + * cqlsh: fix DESC TYPES errors (CASSANDRA-12914)
 + * Fix leak on skipped SSTables in sstableupgrade (CASSANDRA-12899)
 + * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
   * Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
   * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
   * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/conf/cassandra.yaml
----------------------------------------------------------------------
diff --cc conf/cassandra.yaml
index 4436a02,a9d4c01..063a0b7
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -1173,24 -959,29 +1173,51 @@@ gc_warn_threshold_in_ms: 100
  # as corrupted.
  # max_value_size_in_mb: 256
  
 +# Back-pressure settings #
 +# If enabled, the coordinator will apply the back-pressure strategy specified below to each mutation
 +# sent to replicas, with the aim of reducing pressure on overloaded replicas.
 +back_pressure_enabled: false
 +# The back-pressure strategy applied.
 +# The default implementation, RateBasedBackPressure, takes three arguments:
 +# high ratio, factor, and flow type, and uses the ratio between incoming mutation responses and outgoing mutation requests.
 +# If below high ratio, outgoing mutations are rate limited according to the incoming rate decreased by the given factor;
 +# if above high ratio, the rate limiting is increased by the given factor;
 +# such factor is usually best configured between 1 and 10, use larger values for a faster recovery
 +# at the expense of potentially more dropped mutations;
 +# the rate limiting is applied according to the flow type: if FAST, it's rate limited at the speed of the fastest replica,
 +# if SLOW at the speed of the slowest one.
 +# New strategies can be added. Implementors need to implement org.apache.cassandra.net.BackpressureStrategy and
 +# provide a public constructor accepting a Map<String, Object>.
 +back_pressure_strategy:
 +    - class_name: org.apache.cassandra.net.RateBasedBackPressure
 +      parameters:
 +        - high_ratio: 0.90
 +          factor: 5
 +          flow: FAST
++
+ # Coalescing Strategies #
+ # Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more).
+ # On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
+ # virtualized environments, the point at which an application can be bound by network packet processing can be
+ # surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal
+ # doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process
+ # is sufficient for many applications such that no load starvation is experienced even without coalescing.
+ # There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+ # per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+ # trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+ # and increasing cache friendliness of network message processing.
+ # See CASSANDRA-8692 for details.
+ 
+ # Strategy to use for coalescing messages in OutboundTcpConnection.
+ # Can be fixed, movingaverage, timehorizon (default), disabled.
+ # You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+ # otc_coalescing_strategy: TIMEHORIZON
+ 
+ # How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+ # message is received before it will be sent with any accompanying messages. For moving average this is the
+ # maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+ # for coalescing to be enabled.
+ # otc_coalescing_window_us: 200
+ 
+ # Do not try to coalesce messages once this many messages have already been collected. Must be between 1 and 128 (inclusive); values greater than 2 are recommended.
+ # otc_coalescing_enough_coalesced_messages: 8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4261674,602214f..ce3adfe
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -626,341 -655,45 +626,347 @@@ public class DatabaseDescripto
          catch (NumberFormatException e)
          {
              throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
 -                    + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
 +                                             + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
 +        }
 +
 +        try
 +        {
 +            // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
 +            counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
 +                                   ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
 +                                   : conf.counter_cache_size_in_mb;
 +
 +            if (counterCacheSizeInMB < 0)
 +                throw new NumberFormatException(); // to escape duplicating error message
 +        }
 +        catch (NumberFormatException e)
 +        {
 +            throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
 +                                             + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
 +        }
 +
 +        // if set to empty/"auto" then use 5% of Heap size
 +        indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
 +                                   ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
 +                                   : conf.index_summary_capacity_in_mb;
 +
 +        if (indexSummaryCapacityInMB < 0)
 +            throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
 +                                             + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
 +
 +        if (conf.index_interval != null)
 +            logger.warn("index_interval has been deprecated and should be removed from cassandra.yaml");
 +
 +        if(conf.encryption_options != null)
 +        {
 +            logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
 +            //operate under the assumption that server_encryption_options is not set in yaml rather than both
 +            conf.server_encryption_options = conf.encryption_options;
 +        }
 +
 +        if (conf.user_defined_function_fail_timeout < 0)
 +            throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
 +        if (conf.user_defined_function_warn_timeout < 0)
 +            throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
 +
 +        if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
 +            throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
 +
 +        if (conf.max_mutation_size_in_kb == null)
 +            conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
 +        else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
 +            throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
 +
 +        // native transport encryption options
 +        if (conf.native_transport_port_ssl != null
 +            && conf.native_transport_port_ssl != conf.native_transport_port
 +            && !conf.client_encryption_options.enabled)
 +        {
 +            throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
 +        }
 +
 +        if (conf.max_value_size_in_mb <= 0)
 +            throw new ConfigurationException("max_value_size_in_mb must be positive", false);
 +
 +        switch (conf.disk_optimization_strategy)
 +        {
 +            case ssd:
 +                diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance);
 +                break;
 +            case spinning:
 +                diskOptimizationStrategy = new SpinningDiskOptimizationStrategy();
 +                break;
 +        }
 +
 +        try
 +        {
 +            ParameterizedClass strategy = conf.back_pressure_strategy != null ? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams();
 +            Class<?> clazz = Class.forName(strategy.class_name);
 +            if (!BackPressureStrategy.class.isAssignableFrom(clazz))
 +                throw new ConfigurationException(strategy + " is not an instance of " + BackPressureStrategy.class.getCanonicalName(), false);
 +
 +            Constructor<?> ctor = clazz.getConstructor(Map.class);
 +            BackPressureStrategy instance = (BackPressureStrategy) ctor.newInstance(strategy.parameters);
 +            logger.info("Back-pressure is {} with strategy {}.", backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy);
 +            backPressureStrategy = instance;
 +        }
 +        catch (ConfigurationException ex)
 +        {
 +            throw ex;
 +        }
 +        catch (Exception ex)
 +        {
 +            throw new ConfigurationException("Error configuring back-pressure strategy: " + conf.back_pressure_strategy, ex);
 +        }
++
++        if (conf.otc_coalescing_enough_coalesced_messages > 128)
++            throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
++
++        if (conf.otc_coalescing_enough_coalesced_messages <= 0)
++            throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
 +    }
 +
 +    private static String storagedirFor(String type)
 +    {
 +        return storagedir(type + "_directory") + File.separator + type;
 +    }
 +
 +    private static String storagedir(String errMsgType)
 +    {
 +        String storagedir = System.getProperty(Config.PROPERTY_PREFIX + "storagedir", null);
 +        if (storagedir == null)
 +            throw new ConfigurationException(errMsgType + " is missing and -Dcassandra.storagedir is not set", false);
 +        return storagedir;
 +    }
 +
 +    public static void applyAddressConfig() throws ConfigurationException
 +    {
 +        applyAddressConfig(conf);
 +    }
 +
 +    public static void applyAddressConfig(Config config) throws ConfigurationException
 +    {
 +        listenAddress = null;
 +        rpcAddress = null;
 +        broadcastAddress = null;
 +        broadcastRpcAddress = null;
 +
 +        /* Local IP, hostname or interface to bind services to */
 +        if (config.listen_address != null && config.listen_interface != null)
 +        {
 +            throw new ConfigurationException("Set listen_address OR listen_interface, not both", false);
 +        }
 +        else if (config.listen_address != null)
 +        {
 +            try
 +            {
 +                listenAddress = InetAddress.getByName(config.listen_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false);
 +            }
 +
 +            if (listenAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("listen_address cannot be a wildcard address (" + config.listen_address + ")!", false);
 +        }
 +        else if (config.listen_interface != null)
 +        {
 +            listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6);
 +        }
 +
 +        /* Gossip Address to broadcast */
 +        if (config.broadcast_address != null)
 +        {
 +            try
 +            {
 +                broadcastAddress = InetAddress.getByName(config.broadcast_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false);
 +            }
 +
 +            if (broadcastAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false);
 +        }
 +
 +        /* Local IP, hostname or interface to bind RPC server to */
 +        if (config.rpc_address != null && config.rpc_interface != null)
 +        {
 +            throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false);
 +        }
 +        else if (config.rpc_address != null)
 +        {
 +            try
 +            {
 +                rpcAddress = InetAddress.getByName(config.rpc_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false);
 +            }
 +        }
 +        else if (config.rpc_interface != null)
 +        {
 +            rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6);
 +        }
 +        else
 +        {
 +            rpcAddress = FBUtilities.getLocalAddress();
 +        }
 +
 +        /* RPC address to broadcast */
 +        if (config.broadcast_rpc_address != null)
 +        {
 +            try
 +            {
 +                broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
 +            }
 +
 +            if (broadcastRpcAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false);
 +        }
 +        else
 +        {
 +            if (rpcAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " +
 +                                                 "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false);
 +        }
 +    }
 +
 +    public static void applyThriftHSHA()
 +    {
 +        // fail early instead of OOMing (see CASSANDRA-8116)
 +        if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE)
 +            throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " +
 +                                             "setting of 'unlimited'.  Please see the comments in cassandra.yaml " +
 +                                             "for rpc_server_type and rpc_max_threads.",
 +                                             false);
 +        if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024))
 +            logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads);
 +    }
 +
 +    public static void applyEncryptionContext()
 +    {
 +        // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption,
 +        // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read)
 +        encryptionContext = new EncryptionContext(conf.transparent_data_encryption_options);
 +    }
 +
 +    public static void applySeedProvider()
 +    {
 +        // load the seeds for node contact points
 +        if (conf.seed_provider == null)
 +        {
 +            throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.", false);
 +        }
 +        try
 +        {
 +            Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name);
 +            seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
 +        }
 +        // there are about 5 checked exceptions that could be thrown here.
 +        catch (Exception e)
 +        {
 +            throw new ConfigurationException(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.", true);
 +        }
 +        if (seedProvider.getSeeds().size() == 0)
 +            throw new ConfigurationException("The seed provider lists no seeds.", false);
 +    }
 +
 +    public static void applyInitialTokens()
 +    {
 +        if (conf.initial_token != null)
 +        {
 +            Collection<String> tokens = tokensFromString(conf.initial_token);
 +            if (tokens.size() != conf.num_tokens)
 +                throw new ConfigurationException("The number of initial tokens (by initial_token) specified is different from num_tokens value", false);
 +
 +            for (String token : tokens)
 +                partitioner.getTokenFactory().validate(token);
 +        }
 +    }
 +
 +    // Maybe safe for clients + tools
 +    public static void applyRequestScheduler()
 +    {
 +        /* Request Scheduler setup */
 +        requestSchedulerOptions = conf.request_scheduler_options;
 +        if (conf.request_scheduler != null)
 +        {
 +            try
 +            {
 +                if (requestSchedulerOptions == null)
 +                {
 +                    requestSchedulerOptions = new RequestSchedulerOptions();
 +                }
 +                Class<?> cls = Class.forName(conf.request_scheduler);
 +                requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
 +            }
 +            catch (ClassNotFoundException e)
 +            {
 +                throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false);
 +            }
 +            catch (Exception e)
 +            {
 +                throw new ConfigurationException("Unable to instantiate request scheduler", e);
 +            }
 +        }
 +        else
 +        {
 +            requestScheduler = new NoScheduler();
          }
  
 -        try
 +        if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
          {
 -            // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB)
 -            counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null)
 -                    ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50)
 -                    : conf.counter_cache_size_in_mb;
 -
 -            if (counterCacheSizeInMB < 0)
 -                throw new NumberFormatException(); // to escape duplicating error message
 +            requestSchedulerId = conf.request_scheduler_id;
          }
 -        catch (NumberFormatException e)
 +        else
          {
 -            throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '"
 -                    + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false);
 +            // Default to Keyspace
 +            requestSchedulerId = RequestSchedulerId.keyspace;
          }
 +    }
  
 -        // if set to empty/"auto" then use 5% of Heap size
 -        indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null)
 -            ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024))
 -            : conf.index_summary_capacity_in_mb;
 -
 -        if (indexSummaryCapacityInMB < 0)
 -            throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
 -                    + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
 -
 -        if(conf.encryption_options != null)
 +    // definitely not safe for tools + clients - implicitly instantiates StorageService
 +    public static void applySnitch()
 +    {
 +        /* end point snitch */
 +        if (conf.endpoint_snitch == null)
          {
 -            logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
 -            //operate under the assumption that server_encryption_options is not set in yaml rather than both
 -            conf.server_encryption_options = conf.encryption_options;
 +            throw new ConfigurationException("Missing endpoint_snitch directive", false);
          }
 +        snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
 +        EndpointSnitchInfo.create();
  
 -        // load the seeds for node contact points
 -        if (conf.seed_provider == null)
 +        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
 +        localComparator = new Comparator<InetAddress>()
          {
 -            throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.", false);
 +            public int compare(InetAddress endpoint1, InetAddress endpoint2)
 +            {
 +                boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
 +                boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
 +                if (local1 && !local2)
 +                    return -1;
 +                if (local2 && !local1)
 +                    return 1;
 +                return 0;
 +            }
 +        };
 +    }
 +
 +    // definitely not safe for tools + clients - implicitly instantiates schema
 +    public static void applyPartitioner()
 +    {
 +        /* Hashing strategy */
 +        if (conf.partitioner == null)
 +        {
 +            throw new ConfigurationException("Missing directive: partitioner", false);
          }
          try
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 1a3c13d,d79fa15..9f3b118
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@@ -17,10 -17,12 +17,13 @@@
   */
  package org.apache.cassandra.utils;
  
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
  import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.io.util.FileUtils;
+ 
  import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
  import java.io.File;
  import java.io.RandomAccessFile;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/702ec088/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe,26b6b3a..b10d70b
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@@ -101,6 -103,12 +103,12 @@@ public class CoalescingStrategiesTes
      Semaphore queueParked = new Semaphore(0);
      Semaphore queueRelease = new Semaphore(0);
  
+     @BeforeClass
+     public static void initDD()
+     {
 -        DatabaseDescriptor.forceStaticInitialization();
++        DatabaseDescriptor.daemonInitialization();
+     }
+ 
      @SuppressWarnings({ "serial" })
      @Before
      public void setUp() throws Exception


[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by aw...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3d01e906
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3d01e906
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3d01e906

Branch: refs/heads/cassandra-3.11
Commit: 3d01e906152f02f826a9663afb4a1232060ad584
Parents: 3a570d7 5725e2c
Author: Ariel Weisberg <aw...@apple.com>
Authored: Mon Feb 13 13:00:20 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 13:23:39 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 27 +++++++++++++
 .../org/apache/cassandra/config/Config.java     |  3 +-
 .../cassandra/config/DatabaseDescriptor.java    | 16 ++++++++
 .../cassandra/net/OutboundTcpConnection.java    |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 32 ++++++++++++----
 .../utils/CoalescingStrategiesTest.java         | 40 ++++++++++++++++++++
 7 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac3d1ed,4052b0f..b19550a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,43 -1,5 +1,44 @@@
 -2.2.9
 +3.0.11
 + * Add vm.max_map_count StartupCheck (CASSANDRA-13008)
 + * Hint related logging should include the IP address of the destination in addition to 
 +   host ID (CASSANDRA-13205)
 + * Reloading logback.xml does not work (CASSANDRA-13173)
 + * Lightweight transactions temporarily fail after upgrade from 2.1 to 3.0 (CASSANDRA-13109)
 + * Duplicate rows after upgrading from 2.1.16 to 3.0.10/3.9 (CASSANDRA-13125)
 + * Fix UPDATE queries with empty IN restrictions (CASSANDRA-13152)
 + * Abort or retry on failed hints delivery (CASSANDRA-13124)
 + * Fix handling of partition with partition-level deletion plus
 +   live rows in sstabledump (CASSANDRA-13177)
 + * Provide user workaround when system_schema.columns does not contain entries
 +   for a table that's in system_schema.tables (CASSANDRA-13180)
 + * Dump threads when unit tests time out (CASSANDRA-13117)
 + * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
 + * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
 + * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
 + * Stress daemon help is incorrect (CASSANDRA-12563)
 + * Remove ALTER TYPE support (CASSANDRA-12443)
 + * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
 + * Set javac encoding to utf-8 (CASSANDRA-11077)
 + * Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
 + * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
 + * Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
 + * Add parent repair session id to anticompaction log message (CASSANDRA-12186)
 + * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
 + * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
 + * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
 + * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)
 + * Nodetool compactionstats fails with NullPointerException (CASSANDRA-13021)
 + * Thread local pools never cleaned up (CASSANDRA-13033)
 + * Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
 + * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
 + * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
 + * Nodetool should use a more sane max heap size (CASSANDRA-12739)
 + * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
 + * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934)
 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
 + * Reenable HeapPool (CASSANDRA-12900)
 +Merged from 2.2:
+  * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
   * Fix negative mean latency metric (CASSANDRA-12876)
   * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
   * Fix speculative retry bugs (CASSANDRA-13009)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 91b75ed,981026d..602214f
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -708,29 -690,11 +708,35 @@@ public class DatabaseDescripto
          if (seedProvider.getSeeds().size() == 0)
              throw new ConfigurationException("The seed provider lists no seeds.", false);
  
 +        if (conf.user_defined_function_fail_timeout < 0)
 +            throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
 +        if (conf.user_defined_function_warn_timeout < 0)
 +            throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
 +
 +        if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
 +            throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
 +
 +        if (conf.max_mutation_size_in_kb == null)
 +            conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
 +        else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
 +            throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
 +
 +        // native transport encryption options
 +        if (conf.native_transport_port_ssl != null
 +            && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue()
 +            && !conf.client_encryption_options.enabled)
 +        {
 +            throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
 +        }
 +
 +        if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
 +            throw new ConfigurationException("max_value_size_in_mb must be positive", false);
++
+         if (conf.otc_coalescing_enough_coalesced_messages > 128)
+             throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+ 
+         if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+             throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
      }
  
      private static FileStore guessFileStore(String dir) throws IOException
@@@ -1951,11 -1816,16 +1957,21 @@@
          return conf.otc_coalescing_window_us;
      }
  
+     public static int getOtcCoalescingEnoughCoalescedMessages()
+     {
+         return conf.otc_coalescing_enough_coalesced_messages;
+     }
+ 
+     public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+     {
+         conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+     }
+ 
 +    public static int getWindowsTimerInterval()
 +    {
 +        return conf.windows_timer_interval;
 +    }
 +
      public static boolean enableUserDefinedFunctions()
      {
          return conf.enable_user_defined_functions;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------


[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by aw...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3d01e906
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3d01e906
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3d01e906

Branch: refs/heads/cassandra-3.0
Commit: 3d01e906152f02f826a9663afb4a1232060ad584
Parents: 3a570d7 5725e2c
Author: Ariel Weisberg <aw...@apple.com>
Authored: Mon Feb 13 13:00:20 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 13:23:39 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 27 +++++++++++++
 .../org/apache/cassandra/config/Config.java     |  3 +-
 .../cassandra/config/DatabaseDescriptor.java    | 16 ++++++++
 .../cassandra/net/OutboundTcpConnection.java    |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 32 ++++++++++++----
 .../utils/CoalescingStrategiesTest.java         | 40 ++++++++++++++++++++
 7 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac3d1ed,4052b0f..b19550a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,43 -1,5 +1,44 @@@
 -2.2.9
 +3.0.11
 + * Add vm.max_map_count StartupCheck (CASSANDRA-13008)
 + * Hint related logging should include the IP address of the destination in addition to 
 +   host ID (CASSANDRA-13205)
 + * Reloading logback.xml does not work (CASSANDRA-13173)
 + * Lightweight transactions temporarily fail after upgrade from 2.1 to 3.0 (CASSANDRA-13109)
 + * Duplicate rows after upgrading from 2.1.16 to 3.0.10/3.9 (CASSANDRA-13125)
 + * Fix UPDATE queries with empty IN restrictions (CASSANDRA-13152)
 + * Abort or retry on failed hints delivery (CASSANDRA-13124)
 + * Fix handling of partition with partition-level deletion plus
 +   live rows in sstabledump (CASSANDRA-13177)
 + * Provide user workaround when system_schema.columns does not contain entries
 +   for a table that's in system_schema.tables (CASSANDRA-13180)
 + * Dump threads when unit tests time out (CASSANDRA-13117)
 + * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
 + * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
 + * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
 + * Stress daemon help is incorrect (CASSANDRA-12563)
 + * Remove ALTER TYPE support (CASSANDRA-12443)
 + * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203)
 + * Set javac encoding to utf-8 (CASSANDRA-11077)
 + * Replace empty strings with null values if they cannot be converted (CASSANDRA-12794)
 + * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
 + * Fix deserialization of 2.x DeletedCells (CASSANDRA-12620)
 + * Add parent repair session id to anticompaction log message (CASSANDRA-12186)
 + * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
 + * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
 + * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
 + * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)
 + * Nodetool compactionstats fails with NullPointerException (CASSANDRA-13021)
 + * Thread local pools never cleaned up (CASSANDRA-13033)
 + * Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
 + * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
 + * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
 + * Nodetool should use a more sane max heap size (CASSANDRA-12739)
 + * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
 + * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934)
 + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
 + * Reenable HeapPool (CASSANDRA-12900)
 +Merged from 2.2:
+  * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
   * Fix negative mean latency metric (CASSANDRA-12876)
   * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
   * Fix speculative retry bugs (CASSANDRA-13009)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 91b75ed,981026d..602214f
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -708,29 -690,11 +708,35 @@@ public class DatabaseDescripto
          if (seedProvider.getSeeds().size() == 0)
              throw new ConfigurationException("The seed provider lists no seeds.", false);
  
 +        if (conf.user_defined_function_fail_timeout < 0)
 +            throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
 +        if (conf.user_defined_function_warn_timeout < 0)
 +            throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
 +
 +        if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
 +            throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
 +
 +        if (conf.max_mutation_size_in_kb == null)
 +            conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
 +        else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
 +            throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
 +
 +        // native transport encryption options
 +        if (conf.native_transport_port_ssl != null
 +            && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue()
 +            && !conf.client_encryption_options.enabled)
 +        {
 +            throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
 +        }
 +
 +        if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
 +            throw new ConfigurationException("max_value_size_in_mb must be positive", false);
++
+         if (conf.otc_coalescing_enough_coalesced_messages > 128)
+             throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+ 
+         if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+             throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
      }
  
      private static FileStore guessFileStore(String dir) throws IOException
@@@ -1951,11 -1816,16 +1957,21 @@@
          return conf.otc_coalescing_window_us;
      }
  
+     public static int getOtcCoalescingEnoughCoalescedMessages()
+     {
+         return conf.otc_coalescing_enough_coalesced_messages;
+     }
+ 
+     public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+     {
+         conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+     }
+ 
 +    public static int getWindowsTimerInterval()
 +    {
 +        return conf.windows_timer_interval;
 +    }
 +
      public static boolean enableUserDefinedFunctions()
      {
          return conf.enable_user_defined_functions;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d01e906/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------


[04/10] cassandra git commit: Coalescing strategies improvements CASSANDRA-13090

Posted by aw...@apache.org.
Coalescing strategies improvements CASSANDRA-13090

With the previous code, the TIMEHORIZON and MOVINGAVERAGE
coalescing strategies would wait even when the backlog
still contained data, which would make it grow even more.

Also:
- clean up parkLoop()
- add otc_coalescing_max_coalesced_messages
- add otc_coalescing_enough_coalesced_messages
- add other otc_* settings to cassandra.yaml

patch by Corentin Chary <c....@criteo.com> reviewed by Ariel Weisberg <aw...@apple.com> for CASSANDRA-13090


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5725e2c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5725e2c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5725e2c4

Branch: refs/heads/trunk
Commit: 5725e2c422d21d8efe5ae3bc4389842939553650
Parents: 7e05f39
Author: Corentin Chary <c....@criteo.com>
Authored: Mon Jan 9 12:06:56 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 12:58:29 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 32 ++++++++++++++++
 .../org/apache/cassandra/config/Config.java     |  3 +-
 .../cassandra/config/DatabaseDescriptor.java    | 16 ++++++++
 .../cassandra/net/OutboundTcpConnection.java    |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 32 ++++++++++++----
 .../utils/CoalescingStrategiesTest.java         | 40 ++++++++++++++++++++
 7 files changed, 119 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 214fe97..4052b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
  * Fix negative mean latency metric (CASSANDRA-12876)
  * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
  * Fix speculative retry bugs (CASSANDRA-13009)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ab2aa14..ddb7927 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -884,3 +884,35 @@ enable_user_defined_functions: false
 # below their system default. The sysinternals 'clockres' tool can confirm your system's default
 # setting.
 windows_timer_interval: 1
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
+# early. Any value size larger than this threshold will result in marking an SSTable
+# as corrupted.
+# max_value_size_in_mb: 256
+
+# Coalescing Strategies #
+# Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more).
+# On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
+# virtualized environments, the point at which an application can be bound by network packet processing can be
+# surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal
+# doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process
+# is sufficient for many applications such that no load starvation is experienced even without coalescing.
+# There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+# per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+# trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+# and increasing cache friendliness of network message processing.
+# See CASSANDRA-8692 for details.
+
+# Strategy to use for coalescing messages in OutboundTcpConnection.
+# Can be fixed, movingaverage, timehorizon (default), disabled.
+# You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+# otc_coalescing_strategy: TIMEHORIZON
+
+# How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+# message is received before it will be sent with any accompanying messages. For moving average this is the
+# maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+# for coalescing to be enabled.
+# otc_coalescing_window_us: 200
+
+# Do not try to coalesce messages once this many messages have already been collected. This should be more than 2 and at most 128.
+# otc_coalescing_enough_coalesced_messages: 8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9f5a22f..64b36a0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -281,12 +281,13 @@ public class Config
 
     /*
      * How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
-     * messgae is received before it will be sent with any accompanying messages. For moving average this is the
+     * message is received before it will be sent with any accompanying messages. For moving average this is the
      * maximum amount of time that will be waited as well as the interval at which messages must arrive on average
      * for coalescing to be enabled.
      */
     public static final int otc_coalescing_window_us_default = 200;
     public int otc_coalescing_window_us = otc_coalescing_window_us_default;
+    public int otc_coalescing_enough_coalesced_messages = 8;
 
     public int windows_timer_interval = 0;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 35debd0..981026d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -689,6 +689,12 @@ public class DatabaseDescriptor
         }
         if (seedProvider.getSeeds().size() == 0)
             throw new ConfigurationException("The seed provider lists no seeds.", false);
+
+        if (conf.otc_coalescing_enough_coalesced_messages > 128)
+            throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+
+        if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+            throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
     }
 
     private static FileStore guessFileStore(String dir) throws IOException
@@ -1810,6 +1816,16 @@ public class DatabaseDescriptor
         return conf.otc_coalescing_window_us;
     }
 
+    public static int getOtcCoalescingEnoughCoalescedMessages()
+    {
+        return conf.otc_coalescing_enough_coalesced_messages;
+    }
+
+    public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+    {
+        conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+    }
+
     public static boolean enableUserDefinedFunctions()
     {
         return conf.enable_user_defined_functions;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1bf3ea3..6ec78a4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -81,6 +81,8 @@ public class OutboundTcpConnection extends Thread
     private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
     private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
 
+    public static final int MAX_COALESCED_MESSAGES = 128;
+
     private static CoalescingStrategy newCoalescingStrategy(String displayName)
     {
         return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
@@ -186,7 +188,7 @@ public class OutboundTcpConnection extends Thread
 
     public void run()
     {
-        final int drainedMessageSize = 128;
+        final int drainedMessageSize = MAX_COALESCED_MESSAGES;
         // keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
         final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 826bd64..52d4240 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -18,8 +18,11 @@
 package org.apache.cassandra.utils;
 
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.FileUtils;
+
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.RandomAccessFile;
@@ -38,6 +41,7 @@ import com.google.common.base.Preconditions;
 
 public class CoalescingStrategies
 {
+    static protected final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class);
 
     /*
      * Log debug information at info level about what the average is and when coalescing is enabled/disabled
@@ -85,15 +89,23 @@ public class CoalescingStrategies
     {
         long now = System.nanoTime();
         final long timer = now + nanos;
+        // We shouldn't loop if it's within a few % of the target sleep time if on a second iteration.
+        // See CASSANDRA-8692.
+        final long limit = timer - nanos / 16;
         do
         {
             LockSupport.parkNanos(timer - now);
+            now = System.nanoTime();
         }
-        while (timer - (now = System.nanoTime()) > nanos / 16);
+        while (now < limit);
     }
 
     private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
     {
+        // Do not sleep if there are still items in the backlog (CASSANDRA-13090).
+        if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
+            return false;
+
         // only sleep if we can expect to double the number of messages we're sending in the time interval
         long sleep = messages * averageGap;
         if (sleep <= 0 || sleep > maxCoalesceWindow)
@@ -329,7 +341,7 @@ public class CoalescingStrategies
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
-                input.drainTo(out, maxItems - 1);
+                input.drainTo(out, maxItems - out.size());
             }
 
             for (Coalescable qm : out)
@@ -411,15 +423,16 @@ public class CoalescingStrategies
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
+                input.drainTo(out, maxItems - out.size());
             }
 
             long average = notifyOfSample(out.get(0).timestampNanos());
-
             debugGap(average);
 
-            maybeSleep(out.size(), average, maxCoalesceWindow, parker);
+            if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) {
+                input.drainTo(out, maxItems - out.size());
+            }
 
-            input.drainTo(out, maxItems - out.size());
             for (int ii = 1; ii < out.size(); ii++)
                 notifyOfSample(out.get(ii).timestampNanos());
         }
@@ -447,11 +460,16 @@ public class CoalescingStrategies
         @Override
         protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws InterruptedException
         {
+            int enough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
-                parker.park(coalesceWindow);
-                input.drainTo(out, maxItems - 1);
+                input.drainTo(out, maxItems - out.size());
+                if (out.size() < enough) {
+                    parker.park(coalesceWindow);
+                    input.drainTo(out, maxItems - out.size());
+                }
             }
             debugTimestamps(out);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe..26b6b3a 100644
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@ -17,10 +17,12 @@
  */
 package org.apache.cassandra.utils;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.CoalescingStrategies.Clock;
 import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
 import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
 import org.apache.cassandra.utils.CoalescingStrategies.Parker;
+import org.junit.BeforeClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -101,6 +103,12 @@ public class CoalescingStrategiesTest
     Semaphore queueParked = new Semaphore(0);
     Semaphore queueRelease = new Semaphore(0);
 
+    @BeforeClass
+    public static void initDD()
+    {
+        DatabaseDescriptor.forceStaticInitialization();
+    }
+
     @SuppressWarnings({ "serial" })
     @Before
     public void setUp() throws Exception
@@ -207,6 +215,38 @@ public class CoalescingStrategiesTest
     }
 
     @Test
+    public void testFixedCoalescingStrategyEnough() throws Exception
+    {
+        int oldValue = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+        DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(1);
+        try {
+            cs = newStrategy("FIXED", 200);
+
+            //Test that when a stream of messages continues arriving it keeps sending until all are drained
+            //It does this because it is already awake and sending messages
+            add(42);
+            add(42);
+            cs.coalesce(input, output, 128);
+            assertEquals(2, output.size());
+            assertNull(parker.parks.poll());
+
+            clear();
+
+            runBlocker(queueParked);
+            add(42);
+            add(42);
+            add(42);
+            release();
+            assertEquals(3, output.size());
+            assertNull(parker.parks.poll());
+        }
+        finally {
+            DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(oldValue);
+        }
+
+    }
+
+    @Test
     public void testDisabledCoalescingStrateg() throws Exception
     {
         cs = newStrategy("DISABLED", 200);


[03/10] cassandra git commit: Coalescing strategies improvements CASSANDRA-13090

Posted by aw...@apache.org.
Coalescing strategies improvements CASSANDRA-13090

With the previous code, the TIMEHORIZON and MOVINGAVERAGE
coalescing strategies would wait even when the backlog
still contained data, which made the backlog grow even more.

Also:
- clean up parkLoop()
- add otc_coalescing_max_coalesced_messages
- add otc_coalescing_enough_coalesced_messages
- add other otc_* settings to cassandra.yaml

patch by Corentin Chary <c....@criteo.com> reviewed by Ariel Weisberg <aw...@apple.com> for CASSANDRA-13090


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5725e2c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5725e2c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5725e2c4

Branch: refs/heads/cassandra-3.11
Commit: 5725e2c422d21d8efe5ae3bc4389842939553650
Parents: 7e05f39
Author: Corentin Chary <c....@criteo.com>
Authored: Mon Jan 9 12:06:56 2017 -0500
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Feb 13 12:58:29 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 32 ++++++++++++++++
 .../org/apache/cassandra/config/Config.java     |  3 +-
 .../cassandra/config/DatabaseDescriptor.java    | 16 ++++++++
 .../cassandra/net/OutboundTcpConnection.java    |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 32 ++++++++++++----
 .../utils/CoalescingStrategiesTest.java         | 40 ++++++++++++++++++++
 7 files changed, 119 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 214fe97..4052b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Coalescing strategy sleeps too much and shouldn't be enabled by default (CASSANDRA-13090)
  * Fix negative mean latency metric (CASSANDRA-12876)
  * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
  * Fix speculative retry bugs (CASSANDRA-13009)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ab2aa14..ddb7927 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -884,3 +884,35 @@ enable_user_defined_functions: false
 # below their system default. The sysinternals 'clockres' tool can confirm your system's default
 # setting.
 windows_timer_interval: 1
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
+# early. Any value larger than this threshold will result in marking an SSTable
+# as corrupted.
+# max_value_size_in_mb: 256
+
+# Coalescing Strategies #
+# Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more).
+# On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
+# virtualized environments, the point at which an application can be bound by network packet processing can be
+# surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal
+# doesn't benefit from coalescing messages; it's that the number of packets a bare metal network interface can process
+# is sufficient for many applications such that no load starvation is experienced even without coalescing.
+# There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+# per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+# trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+# and increasing cache friendliness of network message processing.
+# See CASSANDRA-8692 for details.
+
+# Strategy to use for coalescing messages in OutboundTcpConnection.
+# Can be fixed, movingaverage, timehorizon (default), disabled.
+# You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+# otc_coalescing_strategy: TIMEHORIZON
+
+# How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+# message is received before it will be sent with any accompanying messages. For moving average this is the
+# maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+# for coalescing to be enabled.
+# otc_coalescing_window_us: 200
+
+# Do not try to coalesce messages if we have already collected that many. This should be more than 2 and at most 128.
+# otc_coalescing_enough_coalesced_messages: 8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9f5a22f..64b36a0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -281,12 +281,13 @@ public class Config
 
     /*
      * How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
-     * messgae is received before it will be sent with any accompanying messages. For moving average this is the
+     * message is received before it will be sent with any accompanying messages. For moving average this is the
      * maximum amount of time that will be waited as well as the interval at which messages must arrive on average
      * for coalescing to be enabled.
      */
     public static final int otc_coalescing_window_us_default = 200;
     public int otc_coalescing_window_us = otc_coalescing_window_us_default;
+    public int otc_coalescing_enough_coalesced_messages = 8;
 
     public int windows_timer_interval = 0;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 35debd0..981026d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -689,6 +689,12 @@ public class DatabaseDescriptor
         }
         if (seedProvider.getSeeds().size() == 0)
             throw new ConfigurationException("The seed provider lists no seeds.", false);
+
+        if (conf.otc_coalescing_enough_coalesced_messages > 128)
+            throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false);
+
+        if (conf.otc_coalescing_enough_coalesced_messages <= 0)
+            throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
     }
 
     private static FileStore guessFileStore(String dir) throws IOException
@@ -1810,6 +1816,16 @@ public class DatabaseDescriptor
         return conf.otc_coalescing_window_us;
     }
 
+    public static int getOtcCoalescingEnoughCoalescedMessages()
+    {
+        return conf.otc_coalescing_enough_coalesced_messages;
+    }
+
+    public static void setOtcCoalescingEnoughCoalescedMessages(int otc_coalescing_enough_coalesced_messages)
+    {
+        conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
+    }
+
     public static boolean enableUserDefinedFunctions()
     {
         return conf.enable_user_defined_functions;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1bf3ea3..6ec78a4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -81,6 +81,8 @@ public class OutboundTcpConnection extends Thread
     private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
     private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
 
+    public static final int MAX_COALESCED_MESSAGES = 128;
+
     private static CoalescingStrategy newCoalescingStrategy(String displayName)
     {
         return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
@@ -186,7 +188,7 @@ public class OutboundTcpConnection extends Thread
 
     public void run()
     {
-        final int drainedMessageSize = 128;
+        final int drainedMessageSize = MAX_COALESCED_MESSAGES;
         // keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
         final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 826bd64..52d4240 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -18,8 +18,11 @@
 package org.apache.cassandra.utils;
 
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.FileUtils;
+
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.RandomAccessFile;
@@ -38,6 +41,7 @@ import com.google.common.base.Preconditions;
 
 public class CoalescingStrategies
 {
+    static protected final Logger logger = LoggerFactory.getLogger(CoalescingStrategies.class);
 
     /*
      * Log debug information at info level about what the average is and when coalescing is enabled/disabled
@@ -85,15 +89,23 @@ public class CoalescingStrategies
     {
         long now = System.nanoTime();
         final long timer = now + nanos;
+        // We shouldn't loop if it's within a few % of the target sleep time if on a second iteration.
+        // See CASSANDRA-8692.
+        final long limit = timer - nanos / 16;
         do
         {
             LockSupport.parkNanos(timer - now);
+            now = System.nanoTime();
         }
-        while (timer - (now = System.nanoTime()) > nanos / 16);
+        while (now < limit);
     }
 
     private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
     {
+        // Do not sleep if there are still items in the backlog (CASSANDRA-13090).
+        if (messages >= DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
+            return false;
+
         // only sleep if we can expect to double the number of messages we're sending in the time interval
         long sleep = messages * averageGap;
         if (sleep <= 0 || sleep > maxCoalesceWindow)
@@ -329,7 +341,7 @@ public class CoalescingStrategies
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
-                input.drainTo(out, maxItems - 1);
+                input.drainTo(out, maxItems - out.size());
             }
 
             for (Coalescable qm : out)
@@ -411,15 +423,16 @@ public class CoalescingStrategies
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
+                input.drainTo(out, maxItems - out.size());
             }
 
             long average = notifyOfSample(out.get(0).timestampNanos());
-
             debugGap(average);
 
-            maybeSleep(out.size(), average, maxCoalesceWindow, parker);
+            if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) {
+                input.drainTo(out, maxItems - out.size());
+            }
 
-            input.drainTo(out, maxItems - out.size());
             for (int ii = 1; ii < out.size(); ii++)
                 notifyOfSample(out.get(ii).timestampNanos());
         }
@@ -447,11 +460,16 @@ public class CoalescingStrategies
         @Override
         protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws InterruptedException
         {
+            int enough = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+
             if (input.drainTo(out, maxItems) == 0)
             {
                 out.add(input.take());
-                parker.park(coalesceWindow);
-                input.drainTo(out, maxItems - 1);
+                input.drainTo(out, maxItems - out.size());
+                if (out.size() < enough) {
+                    parker.park(coalesceWindow);
+                    input.drainTo(out, maxItems - out.size());
+                }
             }
             debugTimestamps(out);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5725e2c4/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
index 97d15fe..26b6b3a 100644
--- a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@ -17,10 +17,12 @@
  */
 package org.apache.cassandra.utils;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.CoalescingStrategies.Clock;
 import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
 import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
 import org.apache.cassandra.utils.CoalescingStrategies.Parker;
+import org.junit.BeforeClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -101,6 +103,12 @@ public class CoalescingStrategiesTest
     Semaphore queueParked = new Semaphore(0);
     Semaphore queueRelease = new Semaphore(0);
 
+    @BeforeClass
+    public static void initDD()
+    {
+        DatabaseDescriptor.forceStaticInitialization();
+    }
+
     @SuppressWarnings({ "serial" })
     @Before
     public void setUp() throws Exception
@@ -207,6 +215,38 @@ public class CoalescingStrategiesTest
     }
 
     @Test
+    public void testFixedCoalescingStrategyEnough() throws Exception
+    {
+        int oldValue = DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+        DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(1);
+        try {
+            cs = newStrategy("FIXED", 200);
+
+            //Test that when a stream of messages continues arriving it keeps sending until all are drained
+            //It does this because it is already awake and sending messages
+            add(42);
+            add(42);
+            cs.coalesce(input, output, 128);
+            assertEquals(2, output.size());
+            assertNull(parker.parks.poll());
+
+            clear();
+
+            runBlocker(queueParked);
+            add(42);
+            add(42);
+            add(42);
+            release();
+            assertEquals(3, output.size());
+            assertNull(parker.parks.poll());
+        }
+        finally {
+            DatabaseDescriptor.setOtcCoalescingEnoughCoalescedMessages(oldValue);
+        }
+
+    }
+
+    @Test
     public void testDisabledCoalescingStrateg() throws Exception
     {
         cs = newStrategy("DISABLED", 200);