You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ca...@apache.org on 2014/07/15 23:54:33 UTC
svn commit: r1610861 - in /zookeeper/trunk: CHANGES.txt
src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
Author: camille
Date: Tue Jul 15 21:54:32 2014
New Revision: 1610861
URL: http://svn.apache.org/r1610861
Log:
ZOOKEEPER-1863. Race condition in commit processor leading to out of order request completion, xid mismatch on client. (fpj and Dutch T Meyer via camille)
Added:
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java (with props)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1610861&r1=1610860&r2=1610861&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Tue Jul 15 21:54:32 2014
@@ -690,6 +690,9 @@ BUGFIXES:
stat is not null (Michi Mutsuzaki via rakeshr)
ZOOKEEPER-1964. Fix Flaky Test in ReconfigTest.java (Hongchao Deng via fpj)
+
+ ZOOKEEPER-1863. Race condition in commit processor leading to out of order
+ request completion, xid mismatch on client. (fpj and Dutch T Meyer via camille)
IMPROVEMENTS:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1610861&r1=1610860&r2=1610861&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Tue Jul 15 21:54:32 2014
@@ -77,30 +77,30 @@ public class CommitProcessor extends Zoo
/**
* Requests that we are holding until the commit comes in.
*/
- private final LinkedBlockingQueue<Request> queuedRequests =
+ protected final LinkedBlockingQueue<Request> queuedRequests =
new LinkedBlockingQueue<Request>();
/**
* Requests that have been committed.
*/
- private final LinkedBlockingQueue<Request> committedRequests =
+ protected final LinkedBlockingQueue<Request> committedRequests =
new LinkedBlockingQueue<Request>();
/** Request for which we are currently awaiting a commit */
- private final AtomicReference<Request> nextPending =
+ protected final AtomicReference<Request> nextPending =
new AtomicReference<Request>();
/** Request currently being committed (ie, sent off to next processor) */
private final AtomicReference<Request> currentlyCommitting =
new AtomicReference<Request>();
/** The number of requests currently being processed */
- private AtomicInteger numRequestsProcessing = new AtomicInteger(0);
+ protected AtomicInteger numRequestsProcessing = new AtomicInteger(0);
RequestProcessor nextProcessor;
- private volatile boolean stopped = true;
+ protected volatile boolean stopped = true;
private long workerShutdownTimeoutMS;
- private WorkerService workerPool;
+ protected WorkerService workerPool;
/**
* This flag indicates whether we need to wait for a response to come back from the
@@ -181,37 +181,7 @@ public class CommitProcessor extends Zoo
* came in for the pending request. We can only commit a
* request when there is no other request being processed.
*/
- if (!stopped && !isProcessingRequest() &&
- (request = committedRequests.poll()) != null) {
- /*
- * We match with nextPending so that we can move to the
- * next request when it is committed. We also want to
- * use nextPending because it has the cnxn member set
- * properly.
- */
- Request pending = nextPending.get();
- if (pending != null &&
- pending.sessionId == request.sessionId &&
- pending.cxid == request.cxid) {
- // we want to send our version of the request.
- // the pointer to the connection in the request
- pending.setHdr(request.getHdr());
- pending.setTxn(request.getTxn());
- pending.zxid = request.zxid;
- // Set currentlyCommitting so we will block until this
- // completes. Cleared by CommitWorkRequest after
- // nextProcessor returns.
- currentlyCommitting.set(pending);
- nextPending.set(null);
- sendToNextProcessor(pending);
- } else {
- // this request came from someone else so just
- // send the commit packet
- currentlyCommitting.set(request);
- sendToNextProcessor(request);
- }
- }
-
+ processCommitted();
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
@@ -221,6 +191,56 @@ public class CommitProcessor extends Zoo
LOG.info("CommitProcessor exited loop!");
}
+ /*
+ * Forwards the head of committedRequests to the next processor when allowed.
+ * Split out of the main run loop so tests can call it (ZOOKEEPER-1863).
+ */
+ protected void processCommitted() {
+ Request request;
+
+ if (!stopped && !isProcessingRequest() &&
+ (committedRequests.peek() != null)) {
+
+ /*
+ * ZOOKEEPER-1863: proceed only when queuedRequests is empty
+ * or we are already waiting for a commit; otherwise return
+ * and leave the committed request in its queue (peek above).
+ */
+ if ( !isWaitingForCommit() && !queuedRequests.isEmpty()) {
+ return;
+ }
+ request = committedRequests.poll(); // NOTE(review): assumes nothing else drained the queue since peek() - confirm
+
+ /*
+ * Compare the committed request with nextPending (same
+ * sessionId and cxid). On a match we forward nextPending
+ * instead, because per the original note it has the cnxn
+ * member set properly, and nextPending is reset to null.
+ */
+ Request pending = nextPending.get();
+ if (pending != null &&
+ pending.sessionId == request.sessionId &&
+ pending.cxid == request.cxid) {
+ // Copy the commit's header, txn and zxid onto our own
+ // version of the request before forwarding it.
+ pending.setHdr(request.getHdr());
+ pending.setTxn(request.getTxn());
+ pending.zxid = request.zxid;
+ // Record it in currentlyCommitting before handing it off.
+ // Per the original note, CommitWorkRequest clears this
+ // after nextProcessor returns (not visible in this hunk).
+ currentlyCommitting.set(pending);
+ nextPending.set(null);
+ sendToNextProcessor(pending);
+ } else {
+ // No matching pending request (the original note says it
+ // came from someone else): forward the commit itself.
+ currentlyCommitting.set(request);
+ sendToNextProcessor(request);
+ }
+ }
+ }
+
@Override
public void start() {
int numCores = Runtime.getRuntime().availableProcessors();
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java?rev=1610861&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java Tue Jul 15 21:54:32 2014
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.GetDataRequest;
+import org.apache.zookeeper.proto.SyncRequest;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.WorkerService;
+import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CommitProcessorConcurrencyTest { // exercises CommitProcessor.processCommitted (ZOOKEEPER-1863)
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(CommitProcessorConcurrencyTest.class);
+
+ Boolean executedFlag = false; // set to true by the stub next processor in MockCommitProcessor
+ MockCommitProcessor processor;
+
+ @Before
+ public void setUp() throws Exception {
+ processor = new MockCommitProcessor();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ processor.shutdown();
+ }
+
+ class MockCommitProcessor extends CommitProcessor { // gives the test access to CommitProcessor's protected members
+
+ MockCommitProcessor() {
+ super(
+ new RequestProcessor() { // stub next processor: only records that it was called
+ public void processRequest(Request request)
+ throws RequestProcessorException {
+ executedFlag = true;
+ }
+ public void shutdown(){}
+ },
+ "0",
+ false);
+ }
+
+ public void testStart() { // sets up running state directly; does not call start()
+ this.stopped = false; // processCommitted does nothing while stopped is true
+ this.workerPool = new WorkerService(
+ "CommitProcWork", 1, true);
+ }
+
+ public void addToCommittedRequests(Request req) {
+ this.committedRequests.add(req);
+ }
+
+ public void addToNextPending(Request req) {
+ this.nextPending.set(req);
+ }
+
+ public void addToQueuedRequests(Request req) {
+ //this.numRequestsProcessing.incrementAndGet();
+ this.queuedRequests.add(req);
+ }
+
+ public void testProcessCommitted() {
+ this.processCommitted();
+ }
+
+ @Override
+ public void shutdown() {
+ this.workerPool.stop(); // NOTE(review): workerPool is only assigned in testStart() here - confirm it cannot be null at tearDown
+ }
+ }
+
+ /*
+ * ZOOKEEPER-1863 scenario: one committed request, two queued requests and
+ * nextPending left unset. processCommitted must not run the next processor.
+ */
+ @Test
+ public void raceTest()
+ throws Exception {
+
+ ByteArrayOutputStream boas = new ByteArrayOutputStream();
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+ GetDataRequest getReq = new GetDataRequest("/testrace", false);
+ getReq.serialize(boa, "request");
+ ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+ Request readReq = new Request(null, 0x0, 0, OpCode.getData,
+ bb, new ArrayList<Id>());
+
+ boas.reset(); // reuse the same stream/archive for the second payload
+ SyncRequest syncReq = new SyncRequest("/testrace");
+ syncReq.serialize(boa, "request");
+ bb = ByteBuffer.wrap(boas.toByteArray());
+ Request writeReq = new Request(null, 0x0, 0, OpCode.sync,
+ bb, new ArrayList<Id>());
+
+ processor.addToCommittedRequests(writeReq); // the commit is already available...
+ processor.addToQueuedRequests(readReq); // ...but the read queued ahead of it has not been handled
+ processor.addToQueuedRequests(writeReq);
+
+ processor.testStart();
+ processor.testProcessCommitted(); // called directly on the test thread
+ Assert.assertFalse("Next request processor executed", executedFlag);
+ }
+}
\ No newline at end of file
Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain