You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2019/07/30 00:31:22 UTC

[zookeeper] branch master updated: ZOOKEEPER-3359: Batch commits in the CommitProcessor

This is an automated email from the ASF dual-hosted git repository.

hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a6b38f8  ZOOKEEPER-3359: Batch commits in the CommitProcessor
a6b38f8 is described below

commit a6b38f83218791f8d9fabc52c865dcccf07026cc
Author: Brian Nixon <ni...@fb.com>
AuthorDate: Mon Jul 29 17:31:09 2019 -0700

    ZOOKEEPER-3359: Batch commits in the CommitProcessor
    
    Author: Brian Nixon <ni...@fb.com>
    
    Reviewers: Michael Han <ha...@apache.org>, Norbert Kalmar <nk...@yahoo.com>, Enrico Olivelli <eo...@gmail.com>
    
    Closes #905 from enixon/commit-proc-batch
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |  23 ++
 .../zookeeper/server/ZooKeeperServerBean.java      |  13 +
 .../zookeeper/server/ZooKeeperServerMXBean.java    |   6 +
 .../zookeeper/server/quorum/CommitProcessor.java   | 318 ++++++++++++++-------
 .../quorum/CommitProcessorConcurrencyTest.java     | 159 +++++++++++
 .../server/quorum/CommitProcessorMetricsTest.java  |   4 +
 6 files changed, 422 insertions(+), 101 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index c596546..835e357 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1445,6 +1445,29 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t
     Number of Commit Processor worker threads. If configured with 0 worker threads, the main thread
     will process the request directly. The default value is the number of cpu cores.
 
+* *zookeeper.commitProcessor.maxReadBatchSize* :
+    (Java system property only: **zookeeper.commitProcessor.maxReadBatchSize**)
+    Max number of reads to process from queuedRequests before switching to processing commits.
+    If the value is negative (the default), we switch whenever we have a local write and pending commits.
+    A high read batch size will delay commit processing, causing stale data to be served.
+    If reads are known to arrive in fixed-size batches, then matching that batch size with
+    the value of this property can smooth queue performance. Since reads are handled in parallel,
+    one recommendation is to set this property to match *zookeeper.commitProcessor.numWorkerThread*
+    (default is the number of cpu cores) or lower.
+
+* *zookeeper.commitProcessor.maxCommitBatchSize* :
+    (Java system property only: **zookeeper.commitProcessor.maxCommitBatchSize**)
+    Max number of commits to process before processing reads. We will try to process as many
+    remote/local commits as we can until we reach this count. A high commit batch size will delay
+    reads while processing more commits. A low commit batch size will favor reads.
+    It is recommended to only set this property when an ensemble is serving a workload with a high
+    commit rate. If writes are known to arrive in batches of a fixed size, then matching that
+    batch size with the value of this property can smooth queue performance. A generic
+    approach would be to set this value to equal the ensemble size so that with the processing
+    of each batch the current server will probabilistically handle a write related to one of
+    its direct clients.
+    Default is "1". Negative and zero values are not supported.
+
 * *znode.container.checkIntervalMs* :
     (Java system property only)
     **New in 3.6.0:** The
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
index 92ceab8..6e89820 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
@@ -23,6 +23,7 @@ import java.util.Date;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.zookeeper.Version;
 import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.apache.zookeeper.server.quorum.CommitProcessor;
 
 /**
  * This class implements the ZooKeeper server MBean interface.
@@ -281,6 +282,18 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
 
     ///////////////////////////////////////////////////////////////////////////
 
+    public int getCommitProcMaxReadBatchSize() { return CommitProcessor.getMaxReadBatchSize(); }
+
+    public void setCommitProcMaxReadBatchSize(int size) { CommitProcessor.setMaxReadBatchSize(size); }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public int getCommitProcMaxCommitBatchSize() { return CommitProcessor.getMaxCommitBatchSize(); }
+
+    public void setCommitProcMaxCommitBatchSize(int size) { CommitProcessor.setMaxCommitBatchSize(size);}
+
+    ///////////////////////////////////////////////////////////////////////////
+
     @Override
     public long getFlushDelay() {
         return zks.getFlushDelay();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
index 4c71eac..7a42eaa 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
@@ -120,6 +120,12 @@ public interface ZooKeeperServerMXBean {
     public double getConnectionDecreaseRatio();
     public void setConnectionDecreaseRatio(double val);
 
+    public int getCommitProcMaxReadBatchSize();
+    public void setCommitProcMaxReadBatchSize(int size);
+
+    public int getCommitProcMaxCommitBatchSize();
+    public void setCommitProcMaxCommitBatchSize(int size);
+
     public int getRequestThrottleLimit();
     public void setRequestThrottleLimit(int requests);
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
index 6e4b702..77635c9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
@@ -19,11 +19,12 @@
 package org.apache.zookeeper.server.quorum;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -82,6 +83,12 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
     /** Default worker pool shutdown timeout in ms: 5000 (5s) */
     public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT =
         "zookeeper.commitProcessor.shutdownTimeout";
