You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/12/06 15:55:42 UTC

[3/9] cassandra git commit: Backport ImmediateFlusher to cassandra-3.0 and cassandra-3.11

Backport ImmediateFlusher to cassandra-3.0 and cassandra-3.11

patch by Michael Burman and Sumanth Pasupuleti; reviewed by Benedict for CASSANDRA-14855


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fff6eec2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fff6eec2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fff6eec2

Branch: refs/heads/trunk
Commit: fff6eec2903ee85f648535dd051c9bc72631f524
Parents: 0c97908
Author: Michael Burman <ya...@iki.fi>
Authored: Tue May 8 15:40:54 2018 +0300
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Dec 6 15:48:50 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             |  5 ++
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  6 +-
 .../org/apache/cassandra/transport/Message.java | 76 ++++++++++++++++----
 .../org/apache/cassandra/transport/Server.java  |  2 +-
 6 files changed, 76 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fff6eec2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3d7158..e349674 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
  * Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
  * Handle failures in parallelAllSSTableOperation (cleanup/upgradesstables/etc) (CASSANDRA-14657)
  * Improve TokenMetaData cache populating performance avoid long locking (CASSANDRA-14660)
+ * Backport: Flush netty client messages immediately by default; legacy batched flushing is opt-in (CASSANDRA-13651)
  * Fix static column order for SELECT * wildcard queries (CASSANDRA-14638)
  * sstableloader should use discovered broadcast address to connect intra-cluster (CASSANDRA-14522)
  * Fix reading columns with non-UTF names from schema (CASSANDRA-14468)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fff6eec2/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ef7b034..2e6f363 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1003,3 +1003,8 @@ windows_timer_interval: 1
 # An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
 #
 # otc_backlog_expiration_interval_ms: 200
+
+# Whether to use the legacy (batching, delayed) flusher for replies on native transport connections. This increases latency,
+# but may benefit legacy use cases where clients open only a single connection to each Cassandra node. Default is false.
+# native_transport_flush_in_batches_legacy: false
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fff6eec2/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64d41bb..6d56c74 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -149,6 +149,7 @@ public class Config
     public Integer native_transport_max_frame_size_in_mb = 256;
     public volatile Long native_transport_max_concurrent_connections = -1L;
     public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
+    public boolean native_transport_flush_in_batches_legacy = false;
 
     @Deprecated
     public Integer thrift_max_message_length_in_mb = 16;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fff6eec2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index efc71ef..0fd785a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1499,6 +1499,11 @@ public class DatabaseDescriptor
         conf.native_transport_max_concurrent_connections_per_ip = native_transport_max_concurrent_connections_per_ip;
     }
 
+    public static boolean useNativeTransportLegacyFlusher()
+    {
+        return conf.native_transport_flush_in_batches_legacy;
+    }
+
     public static double getCommitLogSyncBatchWindow()
     {
         return conf.commitlog_sync_batch_window_in_ms;
@@ -2077,5 +2082,4 @@ public class DatabaseDescriptor
     {
         return conf.gc_warn_threshold_in_ms;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fff6eec2/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 91ece5c..0851b19 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -420,26 +420,38 @@ public abstract class Message
             }
         }
 
-        private static final class Flusher implements Runnable
+        private static abstract class Flusher implements Runnable
         {
             final EventLoop eventLoop;
             final ConcurrentLinkedQueue<FlushItem> queued = new ConcurrentLinkedQueue<>();
-            final AtomicBoolean running = new AtomicBoolean(false);
+            final AtomicBoolean scheduled = new AtomicBoolean(false);
             final HashSet<ChannelHandlerContext> channels = new HashSet<>();
             final List<FlushItem> flushed = new ArrayList<>();
-            int runsSinceFlush = 0;
-            int runsWithNoWork = 0;
-            private Flusher(EventLoop eventLoop)
-            {
-                this.eventLoop = eventLoop;
-            }
+
             void start()
             {
-                if (!running.get() && running.compareAndSet(false, true))
+                if (!scheduled.get() && scheduled.compareAndSet(false, true))
                 {
                     this.eventLoop.execute(this);
                 }
             }
+
+            public Flusher(EventLoop eventLoop)
+            {
+                this.eventLoop = eventLoop;
+            }
+        }
+
+        private static final class LegacyFlusher extends Flusher
+        {
+            int runsSinceFlush = 0;
+            int runsWithNoWork = 0;
+
+            private LegacyFlusher(EventLoop eventLoop)
+            {
+                super(eventLoop);
+            }
+
             public void run()
             {
 
@@ -476,8 +488,8 @@ public abstract class Message
                     // either reschedule or cancel
                     if (++runsWithNoWork > 5)
                     {
-                        running.set(false);
-                        if (queued.isEmpty() || !running.compareAndSet(false, true))
+                        scheduled.set(false);
+                        if (queued.isEmpty() || !scheduled.compareAndSet(false, true))
                             return;
                     }
                 }
@@ -486,11 +498,48 @@ public abstract class Message
             }
         }
 
+        private static final class ImmediateFlusher extends Flusher
+        {
+            private ImmediateFlusher(EventLoop eventLoop)
+            {
+                super(eventLoop);
+            }
+
+            public void run()
+            {
+                boolean doneWork = false;
+                FlushItem flush;
+                scheduled.set(false);
+
+                while (null != (flush = queued.poll()))
+                {
+                    channels.add(flush.ctx);
+                    flush.ctx.write(flush.response, flush.ctx.voidPromise());
+                    flushed.add(flush);
+                    doneWork = true;
+                }
+
+                if (doneWork)
+                {
+                    for (ChannelHandlerContext channel : channels)
+                        channel.flush();
+                    for (FlushItem item : flushed)
+                        item.sourceFrame.release();
+
+                    channels.clear();
+                    flushed.clear();
+                }
+            }
+        }
+
         private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<>();
 
-        public Dispatcher()
+        private final boolean useLegacyFlusher;
+
+        public Dispatcher(boolean useLegacyFlusher)
         {
             super(false);
+            this.useLegacyFlusher = useLegacyFlusher;
         }
 
         @Override
@@ -538,7 +587,8 @@ public abstract class Message
             Flusher flusher = flusherLookup.get(loop);
             if (flusher == null)
             {
-                Flusher alt = flusherLookup.putIfAbsent(loop, flusher = new Flusher(loop));
+                Flusher created = useLegacyFlusher ? new LegacyFlusher(loop) : new ImmediateFlusher(loop);
+                Flusher alt = flusherLookup.putIfAbsent(loop, flusher = created);
                 if (alt != null)
                     flusher = alt;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fff6eec2/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 7df194d..8c781db 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -295,7 +295,7 @@ public class Server implements CassandraDaemon.Server
         private static final Frame.Compressor frameCompressor = new Frame.Compressor();
         private static final Frame.Encoder frameEncoder = new Frame.Encoder();
         private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
-        private static final Message.Dispatcher dispatcher = new Message.Dispatcher();
+        private static final Message.Dispatcher dispatcher = new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher());
         private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
 
         private final Server server;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org