You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by va...@apache.org on 2017/06/14 00:56:47 UTC

[09/52] [abbrv] sentry git commit: SENTRY-1695: Waiting for HMS notifications from Thrift should be interruptible (Alex Kolbasov, reviewed by: Hao Hao and Sergio Pena)

SENTRY-1695: Waiting for HMS notifications from Thrift should be interruptible (Alex Kolbasov, reviewed by: Hao Hao and Sergio Pena)

Change-Id: I28e714b99ea08ea18fe1bde6fcb67b617ea3f563
Reviewed-on: http://gerrit.sjc.cloudera.com:8080/22325
Tested-by: Jenkins User
Reviewed-by: Alexander Kolbasov <ak...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/ddfc9c8c
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/ddfc9c8c
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/ddfc9c8c

Branch: refs/for/cdh5-1.5.1_ha
Commit: ddfc9c8c64df1640d510d85dd20a1746f2a3553a
Parents: ee48831
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Thu May 4 16:51:51 2017 -0700
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Thu May 4 18:14:40 2017 -0700

----------------------------------------------------------------------
 .../service/thrift/SentryPolicyStoreProcessor.java  |  9 +++++++--
 .../apache/sentry/service/thrift/CounterWait.java   |  9 +++++----
 .../sentry/service/thrift/TestCounterWait.java      | 16 ++++++++++------
 3 files changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/ddfc9c8c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index b5aee50..56faf00 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -924,13 +924,20 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
   @Override
   public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request)
           throws TException {
+    TSentrySyncIDResponse response = new TSentrySyncIDResponse();
     try (Timer.Context timerContext = hmsWaitTimer.time()) {
       // Wait until Sentry Server processes specified HMS Notification ID.
-      TSentrySyncIDResponse response = new TSentrySyncIDResponse();
       response.setId(sentryStore.getCounterWait().waitFor(request.getId()));
       response.setStatus(Status.OK());
-      return response;
+    } catch (InterruptedException e) {
+      String msg = String.format("wait request for id %d is interrupted",
+              request.getId());
+      LOGGER.error(msg, e);
+      response.setStatus(Status.RuntimeError(msg, e));
+      // Restore the interrupt status so the thread's owner still observes it.
+      Thread.currentThread().interrupt();
     }
+    return response;
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/sentry/blob/ddfc9c8c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
index f593bff..2b4ee84 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
@@ -117,8 +117,9 @@ public final class CounterWait {
    * @param value requested counter value
    * @return current counter value that should be no smaller then the requested
    * value
+   * @throws InterruptedException if the wait was interrupted
    */
-  public long waitFor(long value) {
+  public long waitFor(long value) throws InterruptedException {
     // Fast path - counter value already reached, no need to block
     if (value <= currentId.get()) {
       return currentId.get();
@@ -220,9 +221,9 @@ public final class CounterWait {
       semaphore.acquireUninterruptibly(); // Will not block
     }
 
-    /** Wait until signaled. May return immediately if already signalled. */
-    void waitFor() {
-      semaphore.acquireUninterruptibly();
+    /** Wait until signaled or interrupted. May return immediately if already signaled. */
+    void waitFor() throws InterruptedException {
+      semaphore.acquire();
     }
 
     /** @return the value we are waiting for */

http://git-wip-us.apache.org/repos/asf/sentry/blob/ddfc9c8c/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
index a700178..1b732da 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
@@ -30,12 +30,11 @@ import java.util.concurrent.LinkedBlockingDeque;
  */
 public class TestCounterWait extends TestCase {
   // Used to verify that wakeups happen in the right order
-  private BlockingDeque<Long> outSyncQueue = new LinkedBlockingDeque<>();
-  // Number of waiters to test for
-  private int nthreads = 20;
+  private final BlockingDeque<Long> outSyncQueue = new LinkedBlockingDeque<>();
 
   public void testWaitFor() throws Exception {
     // Create a thread for each waiter
+    int nthreads = 20;
     ExecutorService executor = Executors.newFixedThreadPool(nthreads);
 
     final CounterWait waiter = new CounterWait();
@@ -46,11 +45,16 @@ public class TestCounterWait extends TestCase {
     // Create a pair of threads waiting for each value in [1, nthreads / 2]
     // We use pair of threads per value to verify that both are waken up
     for (int i = 0; i < nthreads; i++) {
-      final int finalI = i + 2;
+      int finalI = i + 2;
       final int val = finalI / 2;
       executor.execute(new Runnable() {
                          public void run() {
-                           long r = waiter.waitFor(val); // blocks
+                           long r = 0;
+                           try {
+                             r = waiter.waitFor(val); // blocks
+                           } catch (InterruptedException e) {
+                             e.printStackTrace();
+                           }
                            outSyncQueue.add(r); // Once we wake up, post result
                          }
                        }
@@ -68,7 +72,7 @@ public class TestCounterWait extends TestCase {
     // Post a counter update for each value in [ 1, nthreads / 2 ]
     // After eac update two threads should be waken up and the corresponding pair of
     // values should appear in the outSyncQueue.
-    for (int i = 0; i < nthreads / 2; i++) {
+    for (int i = 0; i < (nthreads / 2); i++) {
       waiter.update(i + 1);
       long r = outSyncQueue.takeFirst();
       assertEquals(r, i + 1);