Posted to commits@zookeeper.apache.org by sh...@apache.org on 2017/11/03 04:05:10 UTC

zookeeper git commit: y

Repository: zookeeper
Updated Branches:
  refs/heads/master c510be675 -> 1392a8b3d


y

We wish to fix this long-standing issue in the code.
Note that the previous commit processor algorithm took the same approach as the one suggested in this fix when dealing with a request whose cxid differs from the one the session expects (see [here](https://github.com/apache/zookeeper/commit/9fc632c4f0a340b0a00ec6dff39c7b454c802822#diff-5cc688a027068714af01b0ad4d292fe5L238)).

This fix is based on the code from https://github.com/apache/zookeeper/pull/167, following the discussion in https://issues.apache.org/jira/browse/ZOOKEEPER-2684 .
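
For readers skimming the diff, the new handling of a cxid mismatch can be sketched in isolation as follows. This is a minimal illustration under stated assumptions, not the code in CommitProcessor.java: the class `CxidMismatchSketch`, the `Req` type, and the `reconcile` helper are hypothetical names standing in for ZooKeeper's `Request` and the commit loop, and only the fields relevant to this fix are modeled.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class CxidMismatchSketch {

    /** Hypothetical stand-in for a ZooKeeper request; not the real Request class. */
    static final class Req {
        final long sessionId;
        final int cxid;
        long zxid;
        final boolean local; // true if a client attached to this server awaits a response

        Req(long sessionId, int cxid, long zxid, boolean local) {
            this.sessionId = sessionId;
            this.cxid = cxid;
            this.zxid = zxid;
            this.local = local;
        }
    }

    /**
     * Decides which request object to forward for a commit, given the
     * session's queue of pending local requests (assumed helper name).
     */
    static Req reconcile(Deque<Req> sessionQueue, Req committed) {
        if (sessionQueue == null || sessionQueue.isEmpty()) {
            return committed; // nothing pending locally for this session
        }
        Req topPending = sessionQueue.poll();
        if (committed.cxid != topPending.cxid) {
            // Mismatch: the commit belongs to another connection of the session.
            // Put the pending request back and forward the commit as a remote one.
            sessionQueue.addFirst(topPending);
            return committed;
        }
        // Match: forward the local version, which carries the session bookkeeping,
        // after copying the committed info onto it.
        topPending.zxid = committed.zxid;
        return topPending;
    }

    public static void main(String[] args) {
        Deque<Req> queue = new ArrayDeque<>();
        Req pending = new Req(0x123456L, 0x100, -1L, true);
        queue.add(pending);

        // Commit with a smaller cxid, i.e. from a previous connection of the session.
        Req stale = new Req(0x123456L, 0xFE, 7L, false);
        System.out.println("forwarded stale commit: " + (reconcile(queue, stale) == stale));
        System.out.println("pending kept at head:   " + (queue.peek() == pending));

        // Commit that matches the head of the queue.
        Req match = new Req(0x123456L, 0x100, 8L, false);
        System.out.println("forwarded local copy:   " + (reconcile(queue, match) == pending));
    }
}
```

The point of the sketch is the design choice in the mismatch branch: instead of throwing and stopping the processor, the pending request is returned to the head of the queue so it can still be matched when its own commit arrives.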

Author: Kfir Lev-Ari <kl...@apple.com>

Reviewers: Alexander Shraer <sh...@apache.org>, Abraham Fine <af...@apache.org>

Closes #411 from kfirlevari/ZOOKEEPER-2684


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/1392a8b3
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/1392a8b3
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/1392a8b3

Branch: refs/heads/master
Commit: 1392a8b3d8ac3d042a51dac18e33e144674b1b5b
Parents: c510be6
Author: Kfir Lev-Ari <kl...@apple.com>
Authored: Thu Nov 2 21:04:56 2017 -0700
Committer: Alexander Shraer <as...@apple.com>
Committed: Thu Nov 2 21:04:56 2017 -0700

----------------------------------------------------------------------
 .../server/quorum/CommitProcessor.java          |  65 +++++++---
 .../quorum/CommitProcessorConcurrencyTest.java  | 126 +++++++++++++++++++
 2 files changed, 172 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/1392a8b3/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java b/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
index 66b12c6..7439c7e 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
@@ -246,8 +246,7 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                     }
 
                     /*
-                     * Check if request is pending, if so, update it with the
-                     * committed info
+                     * Check if request is pending, if so, update it with the committed info
                      */
                     LinkedList<Request> sessionQueue = pendingRequests
                             .get(request.sessionId);
@@ -255,24 +254,52 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements
                         // If session queue != null, then it is also not empty.
                         Request topPending = sessionQueue.poll();
                         if (request.cxid != topPending.cxid) {
-                            LOG.error(
-                                    "Got cxid 0x"
-                                            + Long.toHexString(request.cxid)
-                                            + " expected 0x" + Long.toHexString(
-                                                    topPending.cxid)
-                                    + " for client session id "
-                                    + Long.toHexString(request.sessionId));
-                            throw new IOException("Error: unexpected cxid for"
-                                    + "client session");
+                            /*
+                             * TL;DR - we should not encounter this scenario often under normal load.
+                             * We pass the commit to the next processor and put the pending back with a warning.
+                             *
+                             * Generally, we can get commit requests that are not at the queue head after
+                             * a session moved (see ZOOKEEPER-2684). Let's denote the previous server of the session
+                             * with A, and the server that the session moved to with B (keep in mind that it is
+                             * possible that the session already moved from B to a new server C, and maybe C=A).
+                             * 1. If request.cxid < topPending.cxid : this means that the session requested this update
+                             * from A, then moved to B (i.e., which is us), and now B receives the commit
+                             * for the update after the session already performed several operations in B
+                             * (and therefore its cxid is higher than that old request).
+                             * 2. If request.cxid > topPending.cxid : this means that the session requested an update
+                             * from B with a cxid that is bigger than the one we know; therefore, in this case we
+                             * are A, and we lost the connection to the session. Given that we are waiting for a commit
+                             * for that update, it means that we already sent the request to the leader and it will
+                             * be committed at some point (in this case the order of cxid won't follow zxid, since zxids
+                             * are assigned in increasing order). It is not safe for us to delete the session's queue at this
+                             * point, since it is possible that the session has newer requests in it after it moved
+                             * back to us. We just leave the queue as it is, and once the commit arrives (for the old
+                             * request), the finalRequestProcessor will see a closed cnxn handle, and just won't send a
+                             * response.
+                             * Also note that we don't have a local session, therefore we treat the request
+                             * like any other commit for a remote request, i.e., we perform the update without sending
+                             * a response.
+                             */
+                            LOG.warn("Got request " + request +
+                                    " but we are expecting request " + topPending);
+                            sessionQueue.addFirst(topPending);
+                        } else {
+                            /*
+                             * Generally, we want to send to the next processor our version of the request,
+                             * since it contains the session information that is needed for post update processing.
+                             * In more detail, when a request is in the local queue, there is (or could be) a client
+                             * attached to this server waiting for a response, and there is other bookkeeping of
+                             * requests that are outstanding and have originated from this server
+                             * (e.g., for setting the max outstanding requests) - we need to update this info when an
+                             * outstanding request completes. Note that in the other case (above), the operation
+                             * originated from a different server and there is no local bookkeeping or a local client
+                             * session that needs to be notified.
+                             */
+                            topPending.setHdr(request.getHdr());
+                            topPending.setTxn(request.getTxn());
+                            topPending.zxid = request.zxid;
+                            request = topPending;
                         }
-                        /*
-                         * We want to send our version of the request. the
-                         * pointer to the connection in the request
-                         */
-                        topPending.setHdr(request.getHdr());
-                        topPending.setTxn(request.getTxn());
-                        topPending.zxid = request.zxid;
-                        request = topPending;
                     }
 
                     sendToNextProcessor(request);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/1392a8b3/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
index a1ca7ca..0e002b9 100644
--- a/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
+++ b/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
@@ -108,6 +108,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
 
                 @Override
                 public void notifyStopping(String threadName, int errorCode) {
+                    Assert.fail("Commit processor crashed " + errorCode);
                 }
             });
         }
@@ -376,4 +377,129 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
                     !processedRequests.contains(r));
         }
     }
+
+    /**
+     * In the following test, we verify that we can handle a commit for a request
+     * that we have never seen, because the session was only just established here. This can
+     * happen when a session has just been established and a request is waiting to be committed
+     * in the session queue, but the processor sees a commit for a request from the previous connection.
+     */
+    @Test(timeout = 5000)
+    public void noCrashOnCommittedRequestsOfUnseenRequestTest() throws Exception {
+        final String path = "/noCrash/OnCommittedRequests/OfUnseenRequestTest";
+        final int numberofReads = 10;
+        final int sessionid = 0x123456;
+        final int firstCXid = 0x100;
+        int readReqId = firstCXid;
+        processor.stoppedMainLoop = true;
+        HashSet<Request> localRequests = new HashSet<Request>();
+        // queue the blocking write request to queuedRequests
+        Request firstCommittedReq = newRequest(
+                new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, sessionid, readReqId++);
+        processor.queuedRequests.add(firstCommittedReq);
+        localRequests.add(firstCommittedReq);
+
+        // queue read requests to queuedRequests
+        for (; readReqId <= numberofReads+firstCXid; ++readReqId) {
+            Request readReq = newRequest(new GetDataRequest(path, false),
+                    OpCode.getData, sessionid, readReqId);
+            processor.queuedRequests.add(readReq);
+            localRequests.add(readReq);
+        }
+
+        //run once
+        Assert.assertTrue(processor.queuedRequests.containsAll(localRequests));
+        processor.initThreads(defaultSizeOfThreadPool);
+        processor.run();
+        Thread.sleep(1000);
+
+        //We verify that the processor is waiting for the commit
+        Assert.assertTrue(processedRequests.isEmpty());
+
+        // We add a commit that belongs to the same session but with smaller cxid,
+        // i.e., commit of an update from previous connection of this session.
+        Request preSessionCommittedReq = newRequest(
+                new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, sessionid, firstCXid - 2);
+        processor.committedRequests.add(preSessionCommittedReq);
+        processor.committedRequests.add(firstCommittedReq);
+        processor.run();
+        Thread.sleep(1000);
+
+        //We verify that the commit processor processed the old commit prior to the newer messages
+        Assert.assertTrue(processedRequests.peek() == preSessionCommittedReq);
+
+        processor.run();
+        Thread.sleep(1000);
+
+        //We verify that the commit processor handled all messages.
+        Assert.assertTrue(processedRequests.containsAll(localRequests));
+    }
+
+    /**
+     * In the following test, we verify if we handle the case in which we get a commit
+     * for a request that has a higher cxid than the one we are waiting for. This can happen
+     * when a session connection is lost but there is a request waiting to be committed in the
+     * session queue. However, since the session has moved, new requests can get to
+     * the leader out of order. Hence, the commits can also arrive "out of order" w.r.t. cxid.
+     * We should commit the requests according to the order we receive from the leader, i.e., wait for the relevant commit.
+     */
+    @Test(timeout = 5000)
+    public void noCrashOnOutofOrderCommittedRequestTest() throws Exception {
+        final String path = "/noCrash/OnCommittedRequests/OfUnSeenRequestTest";
+        final int sessionid = 0x123456;
+        final int lastCXid = 0x100;
+        final int numberofReads = 10;
+        int readReqId = lastCXid;
+        processor.stoppedMainLoop = true;
+        HashSet<Request> localRequests = new HashSet<Request>();
+
+        // queue the blocking write request to queuedRequests
+        Request orphanCommittedReq = newRequest(
+                new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, sessionid, lastCXid);
+        processor.queuedRequests.add(orphanCommittedReq);
+        localRequests.add(orphanCommittedReq);
+
+        // queue read requests to queuedRequests
+        for (; readReqId <= numberofReads+lastCXid; ++readReqId) {
+            Request readReq = newRequest(new GetDataRequest(path, false),
+                    OpCode.getData, sessionid, readReqId);
+            processor.queuedRequests.add(readReq);
+            localRequests.add(readReq);
+        }
+
+        //run once
+        processor.initThreads(defaultSizeOfThreadPool);
+        processor.run();
+        Thread.sleep(1000);
+
+        //We verify that the processor is waiting for the commit
+        Assert.assertTrue(processedRequests.isEmpty());
+
+        // We add a commit that belongs to the same session but with larger cxid,
+        // i.e., commit of an update from the next connection of this session.
+        Request otherSessionCommittedReq = newRequest(
+                new CreateRequest(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL.toFlag()),
+                OpCode.create, sessionid, lastCXid+10);
+        processor.committedRequests.add(otherSessionCommittedReq);
+        processor.committedRequests.add(orphanCommittedReq);
+        processor.run();
+        Thread.sleep(1000);
+
+        //We verify that the commit processor processed only the out-of-order commit (the one with the larger cxid)
+        Assert.assertTrue(processedRequests.size() == 1);
+        Assert.assertTrue(processedRequests.contains(otherSessionCommittedReq));
+
+        processor.run();
+        Thread.sleep(1000);
+
+        //We verify that the commit processor handled all messages.
+        Assert.assertTrue(processedRequests.containsAll(localRequests));
+    }
 }
\ No newline at end of file