You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ca...@apache.org on 2014/07/15 23:54:33 UTC

svn commit: r1610861 - in /zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java

Author: camille
Date: Tue Jul 15 21:54:32 2014
New Revision: 1610861

URL: http://svn.apache.org/r1610861
Log:
ZOOKEEPER-1863. Race condition in commit processor leading to out of order request completion, xid mismatch on client. (fpj and Dutch T Meyer via camille)

Added:
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java   (with props)
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1610861&r1=1610860&r2=1610861&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Tue Jul 15 21:54:32 2014
@@ -690,6 +690,9 @@ BUGFIXES:
   stat is not null (Michi Mutsuzaki via rakeshr)
 
   ZOOKEEPER-1964. Fix Flaky Test in ReconfigTest.java (Hongchao Deng via fpj)
+  
+  ZOOKEEPER-1863. Race condition in commit processor leading to out of order 
+  request completion, xid mismatch on client. (fpj and Dutch T Meyer via camille)
 
 IMPROVEMENTS:
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1610861&r1=1610860&r2=1610861&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Tue Jul 15 21:54:32 2014
@@ -77,30 +77,30 @@ public class CommitProcessor extends Zoo
     /**
      * Requests that we are holding until the commit comes in.
      */
-    private final LinkedBlockingQueue<Request> queuedRequests =
+    protected final LinkedBlockingQueue<Request> queuedRequests =
         new LinkedBlockingQueue<Request>();
 
     /**
      * Requests that have been committed.
      */
-    private final LinkedBlockingQueue<Request> committedRequests =
+    protected final LinkedBlockingQueue<Request> committedRequests =
         new LinkedBlockingQueue<Request>();
 
     /** Request for which we are currently awaiting a commit */
-    private final AtomicReference<Request> nextPending =
+    protected final AtomicReference<Request> nextPending =
         new AtomicReference<Request>();
     /** Request currently being committed (ie, sent off to next processor) */
     private final AtomicReference<Request> currentlyCommitting =
         new AtomicReference<Request>();
 
     /** The number of requests currently being processed */
-    private AtomicInteger numRequestsProcessing = new AtomicInteger(0);
+    protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);
 
     RequestProcessor nextProcessor;
 
-    private volatile boolean stopped = true;
+    protected volatile boolean stopped = true;
     private long workerShutdownTimeoutMS;
-    private WorkerService workerPool;
+    protected WorkerService workerPool;
 
     /**
      * This flag indicates whether we need to wait for a response to come back from the
@@ -181,37 +181,7 @@ public class CommitProcessor extends Zoo
                  * came in for the pending request. We can only commit a
                  * request when there is no other request being processed.
                  */
-                if (!stopped && !isProcessingRequest() &&
-                    (request = committedRequests.poll()) != null) {
-                    /*
-                     * We match with nextPending so that we can move to the
-                     * next request when it is committed. We also want to
-                     * use nextPending because it has the cnxn member set
-                     * properly.
-                     */
-                    Request pending = nextPending.get();
-                    if (pending != null &&
-                        pending.sessionId == request.sessionId &&
-                        pending.cxid == request.cxid) {
-                        // we want to send our version of the request.
-                        // the pointer to the connection in the request
-                        pending.setHdr(request.getHdr());
-                        pending.setTxn(request.getTxn());
-                        pending.zxid = request.zxid;
-                        // Set currentlyCommitting so we will block until this
-                        // completes. Cleared by CommitWorkRequest after
-                        // nextProcessor returns.
-                        currentlyCommitting.set(pending);
-                        nextPending.set(null);
-                        sendToNextProcessor(pending);
-                    } else {
-                        // this request came from someone else so just
-                        // send the commit packet
-                        currentlyCommitting.set(request);
-                        sendToNextProcessor(request);
-                    }
-                }
-
+                processCommitted();
             }
         } catch (InterruptedException e) {
             LOG.warn("Interrupted exception while waiting", e);
@@ -221,6 +191,56 @@ public class CommitProcessor extends Zoo
         LOG.info("CommitProcessor exited loop!");
     }
 
