You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/12/06 15:46:48 UTC

[1/9] cassandra git commit: Making immediate flusher OFF by default

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 0c97908b2 -> ef5ce7cf5
  refs/heads/cassandra-3.11 85e402a7f -> 866a89435
  refs/heads/trunk 1f19d5f7a -> 2275ab6d6


Making immediate flusher OFF by default


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ef5ce7cf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ef5ce7cf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ef5ce7cf

Branch: refs/heads/cassandra-3.0
Commit: ef5ce7cf5d5bd5308f3612f8419d623a0c260abe
Parents: 9bcbb45
Author: sumanthpasupuleti <su...@gmail.com>
Authored: Tue Nov 20 08:17:01 2018 -0800
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Dec 6 15:38:47 2018 +0000

----------------------------------------------------------------------
 conf/cassandra.yaml                              | 7 +------
 src/java/org/apache/cassandra/config/Config.java | 2 +-
 2 files changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5ce7cf/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2e6f363..84664fe 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1002,9 +1002,4 @@ windows_timer_interval: 1
 # time and queue contention while iterating the backlog of messages.
 # An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
 #
-# otc_backlog_expiration_interval_ms: 200
-
-# Define use of legacy delayed flusher for replies to TCP connections. This will increase latency, but might be beneficial for
-# legacy use-cases where only a single connection is used for each Cassandra node. Default is false.
-# native_transport_flush_in_batches_legacy: false
-
+# otc_backlog_expiration_interval_ms: 200
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5ce7cf/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 6d56c74..130ff08 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -149,7 +149,7 @@ public class Config
     public Integer native_transport_max_frame_size_in_mb = 256;
     public volatile Long native_transport_max_concurrent_connections = -1L;
     public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
-    public boolean native_transport_flush_in_batches_legacy = false;
+    public boolean native_transport_flush_in_batches_legacy = true;
 
     @Deprecated
     public Integer thrift_max_message_length_in_mb = 16;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/9] cassandra git commit: Backporting ImmediateFlusher from trunk

Posted by be...@apache.org.
Backporting ImmediateFlusher from trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9bcbb457
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9bcbb457
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9bcbb457

Branch: refs/heads/cassandra-3.0
Commit: 9bcbb457fce9f0007597145f1a150a42b7935ef0
Parents: 0c97908
Author: Michael Burman <ya...@iki.fi>
Authored: Tue May 8 15:40:54 2018 +0300
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Dec 6 15:38:47 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  5 ++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  6 +-
 .../org/apache/cassandra/transport/Message.java | 76 ++++++++++++++++----
 .../org/apache/cassandra/transport/Server.java  |  2 +-
 6 files changed, 76 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3d7158..e349674 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
  * Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
  * Handle failures in parallelAllSSTableOperation (cleanup/upgradesstables/etc) (CASSANDRA-14657)
  * Improve TokenMetaData cache populating performance avoid long locking (CASSANDRA-14660)
+ * Backport: Flush netty client messages immediately (not by default) (CASSANDRA-13651)
  * Fix static column order for SELECT * wildcard queries (CASSANDRA-14638)
  * sstableloader should use discovered broadcast address to connect intra-cluster (CASSANDRA-14522)
  * Fix reading columns with non-UTF names from schema (CASSANDRA-14468)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ef7b034..2e6f363 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1003,3 +1003,8 @@ windows_timer_interval: 1
 # An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
 #
 # otc_backlog_expiration_interval_ms: 200
+
+# Define use of legacy delayed flusher for replies to TCP connections. This will increase latency, but might be beneficial for
+# legacy use-cases where only a single connection is used for each Cassandra node. Default is false.
+# native_transport_flush_in_batches_legacy: false
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64d41bb..6d56c74 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -149,6 +149,7 @@ public class Config
     public Integer native_transport_max_frame_size_in_mb = 256;
     public volatile Long native_transport_max_concurrent_connections = -1L;
     public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
+    public boolean native_transport_flush_in_batches_legacy = false;
 
     @Deprecated
     public Integer thrift_max_message_length_in_mb = 16;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index efc71ef..0fd785a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1499,6 +1499,11 @@ public class DatabaseDescriptor
         conf.native_transport_max_concurrent_connections_per_ip = native_transport_max_concurrent_connections_per_ip;
     }
 
+    public static boolean useNativeTransportLegacyFlusher()
+    {
+        return conf.native_transport_flush_in_batches_legacy;
+    }
+
     public static double getCommitLogSyncBatchWindow()
     {
         return conf.commitlog_sync_batch_window_in_ms;
@@ -2077,5 +2082,4 @@ public class DatabaseDescriptor
     {
         return conf.gc_warn_threshold_in_ms;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 91ece5c..0851b19 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -420,26 +420,38 @@ public abstract class Message
             }
         }
 
