You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2019/07/30 00:31:22 UTC
[zookeeper] branch master updated: ZOOKEEPER-3359: Batch commits in
the CommitProcessor
This is an automated email from the ASF dual-hosted git repository.
hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a6b38f8 ZOOKEEPER-3359: Batch commits in the CommitProcessor
a6b38f8 is described below
commit a6b38f83218791f8d9fabc52c865dcccf07026cc
Author: Brian Nixon <ni...@fb.com>
AuthorDate: Mon Jul 29 17:31:09 2019 -0700
ZOOKEEPER-3359: Batch commits in the CommitProcessor
Author: Brian Nixon <ni...@fb.com>
Reviewers: Michael Han <ha...@apache.org>, Norbert Kalmar <nk...@yahoo.com>, Enrico Olivelli <eo...@gmail.com>
Closes #905 from enixon/commit-proc-batch
---
.../src/main/resources/markdown/zookeeperAdmin.md | 23 ++
.../zookeeper/server/ZooKeeperServerBean.java | 13 +
.../zookeeper/server/ZooKeeperServerMXBean.java | 6 +
.../zookeeper/server/quorum/CommitProcessor.java | 318 ++++++++++++++-------
.../quorum/CommitProcessorConcurrencyTest.java | 159 +++++++++++
.../server/quorum/CommitProcessorMetricsTest.java | 4 +
6 files changed, 422 insertions(+), 101 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index c596546..835e357 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1445,6 +1445,29 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t
Number of Commit Processor worker threads. If configured with 0 worker threads, the main thread
will process the request directly. The default value is the number of cpu cores.
+* *zookeeper.commitProcessor.maxReadBatchSize* :
+ (Java system property only: **zookeeper.commitProcessor.maxReadBatchSize**)
+ Max number of reads to process from queuedRequests before switching to processing commits.
+ If the value is negative (the default), we switch whenever there is both a pending local write and a pending commit.
+ A high read batch size will delay commit processing, causing stale data to be served.
+ If reads are known to arrive in fixed size batches then matching that batch size with
+ the value of this property can smooth queue performance. Since reads are handled in parallel,
+ one recommendation is to set this property to match *zookeeper.commitProcessor.numWorkerThread*
+ (default is the number of cpu cores) or lower.
+
+* *zookeeper.commitProcessor.maxCommitBatchSize* :
+ (Java system property only: **zookeeper.commitProcessor.maxCommitBatchSize**)
+ Max number of commits to process before processing reads. We will try to process as many
+ remote/local commits as we can until we reach this count. A high commit batch size will delay
+ reads while processing more commits. A low commit batch size will favor reads.
+ It is recommended to only set this property when an ensemble is serving a workload with a high
+ commit rate. If writes are known to arrive in fixed-size batches, then matching that
+ batch size with the value of this property can smooth queue performance. A generic
+ approach would be to set this value to equal the ensemble size so that with the processing
+ of each batch the current server will probabilistically handle a write related to one of
+ its direct clients.
+ Default is "1". Negative and zero values are not supported.
+
* *znode.container.checkIntervalMs* :
(Java system property only)
**New in 3.6.0:** The
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
index 92ceab8..6e89820 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
@@ -23,6 +23,7 @@ import java.util.Date;
import org.apache.jute.BinaryInputArchive;
import org.apache.zookeeper.Version;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.apache.zookeeper.server.quorum.CommitProcessor;
/**
* This class implements the ZooKeeper server MBean interface.
@@ -281,6 +282,18 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
///////////////////////////////////////////////////////////////////////////
+ /** Reads CommitProcessor's shared (static) max read batch size. */
+ @Override
+ public int getCommitProcMaxReadBatchSize() { return CommitProcessor.getMaxReadBatchSize(); }
+
+ /** Updates CommitProcessor's shared (static) max read batch size at runtime. */
+ @Override
+ public void setCommitProcMaxReadBatchSize(int size) { CommitProcessor.setMaxReadBatchSize(size); }
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ /** Reads CommitProcessor's shared (static) max commit batch size. */
+ @Override
+ public int getCommitProcMaxCommitBatchSize() { return CommitProcessor.getMaxCommitBatchSize(); }
+
+ /** Updates CommitProcessor's shared max commit batch size; non-positive sizes are not applied. */
+ @Override
+ public void setCommitProcMaxCommitBatchSize(int size) { CommitProcessor.setMaxCommitBatchSize(size); }
+
+ ///////////////////////////////////////////////////////////////////////////
+
@Override
public long getFlushDelay() {
return zks.getFlushDelay();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
index 4c71eac..7a42eaa 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
@@ -120,6 +120,12 @@ public interface ZooKeeperServerMXBean {
public double getConnectionDecreaseRatio();
public void setConnectionDecreaseRatio(double val);
+ /** @return CommitProcessor's max read batch size; a negative value disables read batching */
+ public int getCommitProcMaxReadBatchSize();
+ /** @param size new CommitProcessor max read batch size; negative disables read batching */
+ public void setCommitProcMaxReadBatchSize(int size);
+
+ /** @return CommitProcessor's max number of commits processed per batch */
+ public int getCommitProcMaxCommitBatchSize();
+ /** @param size new CommitProcessor max commit batch size; should be positive (presumably ignored otherwise — see implementation) */
+ public void setCommitProcMaxCommitBatchSize(int size);
+
public int getRequestThrottleLimit();
public void setRequestThrottleLimit(int requests);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
index 6e4b702..77635c9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java
@@ -19,11 +19,12 @@
package org.apache.zookeeper.server.quorum;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.LinkedBlockingQueue;
@@ -82,6 +83,12 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
/** Default worker pool shutdown timeout in ms: 5000 (5s) */
public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT =
"zookeeper.commitProcessor.shutdownTimeout";
+ /**
+ * System property: max number of reads processed before switching to
+ * commits. Default: -1, which disables read batching.
+ */
+ public static final String ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE =
+ "zookeeper.commitProcessor.maxReadBatchSize";
+ /**
+ * System property: max number of commits processed per batch.
+ * Default: 1. Must be positive; startup fails otherwise.
+ */
+ public static final String ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE =
+ "zookeeper.commitProcessor.maxCommitBatchSize";
/**
* Incoming requests.
@@ -90,6 +97,13 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
new LinkedBlockingQueue<Request>();
/**
+ * Incoming requests that need a commit (local writes), kept in order of
+ * arrival. The head is compared with the head of committedRequests to tell
+ * whether a commit belongs to one of our local writes or to a remote write.
+ */
+ protected final LinkedBlockingQueue<Request> queuedWriteRequests =
+ new LinkedBlockingQueue<>();
+
+ /**
* The number of read requests currently held in all session queues
*/
private AtomicInteger numReadQueuedRequests = new AtomicInteger(0);
@@ -126,6 +140,23 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
private Object emptyPoolSync = new Object();
/**
+ * Max number of reads to process from queuedRequests before switching to
+ * processing commits. If the value is negative, we switch as soon as there
+ * is both a pending local write and a pending commit.
+ * A high read batch size will delay commit processing, causing us to
+ * serve stale data.
+ * Static and volatile: shared across CommitProcessor instances and may be
+ * changed at runtime via setMaxReadBatchSize.
+ */
+ private static volatile int maxReadBatchSize;
+ /**
+ * Max number of commits to process before processing reads. We will try to
+ * process as many remote/local commits as we can until we reach this
+ * count.
+ * A high commit batch size will delay reads while processing more commits.
+ * A low commit batch size will favor reads.
+ * Static and volatile: shared across CommitProcessor instances and may be
+ * changed at runtime via setMaxCommitBatchSize.
+ */
+ private static volatile int maxCommitBatchSize;
+
+ /**
* This flag indicates whether we need to wait for a response to come back from the
* leader or we just let the sync operation flow through like a read. The flag will
* be false if the CommitProcessor is in a Leader pipeline.
@@ -209,22 +240,28 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
/*
* Processing up to requestsToProcess requests from the incoming
- * queue (queuedRequests), possibly less if a committed request
- * is present along with a pending local write. After the loop,
- * we process one committed request if commitIsWaiting.
+ * queue (queuedRequests). If maxReadBatchSize is set then no
+ * commits will be processed until maxReadBatchSize number of
+ * reads are processed (or no more reads remain in the queue).
+ * After the loop a single committed request is processed if
+ * one is waiting (or a batch of commits if maxCommitBatchSize
+ * is set).
*/
- Request request = null;
+ Request request;
+ int readsProcessed = 0;
while (!stopped && requestsToProcess > 0
+ && (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize)
&& (request = queuedRequests.poll()) != null) {
requestsToProcess--;
if (needCommit(request)
|| pendingRequests.containsKey(request.sessionId)) {
// Add request to pending
- pendingRequests
- .computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>())
- .add(request);
- ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(pendingRequests.get(request.sessionId).size());
+ Deque<Request> requests =
+ pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>());
+ requests.addLast(request);
+ ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size());
} else {
+ readsProcessed++;
numReadQueuedRequests.decrementAndGet();
sendToNextProcessor(request);
}
@@ -237,9 +274,10 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
* the queue, so if we have a pending request and a
* committed request, the committed request must be for that
* pending write or for a write originating at a different
- * server.
+ * server. We skip this if maxReadBatchSize is set.
*/
- if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()){
+ if (maxReadBatchSize < 0 &&
+ !pendingRequests.isEmpty() && !committedRequests.isEmpty()){
/*
* We set commitIsWaiting so that we won't check
* committedRequests again.
@@ -248,91 +286,111 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
break;
}
}
+ ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(readsProcessed);
- // Handle a single committed request
- if (commitIsWaiting && !stopped){
+ if (!commitIsWaiting) {
+ commitIsWaiting = !committedRequests.isEmpty();
+ }
+
+ /*
+ * Handle commits, if any.
+ */
+ if (commitIsWaiting && !stopped) {
+ /*
+ * Drain outstanding reads
+ */
waitForEmptyPool();
- if (stopped){
+ if (stopped) {
return;
}
- // Process committed head
- if ((request = committedRequests.poll()) == null) {
- throw new IOException("Error: committed head is null");
- }
+ int commitsToProcess = maxCommitBatchSize;
/*
- * Check if request is pending, if so, update it with the committed info
+ * Loop through all the commits, and try to drain them.
*/
- Deque<Request> sessionQueue = pendingRequests
- .get(request.sessionId);
- ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
- if (sessionQueue != null) {
- ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
- // If session queue != null, then it is also not empty.
- Request topPending = sessionQueue.poll();
- if (request.cxid != topPending.cxid) {
- /*
- * TL;DR - we should not encounter this scenario often under normal load.
- * We pass the commit to the next processor and put the pending back with a warning.
- *
- * Generally, we can get commit requests that are not at the queue head after
- * a session moved (see ZOOKEEPER-2684). Let's denote the previous server of the session
- * with A, and the server that the session moved to with B (keep in mind that it is
- * possible that the session already moved from B to a new server C, and maybe C=A).
- * 1. If request.cxid < topPending.cxid : this means that the session requested this update
- * from A, then moved to B (i.e., which is us), and now B receives the commit
- * for the update after the session already performed several operations in B
- * (and therefore its cxid is higher than that old request).
- * 2. If request.cxid > topPending.cxid : this means that the session requested an updated
- * from B with cxid that is bigger than the one we know therefore in this case we
- * are A, and we lost the connection to the session. Given that we are waiting for a commit
- * for that update, it means that we already sent the request to the leader and it will
- * be committed at some point (in this case the order of cxid won't follow zxid, since zxid
- * is an increasing order). It is not safe for us to delete the session's queue at this
- * point, since it is possible that the session has newer requests in it after it moved
- * back to us. We just leave the queue as it is, and once the commit arrives (for the old
- * request), the finalRequestProcessor will see a closed cnxn handle, and just won't send a
- * response.
- * Also note that we don't have a local session, therefore we treat the request
- * like any other commit for a remote request, i.e., we perform the update without sending
- * a response.
- */
- LOG.warn("Got request " + request +
- " but we are expecting request " + topPending);
- sessionQueue.addFirst(topPending);
- } else {
+ Set<Long> queuesToDrain = new HashSet<>();
+ long startWriteTime = Time.currentElapsedTime();
+ int commitsProcessed = 0;
+ while (commitIsWaiting && !stopped && commitsToProcess > 0) {
+
+ // Process committed head
+ request = committedRequests.peek();
+
+ /*
+ * Check if this is a local write request is pending,
+ * if so, update it with the committed info. If the commit matches
+ * the first write queued in the blockedRequestQueue, we know this is
+ * a commit for a local write, as commits are received in order. Else
+ * it must be a commit for a remote write.
+ */
+ if (!queuedWriteRequests.isEmpty() &&
+ queuedWriteRequests.peek().sessionId == request.sessionId &&
+ queuedWriteRequests.peek().cxid == request.cxid) {
/*
- * Generally, we want to send to the next processor our version of the request,
- * since it contains the session information that is needed for post update processing.
- * In more details, when a request is in the local queue, there is (or could be) a client
- * attached to this server waiting for a response, and there is other bookkeeping of
- * requests that are outstanding and have originated from this server
- * (e.g., for setting the max outstanding requests) - we need to update this info when an
- * outstanding request completes. Note that in the other case (above), the operation
- * originated from a different server and there is no local bookkeeping or a local client
- * session that needs to be notified.
+ * Commit matches the earliest write in our write queue.
*/
- topPending.setHdr(request.getHdr());
- topPending.setTxn(request.getTxn());
- topPending.zxid = request.zxid;
- topPending.commitRecvTime = request.commitRecvTime;
- request = topPending;
-
- // Only decrement if we take a request off the queue.
- numWriteQueuedRequests.decrementAndGet();
+ Deque<Request> sessionQueue = pendingRequests
+ .get(request.sessionId);
+ ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
+ if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek())) {
+ /*
+ * Can't process this write yet.
+ * Either there are reads pending in this session, or we
+ * haven't gotten to this write yet.
+ */
+ break;
+ } else {
+ ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
+ // If session queue != null, then it is also not empty.
+ Request topPending = sessionQueue.poll();
+ /*
+ * Generally, we want to send to the next processor our version of the request,
+ * since it contains the session information that is needed for post update processing.
+ * In more details, when a request is in the local queue, there is (or could be) a client
+ * attached to this server waiting for a response, and there is other bookkeeping of
+ * requests that are outstanding and have originated from this server
+ * (e.g., for setting the max outstanding requests) - we need to update this info when an
+ * outstanding request completes. Note that in the other case, the operation
+ * originated from a different server and there is no local bookkeeping or a local client
+ * session that needs to be notified.
+ */
+ topPending.setHdr(request.getHdr());
+ topPending.setTxn(request.getTxn());
+ topPending.zxid = request.zxid;
+ topPending.commitRecvTime = request.commitRecvTime;
+ request = topPending;
+ // Only decrement if we take a request off the queue.
+ numWriteQueuedRequests.decrementAndGet();
+ queuedWriteRequests.poll();
+ queuesToDrain.add(request.sessionId);
+ }
}
- }
+ /*
+ * Pull the request off the commit queue, now that we are going
+ * to process it.
+ */
+ committedRequests.remove();
+ commitsToProcess--;
+ commitsProcessed++;
- sendToNextProcessor(request);
- waitForEmptyPool();
+ // Process the write inline.
+ processWrite(request);
+
+ commitIsWaiting = !committedRequests.isEmpty();
+ }
+ ServerMetrics.getMetrics().WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR.add(
+ Time.currentElapsedTime() - startWriteTime);
+ ServerMetrics.getMetrics().WRITES_ISSUED_IN_COMMIT_PROC.add(commitsProcessed);
/*
- * Process following reads if any, remove session queue if
+ * Process following reads if any, remove session queue(s) if
* empty.
*/
- if (sessionQueue != null) {
+ readsProcessed = 0;
+ for (Long sessionId : queuesToDrain) {
+ Deque<Request> sessionQueue = pendingRequests.get(sessionId);
int readsAfterWrite = 0;
while (!stopped && !sessionQueue.isEmpty()
&& !needCommit(sessionQueue.peek())) {
@@ -341,12 +399,15 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
readsAfterWrite++;
}
ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite);
+ readsProcessed += readsAfterWrite;
// Remove empty queues
if (sessionQueue.isEmpty()) {
- pendingRequests.remove(request.sessionId);
+ pendingRequests.remove(sessionId);
}
}
+ ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(queuesToDrain.size());
+ ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(readsProcessed);
}
ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - time);
@@ -388,6 +449,8 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
workerShutdownTimeoutMS = Long.getLong(
ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000);
+ initBatchSizes();
+
LOG.info("Configuring CommitProcessor with "
+ (numWorkerThreads > 0 ? numWorkerThreads : "no")
+ " worker threads.");
@@ -409,6 +472,78 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
}
+ private void processWrite(Request request) throws RequestProcessorException {
+ processCommitMetrics(request, true);
+
+ long timeBeforeFinalProc = Time.currentElapsedTime();
+ nextProcessor.processRequest(request);
+ ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.add(
+ Time.currentElapsedTime() - timeBeforeFinalProc);
+ }
+
+ /**
+ * Loads the read/commit batch sizes from their system properties
+ * (defaults: -1 and 1) and publishes them to the shared static fields.
+ * Both values are validated before either field is written, so an invalid
+ * configuration never leaves a non-positive commit batch size visible to
+ * the processing loop or to the JMX getters.
+ *
+ * @throws IllegalArgumentException if the configured commit batch size is not positive
+ */
+ private static void initBatchSizes() {
+ int readBatchSize = Integer.getInteger(
+ ZOOKEEPER_COMMIT_PROC_MAX_READ_BATCH_SIZE, -1);
+ int commitBatchSize = Integer.getInteger(
+ ZOOKEEPER_COMMIT_PROC_MAX_COMMIT_BATCH_SIZE, 1);
+
+ if (commitBatchSize <= 0) {
+ String errorMsg = "maxCommitBatchSize must be positive, was " +
+ commitBatchSize;
+ throw new IllegalArgumentException(errorMsg);
+ }
+
+ // Publish only after validation succeeded.
+ maxReadBatchSize = readBatchSize;
+ maxCommitBatchSize = commitBatchSize;
+
+ LOG.info("Configuring CommitProcessor with readBatchSize {} commitBatchSize {}",
+ maxReadBatchSize,
+ maxCommitBatchSize);
+ }
+
+ private static void processCommitMetrics(Request request, boolean isWrite) {
+ if (isWrite) {
+ if (request.commitProcQueueStartTime != -1 &&
+ request.commitRecvTime != -1) {
+ // Locally issued writes.
+ long currentTime = Time.currentElapsedTime();
+ ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(currentTime -
+ request.commitProcQueueStartTime);
+ ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(currentTime -
+ request.commitRecvTime);
+ } else if (request.commitRecvTime != -1) {
+ // Writes issued by other servers.
+ ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add(
+ Time.currentElapsedTime() - request.commitRecvTime);
+ }
+ } else {
+ if (request.commitProcQueueStartTime != -1) {
+ ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add(
+ Time.currentElapsedTime() -
+ request.commitProcQueueStartTime);
+ }
+ }
+ }
+
+ /** @return the current max read batch size; negative means read batching is disabled */
+ public static int getMaxReadBatchSize() {
+ return maxReadBatchSize;
+ }
+
+ /** @return the current max number of commits processed per batch */
+ public static int getMaxCommitBatchSize() {
+ return maxCommitBatchSize;
+ }
+
+ /**
+ * Updates the shared max read batch size at runtime. Any value is
+ * accepted; a negative value disables read batching.
+ */
+ public static void setMaxReadBatchSize(int size) {
+ maxReadBatchSize = size;
+ LOG.info("Configuring CommitProcessor with readBatchSize {}",
+ maxReadBatchSize);
+ }
+
+ /**
+ * Updates the shared max commit batch size at runtime. Non-positive
+ * values are rejected with a warning and the current setting is kept.
+ * Unlike startup configuration, this does not throw, so runtime callers
+ * (e.g. the JMX setter) are unaffected by a bad value.
+ */
+ public static void setMaxCommitBatchSize(int size) {
+ if (size <= 0) {
+ LOG.warn("Ignoring invalid commitBatchSize {} (must be positive); keeping {}",
+ size, maxCommitBatchSize);
+ return;
+ }
+ maxCommitBatchSize = size;
+ LOG.info("Configuring CommitProcessor with commitBatchSize {}",
+ maxCommitBatchSize);
+ }
+
/**
* CommitWorkRequest is a small wrapper class to allow
* downstream processing to be run using the WorkerService
@@ -431,27 +566,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
public void doWork() throws RequestProcessorException {
try {
- if (needCommit(request)) {
- if (request.commitProcQueueStartTime != -1 &&
- request.commitRecvTime != -1) {
- // Locally issued writes.
- long currentTime = Time.currentElapsedTime();
- ServerMetrics.getMetrics().WRITE_COMMITPROC_TIME.add(currentTime -
- request.commitProcQueueStartTime);
- ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.add(currentTime -
- request.commitRecvTime);
- } else if (request.commitRecvTime != -1) {
- // Writes issued by other servers.
- ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.add(
- Time.currentElapsedTime() - request.commitRecvTime);
- }
- } else {
- if (request.commitProcQueueStartTime != -1) {
- ServerMetrics.getMetrics().READ_COMMITPROC_TIME.add(
- Time.currentElapsedTime() -
- request.commitProcQueueStartTime);
- }
- }
+ processCommitMetrics(request, needCommit(request));
long timeBeforeFinalProc = Time.currentElapsedTime();
nextProcessor.processRequest(request);
@@ -508,6 +623,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
queuedRequests.add(request);
// If the request will block, add it to the queue of blocking requests
if (needCommit(request)) {
+ queuedWriteRequests.add(request);
numWriteQueuedRequests.incrementAndGet();
} else {
numReadQueuedRequests.incrementAndGet();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
index 0e002b9..95889ef 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
@@ -62,6 +62,8 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
public void setUp() throws Exception {
processedRequests = new LinkedBlockingQueue<Request>();
processor = new MockCommitProcessor();
+ // The batch-size settings are static and shared, and some tests change
+ // them; reset to the defaults (-1 reads, 1 commit) before every test.
+ CommitProcessor.setMaxReadBatchSize(-1);
+ CommitProcessor.setMaxCommitBatchSize(1);
}
@After
@@ -148,6 +150,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
processor.committedRequests.add(writeReq);
processor.queuedRequests.add(readReq);
processor.queuedRequests.add(writeReq);
+ processor.queuedWriteRequests.add(writeReq);
processor.initThreads(1);
processor.stoppedMainLoop = true;
@@ -194,6 +197,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
Request readReq = newRequest(new GetDataRequest(path, false),
OpCode.getData, sessionId, sessionId + 2);
processor.queuedRequests.add(writeReq);
+ processor.queuedWriteRequests.add(writeReq);
processor.queuedRequests.add(readReq);
shouldNotBeProcessed.add(writeReq);
shouldNotBeProcessed.add(readReq);
@@ -232,6 +236,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
OpCode.create, 0x1, 1);
processor.queuedRequests.add(writeReq);
+ processor.queuedWriteRequests.add(writeReq);
shouldBeInPending.add(writeReq);
for (int readReqId = 2; readReqId <= 5; ++readReqId) {
@@ -249,6 +254,8 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
processedRequests.isEmpty());
Assert.assertTrue("Did not handled all of queuedRequests' requests",
processor.queuedRequests.isEmpty());
+ Assert.assertTrue("Removed from blockedQueuedRequests before commit",
+ !processor.queuedWriteRequests.isEmpty());
shouldBeInPending
.removeAll(processor.pendingRequests.get(writeReq.sessionId));
@@ -273,6 +280,155 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
processor.committedRequests.isEmpty());
Assert.assertTrue("Did not process committed request",
processor.pendingRequests.isEmpty());
+ Assert.assertTrue("Did not remove from blockedQueuedRequests",
+ processor.queuedWriteRequests.isEmpty());
+ }
+
+ /**
+ * Verifies commit batching when reads are pending in more than one session.
+ * Setup: session 1 queues a write followed by several reads; session 2
+ * queues a write, several reads, and then a second write. We verify that:
+ * 1. The writes are not processed until the commits arrive.
+ * 2. With maxCommitBatchSize of 3, only 2 writes are processed, because
+ * session 2's second write is blocked behind that session's pending reads.
+ * 3. Once the writes are processed, all the pending read requests are
+ * processed as well.
+ * 4. All read requests are executed after the write, before any other
+ * write for that session, along with new reads.
+ * 5. Then we add another read for session 1, and another write and commit
+ * for session 2.
+ * 6. Only the old write and the new read are processed, leaving the new
+ * commit in the queue.
+ * 7. The last write is executed in the final iteration, and all lists are
+ * empty.
+ */
+ @Test
+ public void processAllWritesMaxBatchSize()
+ throws Exception {
+ final String path = "/processAllWritesMaxBatchSize";
+ HashSet<Request> shouldBeProcessedAfterPending = new HashSet<Request>();
+
+ Request writeReq = newRequest(
+ new CreateRequest(path + "_1", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, 0x1, 1);
+ processor.queuedRequests.add(writeReq);
+ processor.queuedWriteRequests.add(writeReq);
+
+ Request writeReq2 = newRequest(
+ new CreateRequest(path + "_2", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, 0x2, 1);
+ processor.queuedRequests.add(writeReq2);
+ processor.queuedWriteRequests.add(writeReq2);
+
+ for (int readReqId = 2; readReqId <= 5; ++readReqId) {
+ Request readReq = newRequest(new GetDataRequest(path, false),
+ OpCode.getData, 0x1, readReqId);
+ Request readReq2 = newRequest(new GetDataRequest(path, false),
+ OpCode.getData, 0x2, readReqId);
+ processor.queuedRequests.add(readReq);
+ shouldBeProcessedAfterPending.add(readReq);
+ processor.queuedRequests.add(readReq2);
+ shouldBeProcessedAfterPending.add(readReq2);
+ }
+
+ Request writeReq3 = newRequest(
+ new CreateRequest(path + "_3", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, 0x2, 6);
+ processor.queuedRequests.add(writeReq3);
+ processor.queuedWriteRequests.add(writeReq3);
+
+ processor.initThreads(defaultSizeOfThreadPool);
+
+ // Round 1: no commits yet, so every request should end up pending.
+ processor.stoppedMainLoop = true;
+ CommitProcessor.setMaxCommitBatchSize(2);
+ processor.run();
+ Assert.assertTrue("Processed without waiting for commit",
+ processedRequests.isEmpty());
+ Assert.assertTrue("Did not handled all of queuedRequests' requests",
+ processor.queuedRequests.isEmpty());
+ Assert.assertTrue("Removed from blockedQueuedRequests before commit",
+ !processor.queuedWriteRequests.isEmpty());
+ Assert.assertTrue("Missing session 1 in pending queue",
+ processor.pendingRequests.containsKey(writeReq.sessionId));
+ Assert.assertTrue("Missing session 2 in pending queue",
+ processor.pendingRequests.containsKey(writeReq2.sessionId));
+
+ // Round 2: commit all three writes; writeReq3 must stay pending because
+ // session 2's reads are still queued ahead of it.
+ processor.committedRequests.add(writeReq);
+ processor.committedRequests.add(writeReq2);
+ processor.committedRequests.add(writeReq3);
+ processor.stoppedMainLoop = true;
+ CommitProcessor.setMaxCommitBatchSize(3);
+ processor.run();
+ processor.initThreads(defaultSizeOfThreadPool);
+
+ Thread.sleep(500);
+ Assert.assertTrue("Did not process committed request",
+ processedRequests.peek() == writeReq);
+ Assert.assertTrue("Did not process following read request",
+ processedRequests.containsAll(shouldBeProcessedAfterPending));
+ Assert.assertTrue("Processed committed request",
+ !processor.committedRequests.isEmpty());
+ Assert.assertTrue("Removed commit for write req 3",
+ processor.committedRequests.peek() == writeReq3);
+ Assert.assertTrue("Processed committed request",
+ !processor.pendingRequests.isEmpty());
+ Assert.assertTrue("Missing session 2 in pending queue",
+ processor.pendingRequests.containsKey(writeReq3.sessionId));
+ Assert.assertTrue("Missing write 3 in pending queue",
+ processor.pendingRequests.get(writeReq3.sessionId).peek() == writeReq3);
+ Assert.assertTrue("Removed from blockedQueuedRequests",
+ !processor.queuedWriteRequests.isEmpty());
+ Assert.assertTrue("Removed write req 3 from blockedQueuedRequests",
+ processor.queuedWriteRequests.peek() == writeReq3);
+
+ // Round 3: a new read for session 1 plus a new write (already committed)
+ // for session 2; only writeReq3 and the read are expected to go through.
+ Request readReq3 = newRequest(new GetDataRequest(path, false),
+ OpCode.getData, 0x1, 7);
+ processor.queuedRequests.add(readReq3);
+ shouldBeProcessedAfterPending.add(readReq3);
+ Request writeReq4 = newRequest(
+ new CreateRequest(path + "_4", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, 0x2, 7);
+
+ processor.queuedRequests.add(writeReq4);
+ processor.queuedWriteRequests.add(writeReq4);
+ processor.committedRequests.add(writeReq4);
+
+ processor.stoppedMainLoop = true;
+ CommitProcessor.setMaxCommitBatchSize(3);
+ processor.run();
+ processor.initThreads(defaultSizeOfThreadPool);
+
+ Thread.sleep(500);
+ Assert.assertTrue("Did not process committed request",
+ processedRequests.peek() == writeReq);
+ Assert.assertTrue("Did not process following read request",
+ processedRequests.containsAll(shouldBeProcessedAfterPending));
+ Assert.assertTrue("Processed unexpected committed request",
+ !processor.committedRequests.isEmpty());
+ Assert.assertTrue("Unexpected pending request",
+ processor.pendingRequests.isEmpty());
+ Assert.assertTrue("Removed from blockedQueuedRequests",
+ !processor.queuedWriteRequests.isEmpty());
+ Assert.assertTrue("Removed write req 4 from blockedQueuedRequests",
+ processor.queuedWriteRequests.peek() == writeReq4);
+
+ // Round 4: the remaining write is processed and every queue drains.
+ processor.stoppedMainLoop = true;
+ CommitProcessor.setMaxCommitBatchSize(3);
+ processor.run();
+ processor.initThreads(defaultSizeOfThreadPool);
+
+ Thread.sleep(500);
+ Assert.assertTrue("Did not process committed request",
+ processedRequests.peek() == writeReq);
+ Assert.assertTrue("Did not process following read request",
+ processedRequests.containsAll(shouldBeProcessedAfterPending));
+ Assert.assertTrue("Did not process committed request",
+ processor.committedRequests.isEmpty());
+ Assert.assertTrue("Did not process committed request",
+ processor.pendingRequests.isEmpty());
+ Assert.assertTrue("Did not remove from blockedQueuedRequests",
+ processor.queuedWriteRequests.isEmpty());
+
}
/**
@@ -322,6 +478,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
OpCode.create, 0x3, 1);
processor.queuedRequests.add(firstCommittedReq);
+ processor.queuedWriteRequests.add(firstCommittedReq);
processor.committedRequests.add(firstCommittedReq);
Set<Request> allReads = new HashSet<Request>();
@@ -399,6 +556,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
OpCode.create, sessionid, readReqId++);
processor.queuedRequests.add(firstCommittedReq);
+ processor.queuedWriteRequests.add(firstCommittedReq);
localRequests.add(firstCommittedReq);
// queue read requests to queuedRequests
@@ -463,6 +621,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
OpCode.create, sessionid, lastCXid);
processor.queuedRequests.add(orphanCommittedReq);
+ processor.queuedWriteRequests.add(orphanCommittedReq);
localRequests.add(orphanCommittedReq);
// queue read requests to queuedRequests
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
index 426aae9..f284052 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
@@ -51,6 +51,10 @@ public class CommitProcessorMetricsTest extends ZKTestCase {
public void setup() {
LOG.info("setup");
ServerMetrics.getMetrics().resetAll();
+
+ // ensure no leaked commit batch-size system properties from other tests;
+ // CommitProcessor.initBatchSizes() reads them when configuring itself
+ System.clearProperty("zookeeper.commitProcessor.maxReadBatchSize");
+ System.clearProperty("zookeeper.commitProcessor.maxCommitBatchSize");
}
public void setupProcessors(int commitWorkers, int finalProcTime ) {