You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2019/05/22 09:39:19 UTC

[zookeeper] branch master updated: ZOOKEEPER-3311: Allow a delay to the transaction log flush

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new cc431f7  ZOOKEEPER-3311: Allow a delay to the transaction log flush
cc431f7 is described below

commit cc431f70020b9a2028edcc61e41cff9ee85b078f
Author: Brian Nixon <ni...@fb.com>
AuthorDate: Wed May 22 11:39:06 2019 +0200

    ZOOKEEPER-3311: Allow a delay to the transaction log flush
    
    Author: Brian Nixon <ni...@fb.com>
    
    Reviewers: fangmin@apache.org, eolivelli@apache.org
    
    Closes #851 from enixon/flush-delay
---
 .../src/main/resources/markdown/zookeeperAdmin.md  | 20 +++++++++++
 .../zookeeper/server/SyncRequestProcessor.java     | 40 ++++++++++++++++++----
 .../apache/zookeeper/server/ZooKeeperServer.java   | 39 +++++++++++++++++++++
 .../zookeeper/server/ZooKeeperServerBean.java      | 36 +++++++++++++++++++
 .../zookeeper/server/ZooKeeperServerMXBean.java    |  9 +++++
 5 files changed, 138 insertions(+), 6 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 454aa06..ceb0e20 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -837,6 +837,26 @@ property, when available, is noted below.
     This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication.
     Default is `NIOServerCnxnFactory`.
 
+* *flushDelay* :
+    (Java system property: **zookeeper.flushDelay**)
+    Time in milliseconds to delay the flush of the commit log.
+    Does not affect the limit defined by *maxBatchSize*; a flush occurs when either limit is reached.
+    Disabled by default (with value 0). Ensembles with high write rates
+    may see throughput improved with a value of 10-20 ms.
+
+* *maxWriteQueuePollTime* :
+    (Java system property: **zookeeper.maxWriteQueuePollTime**)
+    If *flushDelay* is enabled, this determines the amount of time in milliseconds
+    to wait before flushing when no new requests are being queued.
+    Defaults to *flushDelay*/3 (as configured at startup); has no effect when *flushDelay* is 0.
+
+* *maxBatchSize* :
+    (Java system property: **zookeeper.maxBatchSize**)
+    The maximum number of transactions batched in the server before a flush of the
+    commit log is triggered.
+    Does not affect the limit defined by *flushDelay*.
+    Default is 1000; a value of 0 or less disables this limit.
+
 <a name="sc_clusterOptions"></a>
 
 #### Cluster Options
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
index 044c456..d30fd2b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java
@@ -27,7 +27,9 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.zookeeper.common.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,8 +55,6 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
 
     private static final Logger LOG = LoggerFactory.getLogger(SyncRequestProcessor.class);
 
-    private static final int FLUSH_SIZE = 1000;
-
     private static final Request REQUEST_OF_DEATH = Request.requestOfDeath;
 
     /** The number of log entries to log before starting a snapshot */
@@ -85,7 +85,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
      * disk. Basically this is the list of SyncItems whose callbacks will be
      * invoked after flush returns successfully.
      */
-    private final Queue<Request> toFlush = new ArrayDeque<>(FLUSH_SIZE);
+    private final Queue<Request> toFlush;
+    private long lastFlushTime;
 
     public SyncRequestProcessor(ZooKeeperServer zks,
             RequestProcessor nextProcessor) {
@@ -93,6 +94,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
                 .getZooKeeperServerListener());
         this.zks = zks;
         this.nextProcessor = nextProcessor;
+        this.toFlush = new ArrayDeque<>(zks.getMaxBatchSize());
     }
 
     /**
@@ -112,6 +114,28 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
         return snapCount;
     }
 
+    private long getRemainingDelay() {
+        long flushDelay = zks.getFlushDelay();
+        long duration = Time.currentElapsedTime() - lastFlushTime;
+        if (duration < flushDelay) {
+            return flushDelay - duration;
+        }
+        return 0;
+    }
+
+    /** If both flushDelay and maxBatchSize are set (> 0), flush
+     * whenever either condition is hit. If only one or the other is
+     * set, flush only when the relevant condition is hit.
+     */
+    private boolean shouldFlush() {
+        long flushDelay = zks.getFlushDelay();
+        long maxBatchSize = zks.getMaxBatchSize();
+        if ((flushDelay > 0) && (getRemainingDelay() == 0)) {
+            return true;
+        }
+        return (maxBatchSize > 0) && (toFlush.size() >= maxBatchSize);
+    }
+
     /**
      * used by tests to check for changing
      * snapcounts
@@ -139,9 +163,12 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
             // we do this in an attempt to ensure that not all of the servers
             // in the ensemble take a snapshot at the same time
             resetSnapshotStats();
+            lastFlushTime = Time.currentElapsedTime();
             while (true) {
-                Request si = queuedRequests.poll();
+                long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
+                Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                 if (si == null) {
+                    /* We timed out looking for more writes to batch, go ahead and flush immediately */
                     flush();
                     si = queuedRequests.take();
                 }
