You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ak...@apache.org on 2017/09/11 23:24:48 UTC
sentry git commit: SENTRY-1940: Sentry should time out threads
waiting for notifications (Alex Kolbasov,
reviewd by Vamsee Yarlagadda and Sergio Pena)
Repository: sentry
Updated Branches:
refs/heads/master bbf5ce1fd -> b38267765
SENTRY-1940: Sentry should time out threads waiting for notifications (Alex Kolbasov, reviewd by Vamsee Yarlagadda and Sergio Pena)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/b3826776
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/b3826776
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/b3826776
Branch: refs/heads/master
Commit: b38267765be842ca30b0bfa86f1e47bce6fdd313
Parents: bbf5ce1
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Mon Sep 11 16:24:19 2017 -0700
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Mon Sep 11 16:24:40 2017 -0700
----------------------------------------------------------------------
.../db/service/persistent/SentryStore.java | 6 ++-
.../thrift/SentryPolicyStoreProcessor.java | 6 +++
.../sentry/service/thrift/CounterWait.java | 57 ++++++++++++++++----
.../sentry/service/thrift/ServiceConstants.java | 6 +++
.../sentry/service/thrift/TestCounterWait.java | 11 +++-
5 files changed, 75 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index a70a552..01a7c83 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import javax.jdo.FetchGroup;
import javax.jdo.JDODataStoreException;
@@ -187,7 +188,7 @@ public class SentryStore {
* <p>
* Keeping it here isn't ideal but serves the purpose until we find a better home.
*/
- private final CounterWait counterWait = new CounterWait();
+ private final CounterWait counterWait;
public static Properties getDataNucleusProperties(Configuration conf)
throws SentrySiteConfigurationException, IOException {
@@ -267,6 +268,9 @@ public class SentryStore {
pmf = JDOHelper.getPersistenceManagerFactory(prop);
tm = new TransactionManager(pmf, conf);
verifySentryStoreSchema(checkSchemaVersion);
+ long notificationTimeout = conf.getInt(ServerConfig.SENTRY_NOTIFICATION_SYNC_TIMEOUT_MS,
+ ServerConfig.SENTRY_NOTIFICATION_SYNC_TIMEOUT_DEFAULT);
+ counterWait = new CounterWait(notificationTimeout, TimeUnit.MILLISECONDS);
}
public void setPersistUpdateDeltas(boolean persistUpdateDeltas) {
http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index cfd0e30..cd85400 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
@@ -1162,6 +1163,11 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
request.getId());
LOGGER.error(msg, e);
response.setStatus(Status.RuntimeError(msg, e));
+ Thread.currentThread().interrupt();
+ } catch (TimeoutException e) {
+ String msg = String.format("timeod out wait request for id %d", request.getId());
+ LOGGER.warn(msg, e);
+ response.setStatus(Status.RuntimeError(msg, e));
}
return response;
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
index 2c9e87a..2268ce7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
@@ -24,6 +24,8 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -33,7 +35,9 @@ import java.util.concurrent.atomic.AtomicLong;
* counter reaches some value interesting to them.
* Consumers call {@link #waitFor(long)} which may either return
* immediately if the counter reached the specified value, or block
- * until this value is reached.
+ * until this value is reached. Consumers can also specify timeout for the
+ * {@link #waitFor(long)} in which case it may return {@link TimeoutException}
+ * when the wait was not successfull within the specified time limit.
* <p>
* All waiters should be waken up when the counter becomes equal or higher
* then the value they are waiting for.
@@ -77,6 +81,9 @@ public final class CounterWait {
/** Counter value. May only increase. */
private final AtomicLong currentId = new AtomicLong(0);
+ private final long waitTimeout;
+ private final TimeUnit waitTimeUnit;
+
/**
* Waiters sorted by the value of the counter they are waiting for.
* Note that {@link PriorityBlockingQueue} is thread-safe.
@@ -87,6 +94,31 @@ public final class CounterWait {
new PriorityBlockingQueue<>();
/**
+ * Create an instance of CounterWait object that will not timeout during wait
+ */
+ public CounterWait() {
+ this(0, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Create an instance of CounterWait object that will timeout during wait
+ * @param waitTimeout maximum time in seconds to wait for counter
+ */
+ public CounterWait(long waitTimeoutSec) {
+ this(waitTimeoutSec, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Create an instance of CounterWait object that will timeout during wait
+ * @param waitTimeout maximum time to wait for counter
+ * @param waitTimeUnit time units for wait
+ */
+ public CounterWait(long waitTimeout, TimeUnit waitTimeUnit) {
+ this.waitTimeout = waitTimeout;
+ this.waitTimeUnit = waitTimeUnit;
+ }
+
+ /**
* Update the counter value and wake up all threads waiting for this
* value or any value below it.
* <p>
@@ -149,9 +181,10 @@ public final class CounterWait {
* @param value requested counter value
* @return current counter value that should be no smaller then the requested
* value
- * @throws InterruptedException if the wait was interrupted
+ * @throws InterruptedException if the wait was interrupted, TimeoutException if
+ * wait was not successfull within the timeout value specified at the construction time.
*/
- public long waitFor(long value) throws InterruptedException {
+ public long waitFor(long value) throws InterruptedException, TimeoutException {
// Fast path - counter value already reached, no need to block
if (value <= currentId.get()) {
return currentId.get();
@@ -235,7 +268,7 @@ public final class CounterWait {
* ValueEvents are stored in priority queue sorted by value, so they should be
* comparable by the value.
*/
- private static class ValueEvent implements Comparable<ValueEvent> {
+ private class ValueEvent implements Comparable<ValueEvent> {
/** Value waited for. */
private final long value;
/** Binary semaphore to synchronize waiters */
@@ -254,11 +287,17 @@ public final class CounterWait {
}
/** Wait until signaled or interrupted. May return immediately if already signalled. */
- void waitFor() throws InterruptedException {
- semaphore.acquire();
+ void waitFor() throws InterruptedException, TimeoutException {
+ if (waitTimeout == 0) {
+ semaphore.acquire();
+ return;
+ }
+ if (!semaphore.tryAcquire(waitTimeout, waitTimeUnit)) {
+ throw new TimeoutException();
+ }
}
- /** @return the value we are waiting for */
+ /** @return the value we are waiting for. */
long getValue() {
return value;
}
@@ -269,7 +308,7 @@ public final class CounterWait {
}
/**
- * Compare objects by value
+ * Compare objects by value.
*/
@Override
public int compareTo(final ValueEvent o) {
@@ -279,7 +318,7 @@ public final class CounterWait {
}
/**
- * Use identity comparison of objects
+ * Use identity comparison of objects.
*/
@Override
public boolean equals(final Object o) {
http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index 48aec1e..280aebc 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -139,6 +139,12 @@ public class ServiceConstants {
public static final String SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE = "sentry.zookeeper.client.ticketcache";
public static final String SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE_DEFAULT = "false";
public static final String SERVER_HA_STANDBY_SIG = "sentry.ha.standby.signal";
+
+ // Timeout value in seconds for HMS notificationID synchronization
+ // Should match the value for RPC timeout in HMS client config
+ public static final String SENTRY_NOTIFICATION_SYNC_TIMEOUT_MS = "sentry.notification.sync.timeout.ms";
+ public static final int SENTRY_NOTIFICATION_SYNC_TIMEOUT_DEFAULT = 200000;
+
public static final ImmutableMap<String, String> SENTRY_STORE_DEFAULTS =
ImmutableMap.<String, String>builder()
.put("datanucleus.connectionPoolingType", "BoneCP")
http://git-wip-us.apache.org/repos/asf/sentry/blob/b3826776/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
index e4846d9..090999a 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
@@ -25,6 +25,8 @@ import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.Test;
@@ -56,7 +58,7 @@ public class TestCounterWait {
long r = 0;
try {
r = waiter.waitFor(val); // blocks
- } catch (InterruptedException e) {
+ } catch (InterruptedException | TimeoutException e) {
e.printStackTrace();
}
outSyncQueue.add(r); // Once we wake up, post result
@@ -89,6 +91,13 @@ public class TestCounterWait {
executor.shutdown();
}
+ // Test for waitFor() timeout throwing TimeoutException
+ @Test(expected = TimeoutException.class)
+ public void testWaitForWithTimeout() throws Exception {
+ CounterWait waiter = new CounterWait(1, TimeUnit.MILLISECONDS);
+ waiter.waitFor(1); // Should throw exception
+ }
+
private void sleep(long ms) {
try {
Thread.sleep(ms);