+    /** Default max read batch size: -1 to disable the feature */
+    public static final String ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE =
+        "zookeeper.commitProcessor.maxReadBatchSize";
+    /** Default max commit batch size: 1 */
+    public static final String ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE =
+        "zookeeper.commitProcessor.maxCommitBatchSize";
 
     /**
      * Incoming requests.
@@ -90,6 +97,13 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
         new LinkedBlockingQueue<Request>();
 
     /**
+     * Incoming requests that are waiting on a commit,
+     * contained in order of arrival
+     */
+    protected final LinkedBlockingQueue<Request> queuedWriteRequests =
+            new LinkedBlockingQueue<>();
+
+    /**
      * The number of read requests currently held in all session queues
      */
     private AtomicInteger numReadQueuedRequests = new AtomicInteger(0);
@@ -126,6 +140,23 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
     private Object emptyPoolSync = new Object();
 
     /**
+     * Max number of reads to process from queuedRequests before switching to
+     * processing commits. If the value is negative, we switch whenever we have
+     * a local write, and pending commits.
+     * A high read batch size will delay commit processing, causing us to
+     * serve stale data.
+     */
+    private static volatile int maxReadBatchSize;
+    /**
+     * Max number of commits to process before processing reads. We will try to
+     * process as many remote/local commits as we can till we reach this
+     * count.
+     * A high commit batch size will delay reads while processing more commits.
+     * A low commit batch size will favor reads.
+     */
+    private static volatile int maxCommitBatchSize;
+
+    /**
      * This flag indicates whether we need to wait for a response to come back from the
      * leader or we just let the sync operation flow through like a read. The flag will
      * be false if the CommitProcessor is in a Leader pipeline.
@@ -209,22 +240,28 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
 
                 /*
                  * Processing up to requestsToProcess requests from the incoming
-                 * queue (queuedRequests), possibly less if a committed request
-                 * is present along with a pending local write. After the loop,
-                 * we process one committed request if commitIsWaiting.
+                 * queue (queuedRequests). If maxReadBatchSize is set then no
+                 * commits will be processed until maxReadBatchSize number of
+                 * reads are processed (or no more reads remain in the queue).
+                 * After the loop a single committed request is processed if
+                 * one is waiting (or a batch of commits if maxCommitBatchSize
+                 * is set).
                  */
-                Request request = null;
+                Request request;
+                int readsProcessed = 0;
                 while (!stopped && requestsToProcess > 0
+                        && (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize)
                         && (request = queuedRequests.poll()) != null) {
                     requestsToProcess--;
                     if (needCommit(request)
                             || pendingRequests.containsKey(request.sessionId)) {
                         // Add request to pending
-                        pendingRequests
-                                .computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>())
-                                .add(request);
-                        ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(pendingRequests.get(request.sessionId).size());
+                        Deque<Request> requests =
+                                pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>());
+                        requests.addLast(request);
+                        ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size());
                     } else {
+                        readsProcessed++;
                         numReadQueuedRequests.decrementAndGet();
                         sendToNextProcessor(request);
                     }
