You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by sh...@apache.org on 2017/11/03 04:05:10 UTC
zookeeper git commit: ZOOKEEPER-2684: Fix a crashing bug in the mixed workloads commit processor
Repository: zookeeper
Updated Branches:
refs/heads/master c510be675 -> 1392a8b3d
ZOOKEEPER-2684: Fix a crashing bug in the mixed workloads commit processor
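This fixes a long-standing crash in the commit processor: when a committed request's cxid did not match the cxid at the head of the session's pending queue, CommitProcessor logged an error and threw an IOException, stopping the processor.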
Note that the previous commit processor algorithm took the same approach as this fix when handling a request whose cxid differs from the session's expected one (see [here](https://github.com/apache/zookeeper/commit/9fc632c4f0a340b0a00ec6dff39c7b454c802822#diff-5cc688a027068714af01b0ad4d292fe5L238)).
This fix is based on the code from https://github.com/apache/zookeeper/pull/167, following the discussion in https://issues.apache.org/jira/browse/ZOOKEEPER-2684 .
Author: Kfir Lev-Ari <kl...@apple.com>
Reviewers: Alexander Shraer <sh...@apache.org>, Abraham Fine <af...@apache.org>
Closes #411 from kfirlevari/ZOOKEEPER-2684
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/1392a8b3
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/1392a8b3
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/1392a8b3
Branch: refs/heads/master
Commit: 1392a8b3d8ac3d042a51dac18e33e144674b1b5b
Parents: c510be6
Author: Kfir Lev-Ari <kl...@apple.com>
Authored: Thu Nov 2 21:04:56 2017 -0700
Committer: Alexander Shraer <as...@apple.com>
Committed: Thu Nov 2 21:04:56 2017 -0700
----------------------------------------------------------------------
.../server/quorum/CommitProcessor.java | 65 +++++++---
.../quorum/CommitProcessorConcurrencyTest.java | 126 +++++++++++++++++++
2 files changed, 172 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/1392a8b3/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java b/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
index 66b12c6..7439c7e 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
@@ -246,8 +246,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
}
/*
- * Check if request is pending, if so, update it with the
- * committed info
+ * Check if request is pending, if so, update it with the committed info
*/
LinkedList<Request> sessionQueue = pendingRequests
.get(request.sessionId);
@@ -255,24 +254,52 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
// If session queue != null, then it is also not empty.
Request topPending = sessionQueue.poll();
if (request.cxid != topPending.cxid) {
- LOG.error(
- "Got cxid 0x"
- + Long.toHexString(request.cxid)
- + " expected 0x" + Long.toHexString(
- topPending.cxid)
- + " for client session id "
- + Long.toHexString(request.sessionId));
- throw new IOException("Error: unexpected cxid for"
- + "client session");
+ /*
+ * TL;DR - we should not encounter this scenario often under normal load.
+ * We pass the commit to the next processor and put the pending back with a warning.
+ *
+ * Generally, we can get commit requests that are not at the queue head after
+ * a session moved (see ZOOKEEPER-2684). Let's denote the previous server of the session
+ * with A, and the server that the session moved to with B (keep in mind that it is
+ * possible that the session already moved from B to a new server C, and maybe C=A).
+ * 1. If request.cxid < topPending.cxid : this means that the session requested this update
+ * from A, then moved to B (i.e., us), and now B receives the commit
+ * for the update after the session already performed several operations in B
+ * (and therefore its cxid is higher than that old request).
+ * 2. If request.cxid > topPending.cxid : this means that the session requested an update
+ * from B with a cxid that is bigger than the one we know; therefore, in this case we
+ * are A, and we lost the connection to the session. Given that we are waiting for a commit
+ * for that update, it means that we already sent the request to the leader and it will
+ * be committed at some point (in this case the order of cxid won't follow zxid, since zxid
+ * is strictly increasing). It is not safe for us to delete the session's queue at this
+ * point, since it is possible that the session has newer requests in it after it moved
+ * back to us. We just leave the queue as it is, and once the commit arrives (for the old
+ * request), the finalRequestProcessor will see a closed cnxn handle, and just won't send a
+ * response.
+ * Also note that we don't have a local session, therefore we treat the request
+ * like any other commit for a remote request, i.e., we perform the update without sending
+ * a response.
+ */
+ LOG.warn("Got request " + request +
+ " but we are expecting request " + topPending);
+ sessionQueue.addFirst(topPending);
+ } else {
+ /*
+ * Generally, we want to send to the next processor our version of the request,
+ * since it contains the session information that is needed for post update processing.
+ * In more details, when a request is in the local queue, there is (or could be) a client
+ * attached to this server waiting for a response, and there is other bookkeeping of
+ * requests that are outstanding and have originated from this server
+ * (e.g., for setting the max outstanding requests) - we need to update this info when an
+ * outstanding request completes. Note that in the other case (above), the operation
+ * originated from a different server and there is no local bookkeeping or a local client
+ * session that needs to be notified.
+ */
+ topPending.setHdr(request.getHdr());
+ topPending.setTxn(request.getTxn());
+ topPending.zxid = request.zxid;
+ request = topPending;
}
- /*
- * We want to send our version of the request. the
- * pointer to the connection in the request
- */
- topPending.setHdr(request.getHdr());
- topPending.setTxn(request.getTxn());
- topPending.zxid = request.zxid;
- request = topPending;
}
sendToNextProcessor(request);
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/1392a8b3/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
index a1ca7ca..0e002b9 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
@@ -108,6 +108,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
@Override
public void notifyStopping(String threadName, int errorCode) {
+ Assert.fail("Commit processor crashed " + errorCode);
}
});
}
@@ -376,4 +377,129 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
!processedRequests.contains(r));
}
}
+
+ /**
+ * In the following test, we verify that we can handle the case in which we get a commit
+ * for a request we have never seen, because it predates the session's current connection.
+ * This can happen when a session is just established and there is a request waiting to be
+ * committed in the session queue, but we see a commit for a request from the previous connection.
+ */
+ @Test(timeout = 5000)
+ public void noCrashOnCommittedRequestsOfUnseenRequestTest() throws Exception {
+ final String path = "/noCrash/OnCommittedRequests/OfUnseenRequestTest";
+ final int numberofReads = 10;
+ final int sessionid = 0x123456;
+ final int firstCXid = 0x100;
+ int readReqId = firstCXid;
+ processor.stoppedMainLoop = true;
+ HashSet<Request> localRequests = new HashSet<Request>();
+ // queue the blocking write request to queuedRequests
+ Request firstCommittedReq = newRequest(
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, sessionid, readReqId++);
+ processor.queuedRequests.add(firstCommittedReq);
+ localRequests.add(firstCommittedReq);
+
+ // queue read requests to queuedRequests
+ for (; readReqId <= numberofReads+firstCXid; ++readReqId) {
+ Request readReq = newRequest(new GetDataRequest(path, false),
+ OpCode.getData, sessionid, readReqId);
+ processor.queuedRequests.add(readReq);
+ localRequests.add(readReq);
+ }
+
+ //run once
+ Assert.assertTrue(processor.queuedRequests.containsAll(localRequests));
+ processor.initThreads(defaultSizeOfThreadPool);
+ processor.run();
+ Thread.sleep(1000);
+
+ //We verify that the processor is waiting for the commit
+ Assert.assertTrue(processedRequests.isEmpty());
+
+ // We add a commit that belongs to the same session but with smaller cxid,
+ // i.e., commit of an update from previous connection of this session.
+ Request preSessionCommittedReq = newRequest(
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, sessionid, firstCXid - 2);
+ processor.committedRequests.add(preSessionCommittedReq);
+ processor.committedRequests.add(firstCommittedReq);
+ processor.run();
+ Thread.sleep(1000);
+
+ //We verify that the commit processor processed the old commit prior to the newer messages
+ Assert.assertTrue(processedRequests.peek() == preSessionCommittedReq);
+
+ processor.run();
+ Thread.sleep(1000);
+
+ //We verify that the commit processor handled all messages.
+ Assert.assertTrue(processedRequests.containsAll(localRequests));
+ }
+
+ /**
+ * In the following test, we verify that we handle the case in which we get a commit
+ * for a request that has a higher cxid than the one we are waiting for. This can happen
+ * when a session connection is lost but there is a request waiting to be committed in the
+ * session queue. However, since the session has moved, new requests can get to
+ * the leader out of order. Hence, the commits can also arrive "out of order" w.r.t. cxid.
+ * We should commit the requests according to the order we receive from the leader, i.e., wait for the relevant commit.
+ */
+ @Test(timeout = 5000)
+ public void noCrashOnOutofOrderCommittedRequestTest() throws Exception {
+ final String path = "/noCrash/OnCommittedRequests/OfUnSeenRequestTest";
+ final int sessionid = 0x123456;
+ final int lastCXid = 0x100;
+ final int numberofReads = 10;
+ int readReqId = lastCXid;
+ processor.stoppedMainLoop = true;
+ HashSet<Request> localRequests = new HashSet<Request>();
+
+ // queue the blocking write request to queuedRequests
+ Request orphanCommittedReq = newRequest(
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, sessionid, lastCXid);
+ processor.queuedRequests.add(orphanCommittedReq);
+ localRequests.add(orphanCommittedReq);
+
+ // queue read requests to queuedRequests
+ for (; readReqId <= numberofReads+lastCXid; ++readReqId) {
+ Request readReq = newRequest(new GetDataRequest(path, false),
+ OpCode.getData, sessionid, readReqId);
+ processor.queuedRequests.add(readReq);
+ localRequests.add(readReq);
+ }
+
+ //run once
+ processor.initThreads(defaultSizeOfThreadPool);
+ processor.run();
+ Thread.sleep(1000);
+
+ //We verify that the processor is waiting for the commit
+ Assert.assertTrue(processedRequests.isEmpty());
+
+ // We add a commit that belongs to the same session but with larger cxid,
+ // i.e., commit of an update from the next connection of this session.
+ Request otherSessionCommittedReq = newRequest(
+ new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+ OpCode.create, sessionid, lastCXid+10);
+ processor.committedRequests.add(otherSessionCommittedReq);
+ processor.committedRequests.add(orphanCommittedReq);
+ processor.run();
+ Thread.sleep(1000);
+
+ //We verify that only the out-of-order commit (larger cxid) was processed; the pending request still waits
+ Assert.assertTrue(processedRequests.size() == 1);
+ Assert.assertTrue(processedRequests.contains(otherSessionCommittedReq));
+
+ processor.run();
+ Thread.sleep(1000);
+
+ //We verify that the commit processor handled all messages.
+ Assert.assertTrue(processedRequests.containsAll(localRequests));
+ }
}
\ No newline at end of file