You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/07/25 17:22:27 UTC

[2/2] kudu git commit: KUDU-2053. Fix race in Java RequestTracker

KUDU-2053. Fix race in Java RequestTracker

RequestTracker.newSeqNo() was previously implemented as two separate
steps:

1. allocate the new sequence number
2. add that sequence number to the 'incomplete' map

Each step was individually thread-safe (using an AtomicLong and a
thread-safe collection, respectively), but the two steps were not
performed together atomically. Thus we could have the following race:

T1: allocate seq number 1
T2:     allocate seq number 2
T2:     add seq number 2 to incompleteRpcs
T2:     ask for firstIncomplete() -> 2
T2:     send an RPC
        --> server GCs seqnum < 2
T1: add seq number 1 to incompleteRpcs
T1: send an RPC with seq number 1
    --> server responds with an error since this seqnum is already
        GCed

This patch fixes the issue by moving back to a simpler synchronization
scheme such that the two steps (allocation and addition to the tracking
structure) are done under a single critical section.

A new unit test is included which reliably reproduced the issue prior to
the fix.

Change-Id: I56f3d1ac85d34ca663e5b6378ff8362846a2424a
Reviewed-on: http://gerrit.cloudera.org:8080/7494
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/be8e3c22
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/be8e3c22
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/be8e3c22

Branch: refs/heads/master
Commit: be8e3c22b9a3a71b2c365e2b9ed306ea23d60058
Parents: 6d6c7cc
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Jul 24 23:07:40 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Jul 25 16:48:40 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/RequestTracker.java  | 33 +++++++++-----
 .../apache/kudu/client/TestRequestTracker.java  | 48 ++++++++++++++++++++
 2 files changed, 70 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/be8e3c22/java/kudu-client/src/main/java/org/apache/kudu/client/RequestTracker.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RequestTracker.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RequestTracker.java
index 6593a52..de84131 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RequestTracker.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RequestTracker.java
@@ -17,9 +17,8 @@
 
 package org.apache.kudu.client;
 
-import java.util.Queue;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.TreeSet;
+import javax.annotation.concurrent.GuardedBy;
 
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -28,8 +27,12 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public class RequestTracker {
-  private final AtomicLong sequenceIdTracker = new AtomicLong();
-  private final Queue<Long> incompleteRpcs = new PriorityBlockingQueue<>();
+  private Object lock = new Object();
+
+  @GuardedBy("lock")
+  private long nextSeqNo = 1;
+  @GuardedBy("lock")
+  private final TreeSet<Long> incompleteRpcs = new TreeSet<>();
 
   static final long NO_SEQ_NO = -1;
 
@@ -48,9 +51,11 @@ public class RequestTracker {
    * @return a new sequence number
    */
   public long newSeqNo() {
-    Long next = sequenceIdTracker.incrementAndGet();
-    incompleteRpcs.add(next);
-    return next;
+    synchronized (lock) {
+      long seq = nextSeqNo++;
+      incompleteRpcs.add(seq);
+      return seq;
+    }
   }
 
   /**
@@ -59,8 +64,12 @@ public class RequestTracker {
    * @return the first incomplete sequence number
    */
   public long firstIncomplete() {
-    Long peek = incompleteRpcs.peek();
-    return peek == null ? NO_SEQ_NO : peek;
+    synchronized (lock) {
+      if (incompleteRpcs.isEmpty()) {
+        return NO_SEQ_NO;
+      }
+      return incompleteRpcs.first();
+    }
   }
 
   /**
@@ -68,7 +77,9 @@ public class RequestTracker {
    * @param sequenceId the sequence id to mark as complete
    */
   public void rpcCompleted(long sequenceId) {
-    incompleteRpcs.remove(sequenceId);
+    synchronized (lock) {
+      incompleteRpcs.remove(sequenceId);
+    }
   }
 
   public String getClientId() {

http://git-wip-us.apache.org/repos/asf/kudu/blob/be8e3c22/java/kudu-client/src/test/java/org/apache/kudu/client/TestRequestTracker.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRequestTracker.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRequestTracker.java
index 83e247d..c75c4fb 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRequestTracker.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRequestTracker.java
@@ -18,8 +18,19 @@ package org.apache.kudu.client;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 public class TestRequestTracker {
 
   @Test(timeout = 10000)
@@ -71,4 +82,41 @@ public class TestRequestTracker {
     // Test that we get back to NO_SEQ_NO after marking them all.
     assertEquals(RequestTracker.NO_SEQ_NO, tracker.firstIncomplete());
   }
+
+  private static class Checker {
+    long curIncomplete = 0;
+    public synchronized void check(long seqNo, long firstIncomplete) {
+      Assert.assertTrue("should not send a seq number that was previously marked complete",
+          seqNo >= curIncomplete);
+      curIncomplete = Math.max(firstIncomplete, curIncomplete);
+    }
+  }
+
+  @Test(timeout = 30000)
+  public void testMultiThreaded() throws InterruptedException, ExecutionException {
+    final AtomicBoolean done = new AtomicBoolean(false);
+    final RequestTracker rt = new RequestTracker("fake id");
+    final Checker checker = new Checker();
+    ExecutorService exec = Executors.newCachedThreadPool();
+    List<Future<Void>> futures = Lists.newArrayList();
+    for (int i = 0; i < 16; i++) {
+      futures.add(exec.submit(new Callable<Void>() {
+        @Override
+        public Void call() {
+          while (!done.get()) {
+            long seqNo = rt.newSeqNo();
+            long incomplete = rt.firstIncomplete();
+            checker.check(seqNo, incomplete);
+            rt.rpcCompleted(seqNo);
+          }
+          return null;
+        }
+      }));
+    }
+    Thread.sleep(5000);
+    done.set(true);
+    for (Future<Void> f : futures) {
+      f.get();
+    }
+  }
 }