+    /*
+     * Separated this method from the main run loop
+     * for test purposes (ZOOKEEPER-1863)
+     */
+    protected void processCommitted() {
+        Request request;
+
+        if (!stopped && !isProcessingRequest() &&
+                (committedRequests.peek() != null)) {
+
+            /*
+             * ZOOKEEPER-1863: continue only if no new request is
+             * waiting in queuedRequests, or if we are already waiting
+             * for a commit; otherwise return without polling.
+             */
+            if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {
+                return;
+            }
+            request = committedRequests.poll();
+
+            /*
+             * We match with nextPending so that we can move to the
+             * next request when it is committed. We also want to
+             * use nextPending because it has the cnxn member set
+             * properly.
+             */
+            Request pending = nextPending.get();
+            if (pending != null &&
+                pending.sessionId == request.sessionId &&
+                pending.cxid == request.cxid) {
+                // we want to send our version of the request, since it
+                // has the pointer to the connection (cnxn) set
+                pending.setHdr(request.getHdr());
+                pending.setTxn(request.getTxn());
+                pending.zxid = request.zxid;
+                // Set currentlyCommitting so we will block until this
+                // completes. Cleared by CommitWorkRequest after
+                // nextProcessor returns.
+                currentlyCommitting.set(pending);
+                nextPending.set(null);
+                sendToNextProcessor(pending);
+            } else {
+                // this request came from someone else so just
+                // send the commit packet
+                currentlyCommitting.set(request);
+                sendToNextProcessor(request);
+            }
+        }      
+    }
+
     @Override
     public void start() {
         int numCores = Runtime.getRuntime().availableProcessors();

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java?rev=1610861&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java Tue Jul 15 21:54:32 2014
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.GetDataRequest;
+import org.apache.zookeeper.proto.SyncRequest;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.WorkerService;
+import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CommitProcessorConcurrencyTest {
+    protected static final Logger LOG =
+            LoggerFactory.getLogger(CommitProcessorConcurrencyTest.class);
+
+    Boolean executedFlag = false;
+    MockCommitProcessor processor;
+
+    @Before
+    public void setUp() throws Exception {
+        processor = new MockCommitProcessor();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        processor.shutdown();
+    }
+
+    class MockCommitProcessor extends CommitProcessor {
+
+        MockCommitProcessor() {
+          super( 
+                  new RequestProcessor() {
+                      public void processRequest(Request request) 
+                              throws RequestProcessorException {
+                          executedFlag = true;
+                      }
+                      public void shutdown(){}
+          },
+          "0",
+          false);
+        }
+
+        public void testStart() {
+            this.stopped = false;
+            this.workerPool = new WorkerService(
+                    "CommitProcWork", 1, true);
+        }
+
+        public void addToCommittedRequests(Request req) {
+            this.committedRequests.add(req);
+        }
+
+        public void addToNextPending(Request req) {
+            this.nextPending.set(req);
+        }
+
+        public void addToQueuedRequests(Request req) {
+            //this.numRequestsProcessing.incrementAndGet();
+            this.queuedRequests.add(req);
+        }
+
+        public void testProcessCommitted() {
+            this.processCommitted();
+        }
+
+        @Override
+        public void shutdown() {
+            this.workerPool.stop();
+        }
+    }
+
+    /*
+     * Populate the CommitProcessor's queues directly, run processCommitted
+     * once, and check that the next processor was not executed.
+     */
+    @Test
+    public void raceTest() 
+    throws Exception {
+
+       ByteArrayOutputStream boas = new ByteArrayOutputStream();
+       BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+       GetDataRequest getReq = new GetDataRequest("/testrace", false);
+       getReq.serialize(boa, "request");
+       ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+       Request readReq = new Request(null, 0x0, 0, OpCode.getData,
+               bb, new ArrayList<Id>());
+
+       boas.reset();
+       SyncRequest syncReq = new SyncRequest("/testrace");
+       syncReq.serialize(boa, "request");
+       bb = ByteBuffer.wrap(boas.toByteArray());
+       Request writeReq = new Request(null, 0x0, 0, OpCode.sync,
+                                 bb, new ArrayList<Id>());
+
+       processor.addToCommittedRequests(writeReq);
+       processor.addToQueuedRequests(readReq);
+       processor.addToQueuedRequests(writeReq);
+
+       processor.testStart();
+       processor.testProcessCommitted();
+       Assert.assertFalse("Next request processor executed", executedFlag);
+    }
+}
\ No newline at end of file

Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain