You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/12/06 15:46:51 UTC
[4/9] cassandra git commit: Backporting ImmediateFlusher from trunk
Backporting ImmediateFlusher from trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9bcbb457
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9bcbb457
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9bcbb457
Branch: refs/heads/trunk
Commit: 9bcbb457fce9f0007597145f1a150a42b7935ef0
Parents: 0c97908
Author: Michael Burman <ya...@iki.fi>
Authored: Tue May 8 15:40:54 2018 +0300
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Dec 6 15:38:47 2018 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 5 ++
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 6 +-
.../org/apache/cassandra/transport/Message.java | 76 ++++++++++++++++----
.../org/apache/cassandra/transport/Server.java | 2 +-
6 files changed, 76 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3d7158..e349674 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
* Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
* Handle failures in parallelAllSSTableOperation (cleanup/upgradesstables/etc) (CASSANDRA-14657)
* Improve TokenMetaData cache populating performance avoid long locking (CASSANDRA-14660)
+ * Backport: Flush netty client messages immediately (not by default) (CASSANDRA-13651)
* Fix static column order for SELECT * wildcard queries (CASSANDRA-14638)
* sstableloader should use discovered broadcast address to connect intra-cluster (CASSANDRA-14522)
* Fix reading columns with non-UTF names from schema (CASSANDRA-14468)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ef7b034..2e6f363 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1003,3 +1003,8 @@ windows_timer_interval: 1
# An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
#
# otc_backlog_expiration_interval_ms: 200
+
+# Define use of legacy delayed flusher for replies to TCP connections. This will increase latency, but might be beneficial for
+# legacy use-cases where only a single connection is used for each Cassandra node. Default is false.
+# native_transport_flush_in_batches_legacy: false
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64d41bb..6d56c74 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -149,6 +149,7 @@ public class Config
public Integer native_transport_max_frame_size_in_mb = 256;
public volatile Long native_transport_max_concurrent_connections = -1L;
public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
+ public boolean native_transport_flush_in_batches_legacy = false;
@Deprecated
public Integer thrift_max_message_length_in_mb = 16;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index efc71ef..0fd785a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1499,6 +1499,11 @@ public class DatabaseDescriptor
conf.native_transport_max_concurrent_connections_per_ip = native_transport_max_concurrent_connections_per_ip;
}
+ public static boolean useNativeTransportLegacyFlusher()
+ {
+ return conf.native_transport_flush_in_batches_legacy;
+ }
+
public static double getCommitLogSyncBatchWindow()
{
return conf.commitlog_sync_batch_window_in_ms;
@@ -2077,5 +2082,4 @@ public class DatabaseDescriptor
{
return conf.gc_warn_threshold_in_ms;
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 91ece5c..0851b19 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -420,26 +420,38 @@ public abstract class Message
}
}
- private static final class Flusher implements Runnable
+ private static abstract class Flusher implements Runnable
{
final EventLoop eventLoop;
final ConcurrentLinkedQueue<FlushItem> queued = new ConcurrentLinkedQueue<>();
- final AtomicBoolean running = new AtomicBoolean(false);
+ final AtomicBoolean scheduled = new AtomicBoolean(false);
final HashSet<ChannelHandlerContext> channels = new HashSet<>();
final List<FlushItem> flushed = new ArrayList<>();
- int runsSinceFlush = 0;
- int runsWithNoWork = 0;
- private Flusher(EventLoop eventLoop)
- {
- this.eventLoop = eventLoop;
- }
+
void start()
{
- if (!running.get() && running.compareAndSet(false, true))
+ if (!scheduled.get() && scheduled.compareAndSet(false, true))
{
this.eventLoop.execute(this);
}
}
+
+ public Flusher(EventLoop eventLoop)
+ {
+ this.eventLoop = eventLoop;
+ }
+ }
+
+ private static final class LegacyFlusher extends Flusher
+ {
+ int runsSinceFlush = 0;
+ int runsWithNoWork = 0;
+
+ private LegacyFlusher(EventLoop eventLoop)
+ {
+ super(eventLoop);
+ }
+
public void run()
{
@@ -476,8 +488,8 @@ public abstract class Message
// either reschedule or cancel
if (++runsWithNoWork > 5)
{
- running.set(false);
- if (queued.isEmpty() || !running.compareAndSet(false, true))
+ scheduled.set(false);
+ if (queued.isEmpty() || !scheduled.compareAndSet(false, true))
return;
}
}
@@ -486,11 +498,48 @@ public abstract class Message
}
}
+ private static final class ImmediateFlusher extends Flusher
+ {
+ private ImmediateFlusher(EventLoop eventLoop)
+ {
+ super(eventLoop);
+ }
+
+ public void run()
+ {
+ boolean doneWork = false;
+ FlushItem flush;
+ scheduled.set(false);
+
+ while (null != (flush = queued.poll()))
+ {
+ channels.add(flush.ctx);
+ flush.ctx.write(flush.response, flush.ctx.voidPromise());
+ flushed.add(flush);
+ doneWork = true;
+ }
+
+ if (doneWork)
+ {
+ for (ChannelHandlerContext channel : channels)
+ channel.flush();
+ for (FlushItem item : flushed)
+ item.sourceFrame.release();
+
+ channels.clear();
+ flushed.clear();
+ }
+ }
+ }
+
private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<>();
- public Dispatcher()
+ private final boolean useLegacyFlusher;
+
+ public Dispatcher(boolean useLegacyFlusher)
{
super(false);
+ this.useLegacyFlusher = useLegacyFlusher;
}
@Override
@@ -538,7 +587,8 @@ public abstract class Message
Flusher flusher = flusherLookup.get(loop);
if (flusher == null)
{
- Flusher alt = flusherLookup.putIfAbsent(loop, flusher = new Flusher(loop));
+ Flusher created = useLegacyFlusher ? new LegacyFlusher(loop) : new ImmediateFlusher(loop);
+ Flusher alt = flusherLookup.putIfAbsent(loop, flusher = created);
if (alt != null)
flusher = alt;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9bcbb457/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 7df194d..8c781db 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -295,7 +295,7 @@ public class Server implements CassandraDaemon.Server
private static final Frame.Compressor frameCompressor = new Frame.Compressor();
private static final Frame.Encoder frameEncoder = new Frame.Encoder();
private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
- private static final Message.Dispatcher dispatcher = new Message.Dispatcher();
+ private static final Message.Dispatcher dispatcher = new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher());
private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
private final Server server;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org