-        private static final class Flusher implements Runnable
+        private static abstract class Flusher implements Runnable
         {
             final EventLoop eventLoop;
             final ConcurrentLinkedQueue<FlushItem> queued = new ConcurrentLinkedQueue<>();
-            final AtomicBoolean running = new AtomicBoolean(false);
+            final AtomicBoolean scheduled = new AtomicBoolean(false);
             final HashSet<ChannelHandlerContext> channels = new HashSet<>();
             final List<FlushItem> flushed = new ArrayList<>();
-            int runsSinceFlush = 0;
-            int runsWithNoWork = 0;
-            private Flusher(EventLoop eventLoop)
-            {
-                this.eventLoop = eventLoop;
-            }
+
             void start()
             {
-                if (!running.get() && running.compareAndSet(false, true))
+                if (!scheduled.get() && scheduled.compareAndSet(false, true))
                 {
                     this.eventLoop.execute(this);
                 }
             }
+
+            public Flusher(EventLoop eventLoop)
+            {
+                this.eventLoop = eventLoop;
+            }
+        }
+
+        private static final class LegacyFlusher extends Flusher
+        {
+            int runsSinceFlush = 0;
+            int runsWithNoWork = 0;
+
+            private LegacyFlusher(EventLoop eventLoop)
+            {
+                super(eventLoop);
+            }
+
             public void run()
             {
 
@@ -476,8 +488,8 @@ public abstract class Message
                     // either reschedule or cancel
                     if (++runsWithNoWork > 5)
                     {
-                        running.set(false);
-                        if (queued.isEmpty() || !running.compareAndSet(false, true))
+                        scheduled.set(false);
+                        if (queued.isEmpty() || !scheduled.compareAndSet(false, true))
                             return;
                     }
                 }
@@ -486,11 +498,48 @@ public abstract class Message
             }
         }
 
+        private static final class ImmediateFlusher extends Flusher
+        {
+            private ImmediateFlusher(EventLoop eventLoop)
+            {
+                super(eventLoop);
+            }
+
+            public void run()
+            {
+                boolean doneWork = false;
+                FlushItem flush;
+                scheduled.set(false);
+
+                while (null != (flush = queued.poll()))
+                {
+                    channels.add(flush.ctx);
+                    flush.ctx.write(flush.response, flush.ctx.voidPromise());
+                    flushed.add(flush);
+                    doneWork = true;
+                }
+
+                if (doneWork)
+                {
+                    for (ChannelHandlerContext channel : channels)
+                        channel.flush();
+                    for (FlushItem item : flushed)
+                        item.sourceFrame.release();
+
+                    channels.clear();
+                    flushed.clear();
+                }
+            }
+        }
+
         private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<>();
 
-        public Dispatcher()
+        private final boolean useLegacyFlusher;
+
+        public Dispatcher(boolean useLegacyFlusher)
         {
             super(false);
+            this.useLegacyFlusher = useLegacyFlusher;
         }
 
         @Override
@@ -538,7 +587,8 @@ public abstract class Message
             Flusher flusher = flusherLookup.get(loop);
             if (flusher == null)
             {
-                Flusher alt = flusherLookup.putIfAbsent(loop, flusher = new Flusher(loop));
+                Flusher created = useLegacyFlusher ? new LegacyFlusher(loop) : new ImmediateFlusher(loop);
+                Flusher alt = flusherLookup.putIfAbsent(loop, flusher = created);
                 if (alt != null)
                     flusher = alt;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 7df194d..8c781db 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -295,7 +295,7 @@ public class Server implements CassandraDaemon.Server
         private static final Frame.Compressor frameCompressor = new Frame.Compressor();
         private static final Frame.Encoder frameEncoder = new Frame.Encoder();
         private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
-        private static final Message.Dispatcher dispatcher = new Message.Dispatcher();
+        private static final Message.Dispatcher dispatcher = new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher());
         private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
 
         private final Server server;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[6/9] cassandra git commit: Making immediate flusher OFF by default

Posted by be...@apache.org.
Making immediate flusher OFF by default


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ef5ce7cf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ef5ce7cf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ef5ce7cf

Branch: refs/heads/cassandra-3.11
Commit: ef5ce7cf5d5bd5308f3612f8419d623a0c260abe
Parents: 9bcbb45
Author: sumanthpasupuleti <su...@gmail.com>
Authored: Tue Nov 20 08:17:01 2018 -0800
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Dec 6 15:38:47 2018 +0000

