You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ak...@apache.org on 2017/09/11 23:24:48 UTC

sentry git commit: SENTRY-1940: Sentry should time out threads waiting for notifications (Alex Kolbasov, reviewd by Vamsee Yarlagadda and Sergio Pena)

Repository: sentry
Updated Branches:
  refs/heads/master bbf5ce1fd -> b38267765


SENTRY-1940: Sentry should time out threads waiting for notifications (Alex Kolbasov, reviewd by Vamsee Yarlagadda and Sergio Pena)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/b3826776
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/b3826776
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/b3826776

Branch: refs/heads/master
Commit: b38267765be842ca30b0bfa86f1e47bce6fdd313
Parents: bbf5ce1
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Mon Sep 11 16:24:19 2017 -0700
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Mon Sep 11 16:24:40 2017 -0700

----------------------------------------------------------------------
 .../db/service/persistent/SentryStore.java      |  6 ++-
 .../thrift/SentryPolicyStoreProcessor.java      |  6 +++
 .../sentry/service/thrift/CounterWait.java      | 57 ++++++++++++++++----
 .../sentry/service/thrift/ServiceConstants.java |  6 +++
 .../sentry/service/thrift/TestCounterWait.java  | 11 +++-
 5 files changed, 75 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index a70a552..01a7c83 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import javax.jdo.FetchGroup;
 import javax.jdo.JDODataStoreException;
@@ -187,7 +188,7 @@ public class SentryStore {
    * <p>
    * Keeping it here isn't ideal but serves the purpose until we find a better home.
    */
-  private final CounterWait counterWait = new CounterWait();
+  private final CounterWait counterWait;
 
   public static Properties getDataNucleusProperties(Configuration conf)
           throws SentrySiteConfigurationException, IOException {
@@ -267,6 +268,9 @@ public class SentryStore {
     pmf = JDOHelper.getPersistenceManagerFactory(prop);
     tm = new TransactionManager(pmf, conf);
     verifySentryStoreSchema(checkSchemaVersion);
+    long notificationTimeout = conf.getInt(ServerConfig.SENTRY_NOTIFICATION_SYNC_TIMEOUT_MS,
+            ServerConfig.SENTRY_NOTIFICATION_SYNC_TIMEOUT_DEFAULT);
+    counterWait = new CounterWait(notificationTimeout, TimeUnit.MILLISECONDS);
   }
 
   public void setPersistUpdateDeltas(boolean persistUpdateDeltas) {

http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index cfd0e30..cd85400 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
@@ -1162,6 +1163,11 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
               request.getId());
       LOGGER.error(msg, e);
       response.setStatus(Status.RuntimeError(msg, e));
+      Thread.currentThread().interrupt();
+    } catch (TimeoutException e) {
+      String msg = String.format("timeod out wait request for id %d", request.getId());
+      LOGGER.warn(msg, e);
+      response.setStatus(Status.RuntimeError(msg, e));
     }
     return response;
   }

