You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/07/25 17:22:27 UTC
[2/2] kudu git commit: KUDU-2053. Fix race in Java RequestTracker
KUDU-2053. Fix race in Java RequestTracker
The implementation of RequestTracker.newSeqNo() was previously
implemented as:
1. allocate the new sequence number
2. add that sequence number to the 'incomplete' map
These steps were individually thread-safe by using an AtomicLong and a
thread-safe collection, respectively, but they were not performed
atomically. Thus we could have the following race:
T1: allocate seq number 1
T2: allocate seq number 2
T2: add seq number 2 to incompleteRpcs
T2: ask for firstIncomplete() -> 2
T2: send an RPC
--> server GCs seqnum < 2
T1: add seq number 1 to incompleteRpcs
T1: send an RPC with seq number 1
--> server responds with an error since this seqnum is already
GCed
This patch fixes the issue by moving back to a simpler synchronization
scheme such that the two steps (allocation and addition to the tracking
structure) are done under a single critical section.
A new unit test is included which reliably reproduced the issue prior to
the fix.
Change-Id: I56f3d1ac85d34ca663e5b6378ff8362846a2424a
Reviewed-on: http://gerrit.cloudera.org:8080/7494
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/be8e3c22
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/be8e3c22
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/be8e3c22
Branch: refs/heads/master
Commit: be8e3c22b9a3a71b2c365e2b9ed306ea23d60058
Parents: 6d6c7cc
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Jul 24 23:07:40 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Jul 25 16:48:40 2017 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/RequestTracker.java | 33 +++++++++-----
.../apache/kudu/client/TestRequestTracker.java | 48 ++++++++++++++++++++
2 files changed, 70 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/be8e3c22/java/kudu-client/src/main/java/org/apache/kudu/client/RequestTracker.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RequestTracker.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RequestTracker.java
index 6593a52..de84131 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RequestTracker.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RequestTracker.java
@@ -17,9 +17,8 @@
package org.apache.kudu.client;
-import java.util.Queue;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.TreeSet;
+import javax.annotation.concurrent.GuardedBy;
import org.apache.yetus.audience.InterfaceAudience;
@@ -28,8 +27,12 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
public class RequestTracker {
- private final AtomicLong sequenceIdTracker = new AtomicLong();
- private final Queue<Long> incompleteRpcs = new PriorityBlockingQueue<>();
+ private Object lock = new Object();
+
+ @GuardedBy("lock")
+ private long nextSeqNo = 1;
+ @GuardedBy("lock")
+ private final TreeSet<Long> incompleteRpcs = new TreeSet<>();
static final long NO_SEQ_NO = -1;
@@ -48,9 +51,11 @@ public class RequestTracker {
* @return a new sequence number
*/
public long newSeqNo() {
- Long next = sequenceIdTracker.incrementAndGet();
- incompleteRpcs.add(next);
- return next;
+ synchronized (lock) {
+ long seq = nextSeqNo++;
+ incompleteRpcs.add(seq);
+ return seq;
+ }
}
/**
@@ -59,8 +64,12 @@ public class RequestTracker {
* @return the first incomplete sequence number
*/
public long firstIncomplete() {
- Long peek = incompleteRpcs.peek();
- return peek == null ? NO_SEQ_NO : peek;
+ synchronized (lock) {
+ if (incompleteRpcs.isEmpty()) {
+ return NO_SEQ_NO;
+ }
+ return incompleteRpcs.first();
+ }
}
/**
@@ -68,7 +77,9 @@ public class RequestTracker {
* @param sequenceId the sequence id to mark as complete
*/
public void rpcCompleted(long sequenceId) {
- incompleteRpcs.remove(sequenceId);
+ synchronized (lock) {
+ incompleteRpcs.remove(sequenceId);
+ }
}
public String getClientId() {
http://git-wip-us.apache.org/repos/asf/kudu/blob/be8e3c22/java/kudu-client/src/test/java/org/apache/kudu/client/TestRequestTracker.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRequestTracker.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRequestTracker.java
index 83e247d..c75c4fb 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRequestTracker.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRequestTracker.java
@@ -18,8 +18,19 @@ package org.apache.kudu.client;
import static org.junit.Assert.assertEquals;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Assert;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
public class TestRequestTracker {
@Test(timeout = 10000)
@@ -71,4 +82,41 @@ public class TestRequestTracker {
// Test that we get back to NO_SEQ_NO after marking them all.
assertEquals(RequestTracker.NO_SEQ_NO, tracker.firstIncomplete());
}
+
+ private static class Checker {
+ long curIncomplete = 0;
+ public synchronized void check(long seqNo, long firstIncomplete) {
+ Assert.assertTrue("should not send a seq number that was previously marked complete",
+ seqNo >= curIncomplete);
+ curIncomplete = Math.max(firstIncomplete, curIncomplete);
+ }
+ }
+
+ @Test(timeout = 30000)
+ public void testMultiThreaded() throws InterruptedException, ExecutionException {
+ final AtomicBoolean done = new AtomicBoolean(false);
+ final RequestTracker rt = new RequestTracker("fake id");
+ final Checker checker = new Checker();
+ ExecutorService exec = Executors.newCachedThreadPool();
+ List<Future<Void>> futures = Lists.newArrayList();
+ for (int i = 0; i < 16; i++) {
+ futures.add(exec.submit(new Callable<Void>() {
+ @Override
+ public Void call() {
+ while (!done.get()) {
+ long seqNo = rt.newSeqNo();
+ long incomplete = rt.firstIncomplete();
+ checker.check(seqNo, incomplete);
+ rt.rpcCompleted(seqNo);
+ }
+ return null;
+ }
+ }));
+ }
+ Thread.sleep(5000);
+ done.set(true);
+ for (Future<Void> f : futures) {
+ f.get();
+ }
+ }
}