@@ -237,9 +274,10 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                      * the queue, so if we have a pending request and a
                      * committed request, the committed request must be for that
                      * pending write or for a write originating at a different
-                     * server.
+                     * server. We skip this if maxReadBatchSize is set.
                      */
-                    if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()){
+                    if (maxReadBatchSize < 0 &&
+                        !pendingRequests.isEmpty() && !committedRequests.isEmpty()){
                         /*
                          * We set commitIsWaiting so that we won't check
                          * committedRequests again.
@@ -248,91 +286,111 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                         break;
                     }
                 }
+                ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(readsProcessed);
 
-                // Handle a single committed request
-                if (commitIsWaiting && !stopped){
+                if (!commitIsWaiting) {
+                    commitIsWaiting = !committedRequests.isEmpty();
+                }
+
+                /*
+                 * Handle commits, if any.
+                 */
+                if (commitIsWaiting && !stopped) {
+                    /*
+                     * Drain outstanding reads
+                     */
                     waitForEmptyPool();
 
-                    if (stopped){
+                    if (stopped) {
                         return;
                     }
 
-                    // Process committed head
-                    if ((request = committedRequests.poll()) == null) {
-                        throw new IOException("Error: committed head is null");
-                    }
+                    int commitsToProcess = maxCommitBatchSize;
 
                     /*
-                     * Check if request is pending, if so, update it with the committed info
+                     * Loop through all the commits, and try to drain them.
                      */
-                    Deque<Request> sessionQueue = pendingRequests
-                            .get(request.sessionId);
-                    ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
-                    if (sessionQueue != null) {
-                        ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
-                        // If session queue != null, then it is also not empty.
-                        Request topPending = sessionQueue.poll();
-                        if (request.cxid != topPending.cxid) {
-                            /*
-                             * TL;DR - we should not encounter this scenario often under normal load.
-                             * We pass the commit to the next processor and put the pending back with a warning.
-                             *
-                             * Generally, we can get commit requests that are not at the queue head after
-                             * a session moved (see ZOOKEEPER-2684). Let's denote the previous server of the session
-                             * with A, and the server that the session moved to with B (keep in mind that it is
-                             * possible that the session already moved from B to a new server C, and maybe C=A).
-                             * 1. If request.cxid < topPending.cxid : this means that the session requested this update
-                             * from A, then moved to B (i.e., which is us), and now B receives the commit
-                             * for the update after the session already performed several operations in B
-                             * (and therefore its cxid is higher than that old request).
-                             * 2. If request.cxid > topPending.cxid : this means that the session requested an updated
-                             * from B with cxid that is bigger than the one we know therefore in this case we
-                             * are A, and we lost the connection to the session. Given that we are waiting for a commit
-                             * for that update, it means that we already sent the request to the leader and it will
-                             * be committed at some point (in this case the order of cxid won't follow zxid, since zxid
-                             * is an increasing order). It is not safe for us to delete the session's queue at this
-                             * point, since it is possible that the session has newer requests in it after it moved
-                             * back to us. We just leave the queue as it is, and once the commit arrives (for the old
-                             * request), the finalRequestProcessor will see a closed cnxn handle, and just won't send a
-                             * response.
-                             * Also note that we don't have a local session, therefore we treat the request
-                             * like any other commit for a remote request, i.e., we perform the update without sending
-                             * a response.
-                             */
-                            LOG.warn("Got request " + request +
-                                    " but we are expecting request " + topPending);
-                            sessionQueue.addFirst(topPending);
-                        } else {
+                    Set<Long> queuesToDrain = new HashSet<>();
+                    long startWriteTime = Time.currentElapsedTime();
+                    int commitsProcessed = 0;
+                    while (commitIsWaiting && !stopped && commitsToProcess > 0) {
+
+                        // Process committed head
+                        request = committedRequests.peek();
+
+                        /*
+                         * Check whether this commit is for a pending local write request;
+                         * if so, update it with the committed info. If the commit matches
+                         * the first write queued in queuedWriteRequests, we know this is
+                         * a commit for a local write, as commits are received in order.
+                         * Otherwise it must be a commit for a remote write.
+                         */
+                        if (!queuedWriteRequests.isEmpty() &&
+                                queuedWriteRequests.peek().sessionId == request.sessionId &&
+                                queuedWriteRequests.peek().cxid == request.cxid) {
                             /*
-                             * Generally, we want to send to the next processor our version of the request,
-                             * since it contains the session information that is needed for post update processing.
-                             * In more details, when a request is in the local queue, there is (or could be) a client
-                             * attached to this server waiting for a response, and there is other bookkeeping of
-                             * requests that are outstanding and have originated from this server
-                             * (e.g., for setting the max outstanding requests) - we need to update this info when an
-                             * outstanding request completes. Note that in the other case (above), the operation
-                             * originated from a different server and there is no local bookkeeping or a local client
-                             * session that needs to be notified.
+                             * Commit matches the earliest write in our write queue.
                              */
-                            topPending.setHdr(request.getHdr());
-                            topPending.setTxn(request.getTxn());
-                            topPending.zxid = request.zxid;
-                            topPending.commitRecvTime = request.commitRecvTime;
-                            request = topPending;
-
-                            // Only decrement if we take a request off the queue.
-                            numWriteQueuedRequests.decrementAndGet();
+                            Deque<Request> sessionQueue = pendingRequests
+                                    .get(request.sessionId);
+                            ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
+                            if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek())) {
+                                /*
+                                 * Can't process this write yet.
+                                 * Either there are reads pending in this session, or we
+                                 * haven't gotten to this write yet.
+                                 */
+                                break;
+                            } else {
+                                ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
+                                // If session queue != null, then it is also not empty.
+                                Request topPending = sessionQueue.poll();
+                                /*
+                                 * Generally, we want to send to the next processor our version of the request,
+                                 * since it contains the session information that is needed for post update processing.
+                                 * In more details, when a request is in the local queue, there is (or could be) a client
+                                 * attached to this server waiting for a response, and there is other bookkeeping of
+                                 * requests that are outstanding and have originated from this server
+                                 * (e.g., for setting the max outstanding requests) - we need to update this info when an
+                                 * outstanding request completes. Note that in the other case, the operation
+                                 * originated from a different server and there is no local bookkeeping or a local client
+                                 * session that needs to be notified.
+                                 */
+                                topPending.setHdr(request.getHdr());
+                                topPending.setTxn(request.getTxn());
+                                topPending.zxid = request.zxid;
+                                topPending.commitRecvTime = request.commitRecvTime;
+                                request = topPending;
+                                // Only decrement if we take a request off the queue.
+                                numWriteQueuedRequests.decrementAndGet();
+                                queuedWriteRequests.poll();
+                                queuesToDrain.add(request.sessionId);
+                            }
                         }
-                    }
+                        /*
+                         * Pull the request off the commit queue, now that we are going
+                         * to process it.
+                         */
+                        committedRequests.remove();
+                        commitsToProcess--;
+                        commitsProcessed++;
 
-                    sendToNextProcessor(request);
-                    waitForEmptyPool();
+                        // Process the write inline.
+                        processWrite(request);
+
+                        commitIsWaiting = !committedRequests.isEmpty();
+                    }
+                    ServerMetrics.getMetrics().WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR.add(
+                            Time.currentElapsedTime() - startWriteTime);
+                    ServerMetrics.getMetrics().WRITES_ISSUED_IN_COMMIT_PROC.add(commitsProcessed);
 
                     /*
-                     * Process following reads if any, remove session queue if
+                     * Process following reads if any, remove session queue(s) if
                      * empty.
                      */
-                    if (sessionQueue != null) {
+                    readsProcessed = 0;
+                    for (Long sessionId : queuesToDrain) {
+                        Deque<Request> sessionQueue = pendingRequests.get(sessionId);
                         int readsAfterWrite = 0;
                         while (!stopped && !sessionQueue.isEmpty()
                                 && !needCommit(sessionQueue.peek())) {
@@ -341,12 +399,15 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                             readsAfterWrite++;
                         }
                         ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite);
+                        readsProcessed += readsAfterWrite;
 
                         // Remove empty queues
                         if (sessionQueue.isEmpty()) {
-                            pendingRequests.remove(request.sessionId);
+                            pendingRequests.remove(sessionId);
                         }
                     }
+                    ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(queuesToDrain.size());
+                    ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(readsProcessed);
                 }
 
                 ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - time);
@@ -388,6 +449,8 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
         workerShutdownTimeoutMS = Long.getLong(
             ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000);
 
+        initBatchSizes();
+
         LOG.info("Configuring CommitProcessor with "
                  + (numWorkerThreads > 0 ? numWorkerThreads : "no")
                  + " worker threads.");
@@ -409,6 +472,78 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
         workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
     }
 
