You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2019/05/22 09:39:19 UTC
[zookeeper] branch master updated: ZOOKEEPER-3311: Allow a delay to
the transaction log flush
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new cc431f7 ZOOKEEPER-3311: Allow a delay to the transaction log flush
cc431f7 is described below
commit cc431f70020b9a2028edcc61e41cff9ee85b078f
Author: Brian Nixon <ni...@fb.com>
AuthorDate: Wed May 22 11:39:06 2019 +0200
ZOOKEEPER-3311: Allow a delay to the transaction log flush
Author: Brian Nixon <ni...@fb.com>
Reviewers: fangmin@apache.org, eolivelli@apache.org
Closes #851 from enixon/flush-delay
---
.../src/main/resources/markdown/zookeeperAdmin.md | 20 +++++++++++
.../zookeeper/server/SyncRequestProcessor.java | 40 ++++++++++++++++++----
.../apache/zookeeper/server/ZooKeeperServer.java | 39 +++++++++++++++++++++
.../zookeeper/server/ZooKeeperServerBean.java | 36 +++++++++++++++++++
.../zookeeper/server/ZooKeeperServerMXBean.java | 9 +++++
5 files changed, 138 insertions(+), 6 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 454aa06..ceb0e20 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -837,6 +837,26 @@ property, when available, is noted below.
This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication.
Default is `NIOServerCnxnFactory`.
+* *flushDelay* :
+ (Java system property: **zookeeper.flushDelay**)
+ Time in milliseconds to delay the flush of the commit log.
+ Does not affect the limit defined by *maxBatchSize*.
+ Disabled by default (with value 0). Ensembles with high write rates
+ may see throughput improved with a value of 10-20 ms.
+
+* *maxWriteQueuePollTime* :
+ (Java system property: **zookeeper.maxWriteQueuePollTime**)
+ If *flushDelay* is enabled, this determines the amount of time in milliseconds
+ to wait before flushing when no new requests are being queued.
+ Set to *flushDelay*/3 by default (implicitly disabled by default).
+
+* *maxBatchSize* :
+ (Java system property: **zookeeper.maxBatchSize**)
+ The number of transactions allowed in the server before a flush of the
+ commit log is triggered.
+ Does not affect the limit defined by *flushDelay*.
+ Default is 1000.
+
<a name="sc_clusterOptions"></a>
#### Cluster Options
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
index 044c456..d30fd2b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -27,7 +27,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.common.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,8 +55,6 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
- private static final int FLUSH_SIZE = 1000;
-
private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;
/** The number of log entries to log before starting a snapshot */
@@ -85,7 +85,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
* disk. Basically this is the list of SyncItems whose callbacks will be
* invoked after flush returns successfully.
*/
- private final Queue<Request> toFlush = new ArrayDeque<>(FLUSH_SIZE);
+ private final Queue<Request> toFlush;
+ private long lastFlushTime;
public SyncRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
@@ -93,6 +94,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
+ this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
}
/**
@@ -112,6 +114,28 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
return snapCount;
}
+ private long getRemainingDelay() {
+ long flushDelay = zks.getFlushDelay();
+ long duration = Time.currentElapsedTime() - lastFlushTime;
+ if (duration < flushDelay) {
+ return flushDelay - duration;
+ }
+ return 0;
+ }
+
+ /** If both flushDelay and maxBatchSize are set (> 0), flush
+ * whenever either condition is hit. If only one or the other is
+ * set, flush only when the relevant condition is hit.
+ */
+ private boolean shouldFlush() {
+ long flushDelay = zks.getFlushDelay();
+ long maxBatchSize = zks.getMaxBatchSize();
+ if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
+ return true;
+ }
+ return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
+ }
+
/**
* used by tests to check for changing
* snapcounts
@@ -139,9 +163,12 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
resetSnapshotStats();
+ lastFlushTime = Time.currentElapsedTime();
while (true) {
- Request si = queuedRequests.poll();
+ long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
+ Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
if (si == null) {
+ /* We timed out looking for more writes to batch, go ahead and flush immediately */
flush();
si = queuedRequests.take();
}
@@ -187,7 +214,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
continue;
}
toFlush.add(si);
- if (toFlush.size() == FLUSH_SIZE) {
+ if (shouldFlush()) {
flush();
}
}
@@ -213,7 +240,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
}
if (this.nextProcessor instanceof Flushable) {
((Flushable)this.nextProcessor).flush();
- }
+ }
+ lastFlushTime = Time.currentElapsedTime();
}
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 28dc293..cb06dcf 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -145,6 +145,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private ZooKeeperServerShutdownHandler zkShutdownHandler;
private volatile int createSessionTrackerServerId = 1;
+ private static final String FLUSH_DELAY = "zookeeper.flushDelay";
+ private static volatile long flushDelay;
+ private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime";
+ private static volatile long maxWriteQueuePollTime;
+ private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize";
+ private static volatile int maxBatchSize;
+
/**
* Starting size of read and write ByteArrayOutputBuffers. Default is 32 bytes.
* Flag not used for small transfers like connectResponses.
@@ -155,6 +162,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public static final int intBufferStartingSizeBytes;
static {
+ long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0);
+ setFlushDelay(configuredFlushDelay);
+ setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3));
+ setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000));
+
intBufferStartingSizeBytes = Integer.getInteger(
INT_BUFFER_STARTING_SIZE_BYTES,
DEFAULT_STARTING_BUFFER_SIZE);
@@ -1209,6 +1221,33 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
return false;
}
+ long getFlushDelay() {
+ return flushDelay;
+ }
+
+ static void setFlushDelay(long delay) {
+ LOG.info("{}={}", FLUSH_DELAY, delay);
+ flushDelay = delay;
+ }
+
+ long getMaxWriteQueuePollTime() {
+ return maxWriteQueuePollTime;
+ }
+
+ static void setMaxWriteQueuePollTime(long maxTime) {
+ LOG.info("{}={}", MAX_WRITE_QUEUE_POLL_SIZE, maxTime);
+ maxWriteQueuePollTime = maxTime;
+ }
+
+ int getMaxBatchSize() {
+ return maxBatchSize;
+ }
+
+ static void setMaxBatchSize(int size) {
+ LOG.info("{}={}", MAX_BATCH_SIZE, size);
+ maxBatchSize = size;
+ }
+
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
index b8cf706..39f542f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
@@ -278,4 +278,40 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
public void setConnectionDecreaseRatio(double val) {
zks.connThrottle().setDecreasePoint(val);
}
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public long getFlushDelay() {
+ return zks.getFlushDelay();
+ }
+
+ @Override
+ public void setFlushDelay(long delay) {
+ ZooKeeperServer.setFlushDelay(delay);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public long getMaxWriteQueuePollTime() {
+ return zks.getMaxWriteQueuePollTime();
+ }
+
+ @Override
+ public void setMaxWriteQueuePollTime(long delay) {
+ ZooKeeperServer.setMaxWriteQueuePollTime(delay);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public int getMaxBatchSize() {
+ return zks.getMaxBatchSize();
+ }
+
+ @Override
+ public void setMaxBatchSize(int size) {
+ ZooKeeperServer.setMaxBatchSize(size);
+ }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
index 91c8c82..a4482d6 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
@@ -183,4 +183,13 @@ public interface ZooKeeperServerMXBean {
* @return size of largest generated client response
*/
public int getMaxClientResponseSize();
+
+ public long getFlushDelay();
+ public void setFlushDelay(long delay);
+
+ public long getMaxWriteQueuePollTime();
+ public void setMaxWriteQueuePollTime(long delay);
+
+ public int getMaxBatchSize();
+ public void setMaxBatchSize(int size);
}