----------------------------------------------------------------------
 conf/cassandra.yaml                              | 7 +------
 src/java/org/apache/cassandra/config/Config.java | 2 +-
 2 files changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5ce7cf/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2e6f363..84664fe 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1002,9 +1002,4 @@ windows_timer_interval: 1
 # time and queue contention while iterating the backlog of messages.
 # An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
 #
-# otc_backlog_expiration_interval_ms: 200
-
-# Define use of legacy delayed flusher for replies to TCP connections. This will increase latency, but might be beneficial for
-# legacy use-cases where only a single connection is used for each Cassandra node. Default is false.
-# native_transport_flush_in_batches_legacy: false
-
+# otc_backlog_expiration_interval_ms: 200
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5ce7cf/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 6d56c74..130ff08 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -149,7 +149,7 @@ public class Config
     public Integer native_transport_max_frame_size_in_mb = 256;
     public volatile Long native_transport_max_concurrent_connections = -1L;
     public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
-    public boolean native_transport_flush_in_batches_legacy = false;
+    public boolean native_transport_flush_in_batches_legacy = true;
 
     @Deprecated
     public Integer thrift_max_message_length_in_mb = 16;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[7/9] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by be...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/866a8943
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/866a8943
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/866a8943

Branch: refs/heads/cassandra-3.11
Commit: 866a89435f9d6453b611a9fdbc84a50377d19c2a
Parents: 85e402a ef5ce7c
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Dec 6 15:41:11 2018 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Dec 6 15:41:11 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  2 +-
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 ++
 .../org/apache/cassandra/transport/Message.java | 76 ++++++++++++++++----
 .../org/apache/cassandra/transport/Server.java  |  2 +-
 6 files changed, 72 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/866a8943/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/866a8943/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/866a8943/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index a01203c,130ff08..603d851
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -133,32 -126,33 +133,33 @@@ public class Confi
  
      /* intentionally left set to true, despite being set to false in stock 2.2 cassandra.yaml
         we don't want to surprise Thrift users who have the setting blank in the yaml during 2.1->2.2 upgrade */
 -    public Boolean start_rpc = true;
 +    public boolean start_rpc = true;
      public String rpc_address;
      public String rpc_interface;
 -    public Boolean rpc_interface_prefer_ipv6 = false;
 +    public boolean rpc_interface_prefer_ipv6 = false;
      public String broadcast_rpc_address;
 -    public Integer rpc_port = 9160;
 -    public Integer rpc_listen_backlog = 50;
 +    public int rpc_port = 9160;
 +    public int rpc_listen_backlog = 50;
      public String rpc_server_type = "sync";
 -    public Boolean rpc_keepalive = true;
 -    public Integer rpc_min_threads = 16;
 -    public Integer rpc_max_threads = Integer.MAX_VALUE;
 +    public boolean rpc_keepalive = true;
 +    public int rpc_min_threads = 16;
 +    public int rpc_max_threads = Integer.MAX_VALUE;
      public Integer rpc_send_buff_size_in_bytes;
      public Integer rpc_recv_buff_size_in_bytes;
 -    public Integer internode_send_buff_size_in_bytes;
 -    public Integer internode_recv_buff_size_in_bytes;
 +    public int internode_send_buff_size_in_bytes = 0;
 +    public int internode_recv_buff_size_in_bytes = 0;
  
 -    public Boolean start_native_transport = false;
 -    public Integer native_transport_port = 9042;
 +    public boolean start_native_transport = false;
 +    public int native_transport_port = 9042;
      public Integer native_transport_port_ssl = null;
 -    public Integer native_transport_max_threads = 128;
 -    public Integer native_transport_max_frame_size_in_mb = 256;
 -    public volatile Long native_transport_max_concurrent_connections = -1L;
 -    public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
 +    public int native_transport_max_threads = 128;
 +    public int native_transport_max_frame_size_in_mb = 256;
 +    public volatile long native_transport_max_concurrent_connections = -1L;
 +    public volatile long native_transport_max_concurrent_connections_per_ip = -1L;