@@ -187,7 +214,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
                     continue;
                 }
                 toFlush.add(si);
-                if (toFlush.size() == FLUSH_SIZE) {
+                if (shouldFlush()) {
                     flush();
                 }
             }
@@ -213,7 +240,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
           }
           if (this.nextProcessor instanceof Flushable) {
               ((Flushable)this.nextProcessor).flush();
-          } 
+          }
+          lastFlushTime = Time.currentElapsedTime();
       }
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 28dc293..cb06dcf 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -145,6 +145,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     private ZooKeeperServerShutdownHandler zkShutdownHandler;
     private volatile int createSessionTrackerServerId = 1;
 
+    private static final String FLUSH_DELAY = "zookeeper.flushDelay";
+    private static volatile long flushDelay;
+    private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime";
+    private static volatile long maxWriteQueuePollTime;
+    private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize";
+    private static volatile int maxBatchSize;
+
     /**
      * Starting size of read and write ByteArroyOuputBuffers. Default is 32 bytes.
      * Flag not used for small transfers like connectResponses.
@@ -155,6 +162,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     public static final int intBufferStartingSizeBytes;
 
     static {
+        long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0);
+        setFlushDelay(configuredFlushDelay);
+        setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3));
+        setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000));
+
         intBufferStartingSizeBytes = Integer.getInteger(
                 INT_BUFFER_STARTING_SIZE_BYTES,
                 DEFAULT_STARTING_BUFFER_SIZE);
@@ -1209,6 +1221,33 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return false;
     }
 
+    long getFlushDelay() {
+        return flushDelay;
+    }
+
+    static void setFlushDelay(long delay) {
+        LOG.info("{}={}", FLUSH_DELAY, delay);
+        flushDelay = delay;
+    }
+
+    long getMaxWriteQueuePollTime() {
+        return maxWriteQueuePollTime;
+    }
+
+    static void setMaxWriteQueuePollTime(long maxTime) {
+        LOG.info("{}={}", MAX_WRITE_QUEUE_POLL_SIZE, maxTime);
+        maxWriteQueuePollTime = maxTime;
+    }
+
+    int getMaxBatchSize() {
+        return maxBatchSize;
+    }
+
+    static void setMaxBatchSize(int size) {
+        LOG.info("{}={}", MAX_BATCH_SIZE, size);
+        maxBatchSize = size;
+    }
+
     public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
         // We have the request, now process and setup for next
         InputStream bais = new ByteBufferInputStream(incomingBuffer);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
index b8cf706..39f542f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
@@ -278,4 +278,40 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
     public void setConnectionDecreaseRatio(double val) {
         zks.connThrottle().setDecreasePoint(val);
     }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public long getFlushDelay() {
+        return zks.getFlushDelay();
+    }
+
+    @Override
+    public void setFlushDelay(long delay) {
+        ZooKeeperServer.setFlushDelay(delay);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public long getMaxWriteQueuePollTime() {
+        return zks.getMaxWriteQueuePollTime();
+    }
+
+    @Override
+    public void setMaxWriteQueuePollTime(long delay) {
+        ZooKeeperServer.setMaxWriteQueuePollTime(delay);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    @Override
+    public int getMaxBatchSize() {
+        return zks.getMaxBatchSize();
+    }
+
+    @Override
+    public void setMaxBatchSize(int size) {
+        ZooKeeperServer.setMaxBatchSize(size);
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
index 91c8c82..a4482d6 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
@@ -183,4 +183,13 @@ public interface ZooKeeperServerMXBean {
      * @return size of largest generated client response
      */
     public int getMaxClientResponseSize();
+
+    public long getFlushDelay();
+    public void setFlushDelay(long delay);
+
+    public long getMaxWriteQueuePollTime();
+    public void setMaxWriteQueuePollTime(long delay);
+
+    public int getMaxBatchSize();
+    public void setMaxBatchSize(int size);
 }