http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
index 2c9e87a..2268ce7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
@@ -24,6 +24,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -33,7 +35,9 @@ import java.util.concurrent.atomic.AtomicLong;
  * counter reaches some value interesting to them.
  * Consumers call {@link #waitFor(long)} which may either return
  * immediately if the counter reached the specified value, or block
- * until this value is reached.
+ * until this value is reached. Consumers can also specify timeout for the
+ * {@link #waitFor(long)} in which case it may return {@link TimeoutException}
+ * when the wait was not successfull within the specified time limit.
  * <p>
  * All waiters should be waken up when the counter becomes equal or higher
  * then the value they are waiting for.
@@ -77,6 +81,9 @@ public final class CounterWait {
   /** Counter value. May only increase. */
   private final AtomicLong currentId = new AtomicLong(0);
 
+  private final long waitTimeout;
+  private final TimeUnit waitTimeUnit;
+
   /**
    * Waiters sorted by the value of the counter they are waiting for.
    * Note that {@link PriorityBlockingQueue} is thread-safe.
@@ -87,6 +94,31 @@ public final class CounterWait {
           new PriorityBlockingQueue<>();
 
   /**
+   * Create an instance of CounterWait object that will not timeout during wait
+   */
+  public CounterWait() {
+    this(0, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Create an instance of CounterWait object that will timeout during wait
+   * @param waitTimeout maximum time in seconds to wait for counter
+   */
+  public CounterWait(long waitTimeoutSec) {
+    this(waitTimeoutSec, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Create an instance of CounterWait object that will timeout during wait
+   * @param waitTimeout maximum time to wait for counter
+   * @param waitTimeUnit time units for wait
+   */
+  public CounterWait(long waitTimeout, TimeUnit waitTimeUnit) {
+    this.waitTimeout = waitTimeout;
+    this.waitTimeUnit = waitTimeUnit;
+  }
+
+  /**
    * Update the counter value and wake up all threads waiting for this
    * value or any value below it.
    * <p>
@@ -149,9 +181,10 @@ public final class CounterWait {
    * @param value requested counter value
    * @return current counter value that should be no smaller then the requested
    * value
-   * @throws InterruptedException if the wait was interrupted
+   * @throws InterruptedException if the wait was interrupted, TimeoutException if
+   * wait was not successfull within the timeout value specified at the construction time.
    */
-  public long waitFor(long value) throws InterruptedException {
+  public long waitFor(long value) throws InterruptedException, TimeoutException {
     // Fast path - counter value already reached, no need to block
     if (value <= currentId.get()) {
       return currentId.get();
@@ -235,7 +268,7 @@ public final class CounterWait {
    * ValueEvents are stored in priority queue sorted by value, so they should be
    * comparable by the value.
    */
-  private static class ValueEvent implements Comparable<ValueEvent> {
+  private class ValueEvent implements Comparable<ValueEvent> {
     /** Value waited for. */
     private final long value;
     /** Binary semaphore to synchronize waiters */
@@ -254,11 +287,17 @@ public final class CounterWait {
     }
 
     /** Wait until signaled or interrupted. May return immediately if already signalled. */
-    void waitFor() throws InterruptedException {
-      semaphore.acquire();
+    void waitFor() throws InterruptedException, TimeoutException {
+      if (waitTimeout == 0) {
+        semaphore.acquire();
+        return;
+      }
+      if (!semaphore.tryAcquire(waitTimeout, waitTimeUnit)) {
+        throw new TimeoutException();
+      }
     }
 
-    /** @return the value we are waiting for */
+    /** @return the value we are waiting for. */
     long getValue() {
       return value;
     }
@@ -269,7 +308,7 @@ public final class CounterWait {
     }
 
     /**
-     * Compare objects by value
+     * Compare objects by value.
      */
     @Override
     public int compareTo(final ValueEvent o) {
@@ -279,7 +318,7 @@ public final class CounterWait {
     }
 
     /**
-     * Use identity comparison of objects
+     * Use identity comparison of objects.
      */
     @Override
     public boolean equals(final Object o) {

http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index 48aec1e..280aebc 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -139,6 +139,12 @@ public class ServiceConstants {
     public static final String SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE = "sentry.zookeeper.client.ticketcache";
     public static final String SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE_DEFAULT = "false";
     public static final String SERVER_HA_STANDBY_SIG = "sentry.ha.standby.signal";
+
+    // Timeout value in seconds for HMS notificationID synchronization
+    // Should match the value for RPC timeout in HMS client config
+    public static final String SENTRY_NOTIFICATION_SYNC_TIMEOUT_MS = "sentry.notification.sync.timeout.ms";
+    public static final int SENTRY_NOTIFICATION_SYNC_TIMEOUT_DEFAULT = 200000;
+
     public static final ImmutableMap<String, String> SENTRY_STORE_DEFAULTS =
         ImmutableMap.<String, String>builder()
         .put("datanucleus.connectionPoolingType", "BoneCP")

http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
index e4846d9..090999a 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
@@ -25,6 +25,8 @@ import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.junit.Test;
 
@@ -56,7 +58,7 @@ public class TestCounterWait {
                            long r = 0;
                            try {
                              r = waiter.waitFor(val); // blocks
-                           } catch (InterruptedException e) {
+                           } catch (InterruptedException | TimeoutException e) {
                              e.printStackTrace();
                            }
                            outSyncQueue.add(r); // Once we wake up, post result
@@ -89,6 +91,13 @@ public class TestCounterWait {
     executor.shutdown();
   }
 
+  // Test for waitFor() timeout throwing TimeoutException
+  @Test(expected = TimeoutException.class)
+  public void testWaitForWithTimeout() throws Exception {
+    CounterWait waiter = new CounterWait(1, TimeUnit.MILLISECONDS);
+    waiter.waitFor(1); // Should throw exception
+  }
+
   private void sleep(long ms) {
     try {
       Thread.sleep(ms);