+    private void processWrite(Request request) throws RequestProcessorException {
+        processCommitMetrics(request, true);
+
+        long timeBeforeFinalProc = Time.currentElapsedTime();
+        nextProcessor.processRequest(request);
+        ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(
+                Time.currentElapsedTime() - timeBeforeFinalProc);
+    }
+
+    private static void initBatchSizes() {
+        maxReadBatchSize = Integer.getInteger(
+                ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE, -1);
+        maxCommitBatchSize = Integer.getInteger(
+                ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE, 1);
+
+        if (maxCommitBatchSize <= 0) {
+            String errorMsg = "maxCommitBatchSize must be positive, was " +
+                    maxCommitBatchSize;
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        LOG.info("Configuring CommitProcessor with readBatchSize {} commitBatchSize {}",
+                maxReadBatchSize,
+                maxCommitBatchSize);
+    }
+
+    private static void processCommitMetrics(Request request, boolean isWrite) {
+        if (isWrite) {
+            if (request.commitProcQueueStartTime != -1 &&
+                    request.commitRecvTime != -1) {
+                // Locally issued writes.
+                long currentTime = Time.currentElapsedTime();
+                ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(currentTime -
+                        request.commitProcQueueStartTime);
+                ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(currentTime -
+                        request.commitRecvTime);
+            } else if (request.commitRecvTime != -1) {
+                // Writes issued by other servers.
+                ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add(
+                        Time.currentElapsedTime() - request.commitRecvTime);
+            }
+        } else {
+            if (request.commitProcQueueStartTime != -1) {
+                ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add(
+                        Time.currentElapsedTime() -
+                                request.commitProcQueueStartTime);
+            }
+        }
+    }
+
+    public static int getMaxReadBatchSize() {
+        return maxReadBatchSize;
+    }
+
+    public static int getMaxCommitBatchSize() {
+        return maxCommitBatchSize;
+    }
+
+    public static void setMaxReadBatchSize(int size) {
+        maxReadBatchSize = size;
+        LOG.info("Configuring CommitProcessor with readBatchSize {}",
+                 maxReadBatchSize);
+    }
+
+    public static void setMaxCommitBatchSize(int size) {
+        if (size > 0) {
+            maxCommitBatchSize = size;
+            LOG.info("Configuring CommitProcessor with commitBatchSize {}",
+                     maxCommitBatchSize);
+        }
+    }
+
     /**
      * CommitWorkRequest is a small wrapper class to allow
      * downstream processing to be run using the WorkerService
@@ -431,27 +566,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
 
         public void doWork() throws RequestProcessorException {
             try {
-                if (needCommit(request)) {
-                    if (request.commitProcQueueStartTime != -1 &&
-                            request.commitRecvTime != -1) {
-                        // Locally issued writes.
-                        long currentTime = Time.currentElapsedTime();
-                        ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(currentTime -
-                                request.commitProcQueueStartTime);
-                        ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(currentTime -
-                                request.commitRecvTime);
-                    } else if (request.commitRecvTime != -1) {
-                        // Writes issued by other servers.
-                        ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add(
-                                Time.currentElapsedTime() - request.commitRecvTime);
-                    }
-                } else {
-                    if (request.commitProcQueueStartTime != -1) {
-                        ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add(
-                                Time.currentElapsedTime() -
-                                        request.commitProcQueueStartTime);
-                    }
-                }
+                processCommitMetrics(request, needCommit(request));
 
                 long timeBeforeFinalProc = Time.currentElapsedTime();
                 nextProcessor.processRequest(request);
@@ -508,6 +623,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
         queuedRequests.add(request);
         // If the request will block, add it to the queue of blocking requests
         if (needCommit(request)) {
+            queuedWriteRequests.add(request);
             numWriteQueuedRequests.incrementAndGet();
         } else {
             numReadQueuedRequests.incrementAndGet();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
index 0e002b9..95889ef 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
@@ -62,6 +62,8 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
     public void setUp() throws Exception {
         processedRequests = new LinkedBlockingQueue<Request>();
         processor = new MockCommitProcessor();
+        CommitProcessor.setMaxReadBatchSize(-1);
+        CommitProcessor.setMaxCommitBatchSize(1);
     }
 
     @After
@@ -148,6 +150,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
         processor.committedRequests.add(writeReq);
         processor.queuedRequests.add(readReq);
         processor.queuedRequests.add(writeReq);
+        processor.queuedWriteRequests.add(writeReq);
         processor.initThreads(1);
 
         processor.stoppedMainLoop = true;
@@ -194,6 +197,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
             Request readReq = newRequest(new GetDataRequest(path, false),
                     OpCode.getData, sessionId, sessionId + 2);
             processor.queuedRequests.add(writeReq);
+            processor.queuedWriteRequests.add(writeReq);
             processor.queuedRequests.add(readReq);
             shouldNotBeProcessed.add(writeReq);
             shouldNotBeProcessed.add(readReq);
@@ -232,6 +236,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                         CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                 OpCode.create, 0x1, 1);
         processor.queuedRequests.add(writeReq);
+        processor.queuedWriteRequests.add(writeReq);
         shouldBeInPending.add(writeReq);
 
         for (int readReqId = 2; readReqId <= 5; ++readReqId) {
@@ -249,6 +254,8 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                 processedRequests.isEmpty());
         Assert.assertTrue("Did not handled all of queuedRequests' requests",
                 processor.queuedRequests.isEmpty());
+        Assert.assertTrue("Removed from blockedQueuedRequests before commit",
+                !processor.queuedWriteRequests.isEmpty());
 
         shouldBeInPending
                 .removeAll(processor.pendingRequests.get(writeReq.sessionId));
@@ -273,6 +280,155 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                 processor.committedRequests.isEmpty());
         Assert.assertTrue("Did not process committed request",
                 processor.pendingRequests.isEmpty());
+        Assert.assertTrue("Did not remove from blockedQueuedRequests",
+                processor.queuedWriteRequests.isEmpty());
+    }
+
+    /**
+     * Verifies that CommitProcessor.run() honors maxCommitBatchSize. Session 1
+     * queues a write then reads 2-5; session 2 queues a write, reads 2-5, and
+     * then a second write (write 3). Checks: 1. No write is processed before
+     * its commit arrives. 2. With maxCommitBatchSize of 3 and three commits
+     * queued, only the first two writes are processed, because write 3 is
+     * queued behind session 2's reads. 3. All queued reads are then processed.
+     * 4. A new read for session 1 plus a new write (write 4) and its commit
+     * for session 2 are queued; the pending write 3 and the new read are
+     * processed while the commit for write 4 stays queued. 5. A final run
+     * processes write 4 and leaves the committed, pending and write queues empty.
+     * NOTE(review): workers are awaited with fixed 500 ms sleeps (may be flaky);
+     * assertion messages still say "blockedQueuedRequests" (now queuedWriteRequests).
+     */
+    @Test
+    public void processAllWritesMaxBatchSize()
+            throws Exception {
+        final String path = "/processAllWritesMaxBatchSize";
+        HashSet<Request> shouldBeProcessedAfterPending = new HashSet<Request>();
+
+        Request writeReq = newRequest(
+                new CreateRequest(path + "_1", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x1, 1);
+        processor.queuedRequests.add(writeReq);
+        processor.queuedWriteRequests.add(writeReq);
+
+        Request writeReq2 = newRequest(
+                new CreateRequest(path + "_2", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x2, 1);
+        processor.queuedRequests.add(writeReq2);
+        processor.queuedWriteRequests.add(writeReq2);
+
+        for (int readReqId = 2; readReqId <= 5; ++readReqId) {
+            Request readReq = newRequest(new GetDataRequest(path, false),
+                    OpCode.getData, 0x1, readReqId);
+            Request readReq2 = newRequest(new GetDataRequest(path, false),
+                    OpCode.getData, 0x2, readReqId);
+            processor.queuedRequests.add(readReq);
+            shouldBeProcessedAfterPending.add(readReq);
+            processor.queuedRequests.add(readReq2);
+            shouldBeProcessedAfterPending.add(readReq2);
+        }
+
+        Request writeReq3 = newRequest(
+                new CreateRequest(path + "_3", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x2, 6);
+        processor.queuedRequests.add(writeReq3);
+        processor.queuedWriteRequests.add(writeReq3);
+
+        processor.initThreads(defaultSizeOfThreadPool);
+
+        processor.stoppedMainLoop = true;
+        CommitProcessor.setMaxCommitBatchSize(2);
+        processor.run();
+        Assert.assertTrue("Processed without waiting for commit",
+                processedRequests.isEmpty());
+        Assert.assertTrue("Did not handled all of queuedRequests' requests",
+                processor.queuedRequests.isEmpty());
+        Assert.assertTrue("Removed from blockedQueuedRequests before commit",
+                !processor.queuedWriteRequests.isEmpty());
+        Assert.assertTrue("Missing session 1 in pending queue",
+                processor.pendingRequests.containsKey(writeReq.sessionId));
+        Assert.assertTrue("Missing session 2 in pending queue",
+                processor.pendingRequests.containsKey(writeReq2.sessionId));
+
+        processor.committedRequests.add(writeReq);
+        processor.committedRequests.add(writeReq2);
+        processor.committedRequests.add(writeReq3);
+        processor.stoppedMainLoop = true;
+        CommitProcessor.setMaxCommitBatchSize(3);
+        processor.run();
+        processor.initThreads(defaultSizeOfThreadPool);
+
+        Thread.sleep(500);
+        Assert.assertTrue("Did not process committed request",
+                processedRequests.peek() == writeReq);
+        Assert.assertTrue("Did not process following read request",
+                processedRequests.containsAll(shouldBeProcessedAfterPending));
+        Assert.assertTrue("Processed committed request",
+                !processor.committedRequests.isEmpty());
+        Assert.assertTrue("Removed commit for write req 3",
+                processor.committedRequests.peek() == writeReq3);
+        Assert.assertTrue("Processed committed request",
+                !processor.pendingRequests.isEmpty());
+        Assert.assertTrue("Missing session 2 in pending queue",
+                processor.pendingRequests.containsKey(writeReq3.sessionId));
+        Assert.assertTrue("Missing write 3 in pending queue",
+                processor.pendingRequests.get(writeReq3.sessionId).peek() == writeReq3);
+        Assert.assertTrue("Removed from blockedQueuedRequests",
+                !processor.queuedWriteRequests.isEmpty());
+        Assert.assertTrue("Removed write req 3 from blockedQueuedRequests",
+                processor.queuedWriteRequests.peek() == writeReq3);
+
+        Request readReq3 = newRequest(new GetDataRequest(path, false),
+                OpCode.getData, 0x1, 7);
+        processor.queuedRequests.add(readReq3);
+        shouldBeProcessedAfterPending.add(readReq3);
+        Request writeReq4 = newRequest(
+                new CreateRequest(path + "_4", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, 0x2, 7);
+
+        processor.queuedRequests.add(writeReq4);
+        processor.queuedWriteRequests.add(writeReq4);
+        processor.committedRequests.add(writeReq4);
+
+        processor.stoppedMainLoop = true;
+        CommitProcessor.setMaxCommitBatchSize(3);
+        processor.run();
+        processor.initThreads(defaultSizeOfThreadPool);
+
+        Thread.sleep(500);
+        Assert.assertTrue("Did not process committed request",
+                processedRequests.peek() == writeReq);
+        Assert.assertTrue("Did not process following read request",
+                processedRequests.containsAll(shouldBeProcessedAfterPending));
+        Assert.assertTrue("Processed unexpected committed request",
+                !processor.committedRequests.isEmpty());
+        Assert.assertTrue("Unexpected pending request",
+                processor.pendingRequests.isEmpty());
+        Assert.assertTrue("Removed from blockedQueuedRequests",
+                !processor.queuedWriteRequests.isEmpty());
+        Assert.assertTrue("Removed write req 4 from blockedQueuedRequests",
+                processor.queuedWriteRequests.peek() == writeReq4);
+
+        processor.stoppedMainLoop = true;
+        CommitProcessor.setMaxCommitBatchSize(3);
+        processor.run();
+        processor.initThreads(defaultSizeOfThreadPool);
+
+        Thread.sleep(500);
+        Assert.assertTrue("Did not process committed request",
+                processedRequests.peek() == writeReq);
+        Assert.assertTrue("Did not process following read request",
+                processedRequests.containsAll(shouldBeProcessedAfterPending));
+        Assert.assertTrue("Did not process committed request",
+                processor.committedRequests.isEmpty());
+        Assert.assertTrue("Did not process committed request",
+                processor.pendingRequests.isEmpty());
+        Assert.assertTrue("Did not remove from blockedQueuedRequests",
+                processor.queuedWriteRequests.isEmpty());
+
     }
 
     /**
@@ -322,6 +478,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                         CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                 OpCode.create, 0x3, 1);
         processor.queuedRequests.add(firstCommittedReq);
+        processor.queuedWriteRequests.add(firstCommittedReq);
         processor.committedRequests.add(firstCommittedReq);
         Set<Request> allReads = new HashSet<Request>();
 
@@ -399,6 +556,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                         CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                 OpCode.create, sessionid, readReqId++);
         processor.queuedRequests.add(firstCommittedReq);
+        processor.queuedWriteRequests.add(firstCommittedReq);
         localRequests.add(firstCommittedReq);
 
         // queue read requests to queuedRequests
@@ -463,6 +621,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                         CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
                 OpCode.create, sessionid, lastCXid);
         processor.queuedRequests.add(orphanCommittedReq);
+        processor.queuedWriteRequests.add(orphanCommittedReq);
         localRequests.add(orphanCommittedReq);
 
         // queue read requests to queuedRequests
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
index 426aae9..f284052 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
@@ -51,6 +51,10 @@ public class CommitProcessorMetricsTest extends ZKTestCase {
     public void setup() {
         LOG.info("setup");
         ServerMetrics.getMetrics().resetAll();
+
+        // ensure no leaked commit-processor batch-size system properties
+        System.clearProperty("zookeeper.commitProcessor.maxReadBatchSize");
+        System.clearProperty("zookeeper.commitProcessor.maxCommitBatchSize");
     }
 
     public void setupProcessors(int commitWorkers, int finalProcTime ) {