+     public boolean native_transport_flush_in_batches_legacy = true;
  
      @Deprecated
 -    public Integer thrift_max_message_length_in_mb = 16;
 +    public int thrift_max_message_length_in_mb = 16;
      /**
       * Max size of values in SSTables, in MegaBytes.
       * Default is the same as the native protocol frame limit: 256Mb.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/866a8943/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/866a8943/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/866a8943/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[5/9] cassandra git commit: Making immediate flusher OFF by default

Posted by be...@apache.org.
Making immediate flusher OFF by default


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ef5ce7cf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ef5ce7cf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ef5ce7cf

Branch: refs/heads/trunk
Commit: ef5ce7cf5d5bd5308f3612f8419d623a0c260abe
Parents: 9bcbb45
Author: sumanthpasupuleti <su...@gmail.com>
Authored: Tue Nov 20 08:17:01 2018 -0800
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Dec 6 15:38:47 2018 +0000

----------------------------------------------------------------------
 conf/cassandra.yaml                              | 7 +------
 src/java/org/apache/cassandra/config/Config.java | 2 +-
 2 files changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5ce7cf/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2e6f363..84664fe 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1002,9 +1002,4 @@ windows_timer_interval: 1
 # time and queue contention while iterating the backlog of messages.
 # An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
 #
-# otc_backlog_expiration_interval_ms: 200
-
-# Define use of legacy delayed flusher for replies to TCP connections. This will increase latency, but might be beneficial for
-# legacy use-cases where only a single connection is used for each Cassandra node. Default is false.
-# native_transport_flush_in_batches_legacy: false
-
+# otc_backlog_expiration_interval_ms: 200
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef5ce7cf/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 6d56c74..130ff08 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -149,7 +149,7 @@ public class Config
     public Integer native_transport_max_frame_size_in_mb = 256;
     public volatile Long native_transport_max_concurrent_connections = -1L;
     public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
-    public boolean native_transport_flush_in_batches_legacy = false;
+    public boolean native_transport_flush_in_batches_legacy = true;
 
     @Deprecated
     public Integer thrift_max_message_length_in_mb = 16;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[9/9] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2275ab6d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2275ab6d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2275ab6d

Branch: refs/heads/trunk
Commit: 2275ab6d686e7bdf3cf12c9ef972c6f4d36ded0c
Parents: 1f19d5f 866a894
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Dec 6 15:46:28 2018 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Dec 6 15:46:28 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt         | 1 +
 conf/cassandra.yaml | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2275ab6d/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2275ab6d/conf/cassandra.yaml
----------------------------------------------------------------------
diff --cc conf/cassandra.yaml
index 2d5cdd3,2cc119a..dde4296
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -1207,84 -1234,4 +1207,84 @@@ back_pressure_strategy
  # time and queue contention while iterating the backlog of messages.
  # An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
  #
 -# otc_backlog_expiration_interval_ms: 200
 +# otc_backlog_expiration_interval_ms: 200
 +
 +# Track a metric per keyspace indicating whether replication achieved the ideal consistency
 +# level for writes without timing out. This is different from the consistency level requested by
 +# each write which may be lower in order to facilitate availability.
 +# ideal_consistency_level: EACH_QUORUM
 +
 +# Path to write full query log data to when the full query log is enabled
 +# The full query log will recrusively delete the contents of this path at
 +# times. Don't place links in this directory to other parts of the filesystem.
 +#full_query_log_dir: /tmp/cassandrafullquerylog
 +
 +# Automatically upgrade sstables after upgrade - if there is no ordinary compaction to do, the
 +# oldest non-upgraded sstable will get upgraded to the latest version
 +# automatic_sstable_upgrade: false
 +# Limit the number of concurrent sstable upgrades
 +# max_concurrent_automatic_sstable_upgrades: 1
 +
 +# Audit logging - Logs every incoming CQL command request, authentication to a node. See the docs
 +# on audit_logging for full details about the various configuration options.
 +audit_logging_options:
 +    enabled: false
 +    logger: BinAuditLogger
 +    # audit_logs_dir:
 +    # included_keyspaces:
 +    # excluded_keyspaces: system, system_schema, system_virtual_schema
 +    # included_categories:
 +    # excluded_categories:
 +    # included_users:
 +    # excluded_users:
 +    # roll_cycle: HOURLY
 +    # block: true
 +    # max_queue_weight: 268435456 # 256 MiB
 +    # max_log_size: 17179869184 # 16 GiB
 +    ## archive command is "/path/to/script.sh %path" where %path is replaced with the file being rolled:
 +    # archive_command:
 +    # max_archive_retries: 10
 +
 +
 +# default options for full query logging - these can be overridden from command line when executing
 +# nodetool enablefullquerylog
 +#full_query_logging_options:
 +    # log_dir:
 +    # roll_cycle: HOURLY
 +    # block: true
 +    # max_queue_weight: 268435456 # 256 MiB
 +    # max_log_size: 17179869184 # 16 GiB
 +    ## archive command is "/path/to/script.sh %path" where %path is replaced with the file being rolled:
 +    # archive_command:
 +    # max_archive_retries: 10
 +
 +# validate tombstones on reads and compaction
 +# can be either "disabled", "warn" or "exception"
 +# corrupted_tombstone_strategy: disabled
 +
 +# Diagnostic Events #
 +# If enabled, diagnostic events can be helpful for troubleshooting operational issues. Emitted events contain details
 +# on internal state and temporal relationships across events, accessible by clients via JMX.
 +diagnostic_events_enabled: false
 +
- # Define use of legacy delayed flusher for replies to TCP connections. This will increase latency, but might be beneficial for
- # legacy use-cases where only a single connection is used for each Cassandra node. Default is false.
++# Use native transport TCP message coalescing. If on upgrade to 4.0 you found your throughput decreasing, and in
++# particular you run an old kernel or have very fewer client connections, this option might be worth evaluating.
 +#native_transport_flush_in_batches_legacy: false
 +
 +# Enable tracking of repaired state of data during reads and comparison between replicas
 +# Mismatches between the repaired sets of replicas can be characterized as either confirmed
 +# or unconfirmed. In this context, unconfirmed indicates that the presence of pending repair
 +# sessions, unrepaired partition tombstones, or some other condition means that the disparity
 +# cannot be considered conclusive. Confirmed mismatches should be a trigger for investigation
 +# as they may be indicative of corruption or data loss.
 +# There are separate flags for range vs partition reads as single partition reads are only tracked
 +# when CL > 1 and a digest mismatch occurs. Currently, range queries don't use digests so if
 +# enabled for range reads, all range reads will include repaired data tracking. As this adds
 +# some overhead, operators may wish to disable it whilst still enabling it for partition reads
 +repaired_data_tracking_for_range_reads_enabled: false
 +repaired_data_tracking_for_partition_reads_enabled: false
 +# If false, only confirmed mismatches will be reported. If true, a separate metric for unconfirmed
 +# mismatches will also be recorded. This is to avoid potential signal:noise issues are unconfirmed
 +# mismatches are less actionable than confirmed ones.
 +report_unconfirmed_repaired_data_mismatches: false
 +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/9] cassandra git commit: Backporting ImmediateFlusher from trunk

Posted by be...@apache.org.
Backporting ImmediateFlusher from trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9bcbb457
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9bcbb457
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9bcbb457

Branch: refs/heads/cassandra-3.11
Commit: 9bcbb457fce9f0007597145f1a150a42b7935ef0
Parents: 0c97908
Author: Michael Burman <ya...@iki.fi>
Authored: Tue May 8 15:40:54 2018 +0300
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Dec 6 15:38:47 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  5 ++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  6 +-
 .../org/apache/cassandra/transport/Message.java | 76 ++++++++++++++++----
 .../org/apache/cassandra/transport/Server.java  |  2 +-
 6 files changed, 76 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3d7158..e349674 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
  * Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
  * Handle failures in parallelAllSSTableOperation (cleanup/upgradesstables/etc) (CASSANDRA-14657)
  * Improve TokenMetaData cache populating performance avoid long locking (CASSANDRA-14660)
+ * Backport: Flush netty client messages immediately (not by default) (CASSANDRA-13651)
  * Fix static column order for SELECT * wildcard queries (CASSANDRA-14638)
  * sstableloader should use discovered broadcast address to connect intra-cluster (CASSANDRA-14522)
  * Fix reading columns with non-UTF names from schema (CASSANDRA-14468)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ef7b034..2e6f363 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1003,3 +1003,8 @@ windows_timer_interval: 1
 # An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
 #
 # otc_backlog_expiration_interval_ms: 200
+
+# Define use of legacy delayed flusher for replies to TCP connections. This will increase latency, but might be beneficial for
+# legacy use-cases where only a single connection is used for each Cassandra node. Default is false.
+# native_transport_flush_in_batches_legacy: false
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64d41bb..6d56c74 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -149,6 +149,7 @@ public class Config
     public Integer native_transport_max_frame_size_in_mb = 256;
     public volatile Long native_transport_max_concurrent_connections = -1L;
     public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
+    public boolean native_transport_flush_in_batches_legacy = false;
 
     @Deprecated
     public Integer thrift_max_message_length_in_mb = 16;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index efc71ef..0fd785a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1499,6 +1499,11 @@ public class DatabaseDescriptor
         conf.native_transport_max_concurrent_connections_per_ip = native_transport_max_concurrent_connections_per_ip;
     }
 
+    public static boolean useNativeTransportLegacyFlusher()
+    {
+        return conf.native_transport_flush_in_batches_legacy;
+    }
+
     public static double getCommitLogSyncBatchWindow()
     {
         return conf.commitlog_sync_batch_window_in_ms;
@@ -2077,5 +2082,4 @@ public class DatabaseDescriptor
     {
         return conf.gc_warn_threshold_in_ms;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 91ece5c..0851b19 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -420,26 +420,38 @@ public abstract class Message
             }
         }
 
-        private static final class Flusher implements Runnable
+        private static abstract class Flusher implements Runnable
         {
             final EventLoop eventLoop;
             final ConcurrentLinkedQueue<FlushItem> queued = new ConcurrentLinkedQueue<>();
-            final AtomicBoolean running = new AtomicBoolean(false);
+            final AtomicBoolean scheduled = new AtomicBoolean(false);
             final HashSet<ChannelHandlerContext> channels = new HashSet<>();
             final List<FlushItem> flushed = new ArrayList<>();
-            int runsSinceFlush = 0;
-            int runsWithNoWork = 0;
-            private Flusher(EventLoop eventLoop)
-            {
-                this.eventLoop = eventLoop;
-            }
+
             void start()
             {
-                if (!running.get() && running.compareAndSet(false, true))
+                if (!scheduled.get() && scheduled.compareAndSet(false, true))
                 {
                     this.eventLoop.execute(this);
                 }
             }
+
+            public Flusher(EventLoop eventLoop)
+            {
+                this.eventLoop = eventLoop;
+            }
+        }
+
+        private static final class LegacyFlusher extends Flusher
+        {
+            int runsSinceFlush = 0;
+            int runsWithNoWork = 0;
+
+            private LegacyFlusher(EventLoop eventLoop)
+            {
+                super(eventLoop);
+            }
+
             public void run()
             {
 
@@ -476,8 +488,8 @@ public abstract class Message
                     // either reschedule or cancel
                     if (++runsWithNoWork > 5)
                     {
-                        running.set(false);
-                        if (queued.isEmpty() || !running.compareAndSet(false, true))
+                        scheduled.set(false);
+                        if (queued.isEmpty() || !scheduled.compareAndSet(false, true))
                             return;
                     }
                 }
@@ -486,11 +498,48 @@ public abstract class Message
             }
         }
 
+        private static final class ImmediateFlusher extends Flusher
+        {
+            private ImmediateFlusher(EventLoop eventLoop)
+            {
+                super(eventLoop);
+            }
+
+            public void run()
+            {
+                boolean doneWork = false;
+                FlushItem flush;
+                scheduled.set(false);
+
+                while (null != (flush = queued.poll()))
+                {
+                    channels.add(flush.ctx);
+                    flush.ctx.write(flush.response, flush.ctx.voidPromise());
+                    flushed.add(flush);
+                    doneWork = true;
+                }
+
+                if (doneWork)
+                {
+                    for (ChannelHandlerContext channel : channels)
+                        channel.flush();
+                    for (FlushItem item : flushed)
+                        item.sourceFrame.release();
+
+                    channels.clear();
+                    flushed.clear();
+                }
+            }
+        }
+
         private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<>();
 
-        public Dispatcher()
+        private final boolean useLegacyFlusher;
+
+        public Dispatcher(boolean useLegacyFlusher)
         {
             super(false);
+            this.useLegacyFlusher = useLegacyFlusher;
         }
 
         @Override
@@ -538,7 +587,8 @@ public abstract class Message
             Flusher flusher = flusherLookup.get(loop);
             if (flusher == null)
             {
-                Flusher alt = flusherLookup.putIfAbsent(loop, flusher = new Flusher(loop));
+                Flusher created = useLegacyFlusher ? new LegacyFlusher(loop) : new ImmediateFlusher(loop);
+                Flusher alt = flusherLookup.putIfAbsent(loop, flusher = created);
                 if (alt != null)
                     flusher = alt;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 7df194d..8c781db 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -295,7 +295,7 @@ public class Server implements CassandraDaemon.Server
         private static final Frame.Compressor frameCompressor = new Frame.Compressor();
         private static final Frame.Encoder frameEncoder = new Frame.Encoder();
         private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
-        private static final Message.Dispatcher dispatcher = new Message.Dispatcher();
+        private static final Message.Dispatcher dispatcher = new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher());
         private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
 
         private final Server server;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/9] cassandra git commit: Backporting ImmediateFlusher from trunk

Posted by be...@apache.org.
Backporting ImmediateFlusher from trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9bcbb457
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9bcbb457
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9bcbb457

Branch: refs/heads/trunk
Commit: 9bcbb457fce9f0007597145f1a150a42b7935ef0
Parents: 0c97908
Author: Michael Burman <ya...@iki.fi>
Authored: Tue May 8 15:40:54 2018 +0300
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Dec 6 15:38:47 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  5 ++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  6 +-
 .../org/apache/cassandra/transport/Message.java | 76 ++++++++++++++++----
 .../org/apache/cassandra/transport/Server.java  |  2 +-
 6 files changed, 76 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3d7158..e349674 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
  * Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
  * Handle failures in parallelAllSSTableOperation (cleanup/upgradesstables/etc) (CASSANDRA-14657)
  * Improve TokenMetaData cache populating performance avoid long locking (CASSANDRA-14660)
+ * Backport: Flush netty client messages immediately (not by default) (CASSANDRA-13651)
  * Fix static column order for SELECT * wildcard queries (CASSANDRA-14638)
  * sstableloader should use discovered broadcast address to connect intra-cluster (CASSANDRA-14522)
  * Fix reading columns with non-UTF names from schema (CASSANDRA-14468)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ef7b034..2e6f363 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1003,3 +1003,8 @@ windows_timer_interval: 1
 # An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
 #
 # otc_backlog_expiration_interval_ms: 200
+
+# Define use of legacy delayed flusher for replies to TCP connections. This will increase latency, but might be beneficial for
+# legacy use-cases where only a single connection is used for each Cassandra node. Default is false.
+# native_transport_flush_in_batches_legacy: false
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64d41bb..6d56c74 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -149,6 +149,7 @@ public class Config
     public Integer native_transport_max_frame_size_in_mb = 256;
     public volatile Long native_transport_max_concurrent_connections = -1L;
     public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
+    public boolean native_transport_flush_in_batches_legacy = false;
 
     @Deprecated
     public Integer thrift_max_message_length_in_mb = 16;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index efc71ef..0fd785a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1499,6 +1499,11 @@ public class DatabaseDescriptor
         conf.native_transport_max_concurrent_connections_per_ip = native_transport_max_concurrent_connections_per_ip;
     }
 
+    public static boolean useNativeTransportLegacyFlusher()
+    {
+        return conf.native_transport_flush_in_batches_legacy;
+    }
+
     public static double getCommitLogSyncBatchWindow()
     {
         return conf.commitlog_sync_batch_window_in_ms;
@@ -2077,5 +2082,4 @@ public class DatabaseDescriptor
     {
         return conf.gc_warn_threshold_in_ms;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 91ece5c..0851b19 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -420,26 +420,38 @@ public abstract class Message
             }
         }
 
-        private static final class Flusher implements Runnable
+        private static abstract class Flusher implements Runnable
         {
             final EventLoop eventLoop;
             final ConcurrentLinkedQueue<FlushItem> queued = new ConcurrentLinkedQueue<>();
-            final AtomicBoolean running = new AtomicBoolean(false);
+            final AtomicBoolean scheduled = new AtomicBoolean(false);
             final HashSet<ChannelHandlerContext> channels = new HashSet<>();
             final List<FlushItem> flushed = new ArrayList<>();
-            int runsSinceFlush = 0;
-            int runsWithNoWork = 0;
-            private Flusher(EventLoop eventLoop)
-            {
-                this.eventLoop = eventLoop;
-            }
+
             void start()
             {
-                if (!running.get() && running.compareAndSet(false, true))
+                if (!scheduled.get() && scheduled.compareAndSet(false, true))
                 {
                     this.eventLoop.execute(this);
                 }
             }
+
+            public Flusher(EventLoop eventLoop)
+            {
+                this.eventLoop = eventLoop;
+            }
+        }
+
+        private static final class LegacyFlusher extends Flusher
+        {
+            int runsSinceFlush = 0;
+            int runsWithNoWork = 0;
+
+            private LegacyFlusher(EventLoop eventLoop)
+            {
+                super(eventLoop);
+            }
+
             public void run()
             {
 
@@ -476,8 +488,8 @@ public abstract class Message
                     // either reschedule or cancel
                     if (++runsWithNoWork > 5)
                     {
-                        running.set(false);
-                        if (queued.isEmpty() || !running.compareAndSet(false, true))
+                        scheduled.set(false);
+                        if (queued.isEmpty() || !scheduled.compareAndSet(false, true))
                             return;
                     }
                 }
@@ -486,11 +498,48 @@ public abstract class Message
             }
         }
 
+        private static final class ImmediateFlusher extends Flusher
+        {
+            private ImmediateFlusher(EventLoop eventLoop)
+            {
+                super(eventLoop);
+            }
+
+            public void run()
+            {
+                boolean doneWork = false;
+                FlushItem flush;
+                scheduled.set(false);
+
+                while (null != (flush = queued.poll()))
+                {
+                    channels.add(flush.ctx);
+                    flush.ctx.write(flush.response, flush.ctx.voidPromise());
+                    flushed.add(flush);
+                    doneWork = true;
+                }
+
+                if (doneWork)
+                {
+                    for (ChannelHandlerContext channel : channels)
+                        channel.flush();
+                    for (FlushItem item : flushed)
+                        item.sourceFrame.release();
+
+                    channels.clear();
+                    flushed.clear();
+                }
+            }
+        }
+
         private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<>();
 
-        public Dispatcher()
+        private final boolean useLegacyFlusher;
+
+        public Dispatcher(boolean useLegacyFlusher)
         {
             super(false);
+            this.useLegacyFlusher = useLegacyFlusher;
         }
 
         @Override
@@ -538,7 +587,8 @@ public abstract class Message
             Flusher flusher = flusherLookup.get(loop);
             if (flusher == null)
             {
-                Flusher alt = flusherLookup.putIfAbsent(loop, flusher = new Flusher(loop));
+                Flusher created = useLegacyFlusher ? new LegacyFlusher(loop) : new ImmediateFlusher(loop);
+                Flusher alt = flusherLookup.putIfAbsent(loop, flusher = created);
                 if (alt != null)
                     flusher = alt;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 7df194d..8c781db 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -295,7 +295,7 @@ public class Server implements CassandraDaemon.Server
         private static final Frame.Compressor frameCompressor = new Frame.Compressor();
         private static final Frame.Encoder frameEncoder = new Frame.Encoder();
         private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
-        private static final Message.Dispatcher dispatcher = new Message.Dispatcher();
+        private static final Message.Dispatcher dispatcher = new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher());
         private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
 
         private final Server server;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[8/9] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by be...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/866a8943
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/866a8943
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/866a8943

Branch: refs/heads/trunk
Commit: 866a89435f9d6453b611a9fdbc84a50377d19c2a
Parents: 85e402a ef5ce7c
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Dec 6 15:41:11 2018 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Dec 6 15:41:11 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  2 +-
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  5 ++
 .../org/apache/cassandra/transport/Message.java | 76 ++++++++++++++++----
 .../org/apache/cassandra/transport/Server.java  |  2 +-
 6 files changed, 72 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/866a8943/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/866a8943/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/866a8943/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index a01203c,130ff08..603d851
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -133,32 -126,33 +133,33 @@@ public class Confi
  
      /* intentionally left set to true, despite being set to false in stock 2.2 cassandra.yaml
         we don't want to surprise Thrift users who have the setting blank in the yaml during 2.1->2.2 upgrade */
 -    public Boolean start_rpc = true;
 +    public boolean start_rpc = true;
      public String rpc_address;
      public String rpc_interface;
 -    public Boolean rpc_interface_prefer_ipv6 = false;
 +    public boolean rpc_interface_prefer_ipv6 = false;
      public String broadcast_rpc_address;
 -    public Integer rpc_port = 9160;
 -    public Integer rpc_listen_backlog = 50;
 +    public int rpc_port = 9160;
 +    public int rpc_listen_backlog = 50;
      public String rpc_server_type = "sync";
 -    public Boolean rpc_keepalive = true;
 -    public Integer rpc_min_threads = 16;
 -    public Integer rpc_max_threads = Integer.MAX_VALUE;
 +    public boolean rpc_keepalive = true;
 +    public int rpc_min_threads = 16;
 +    public int rpc_max_threads = Integer.MAX_VALUE;
      public Integer rpc_send_buff_size_in_bytes;
      public Integer rpc_recv_buff_size_in_bytes;
 -    public Integer internode_send_buff_size_in_bytes;
 -    public Integer internode_recv_buff_size_in_bytes;
 +    public int internode_send_buff_size_in_bytes = 0;
 +    public int internode_recv_buff_size_in_bytes = 0;
  
 -    public Boolean start_native_transport = false;
 -    public Integer native_transport_port = 9042;
 +    public boolean start_native_transport = false;
 +    public int native_transport_port = 9042;
      public Integer native_transport_port_ssl = null;
 -    public Integer native_transport_max_threads = 128;
 -    public Integer native_transport_max_frame_size_in_mb = 256;
 -    public volatile Long native_transport_max_concurrent_connections = -1L;
 -    public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
 +    public int native_transport_max_threads = 128;
 +    public int native_transport_max_frame_size_in_mb = 256;
 +    public volatile long native_transport_max_concurrent_connections = -1L;
 +    public volatile long native_transport_max_concurrent_connections_per_ip = -1L;
+     public boolean native_transport_flush_in_batches_legacy = true;
  
      @Deprecated
 -    public Integer thrift_max_message_length_in_mb = 16;
 +    public int thrift_max_message_length_in_mb = 16;
      /**
       * Max size of values in SSTables, in MegaBytes.
       * Default is the same as the native protocol frame limit: 256Mb.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/866a8943/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/866a8943/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/866a8943/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org