You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ak...@apache.org on 2017/03/04 05:55:45 UTC
sentry git commit: SENTRY-1601 Implement HMS Notification barrier on
the server side (Alex Kolbasov, Reviewed by: Misha Dmitriev, Vadim Spector)
Repository: sentry
Updated Branches:
refs/heads/sentry-ha-redesign 1c6ba5ebe -> aecf3f281
SENTRY-1601 Implement HMS Notification barrier on the server side (Alex Kolbasov, Reviewed by: Misha Dmitriev, Vadim Spector)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/aecf3f28
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/aecf3f28
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/aecf3f28
Branch: refs/heads/sentry-ha-redesign
Commit: aecf3f281e4c87feb4063d87e02373f4ae747a1b
Parents: 1c6ba5e
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Fri Mar 3 21:54:55 2017 -0800
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Fri Mar 3 21:54:55 2017 -0800
----------------------------------------------------------------------
.../db/service/persistent/SentryStore.java | 28 ++
.../db/service/thrift/SentryMetrics.java | 1 +
.../thrift/SentryPolicyStoreProcessor.java | 17 +-
.../sentry/service/thrift/CounterWait.java | 266 +++++++++++++++++++
.../sentry/service/thrift/HMSFollower.java | 9 +-
.../sentry/service/thrift/TestCounterWait.java | 90 +++++++
6 files changed, 407 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index c1186ba..38f68cd 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -74,6 +74,7 @@ import org.apache.sentry.provider.db.service.thrift.TSentryMappingData;
import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
import org.apache.sentry.provider.db.service.thrift.TSentryPrivilegeMap;
import org.apache.sentry.provider.db.service.thrift.TSentryRole;
+import org.apache.sentry.service.thrift.CounterWait;
import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
import org.datanucleus.store.rdbms.exceptions.MissingTableException;
@@ -143,6 +144,17 @@ public class SentryStore {
private Thread privCleanerThread = null;
private final TransactionManager tm;
+ /**
+ * counterWait is used to synchronize notifications between Thrift and HMSFollower.
+ * Technically it doesn't belong here, but the only thing that connects HMSFollower
+ * and Thrift API is SentryStore. An alternative could be a singleton CounterWait or
+ * some factory that returns CounterWait instances keyed by name, but this complicates
+ * things unnecessary.
+ * <p>
+ * Keeping it here isn't ideal but serves the purpose until we find a better home.
+ */
+ private final CounterWait counterWait = new CounterWait();
+
public static Properties getDataNucleusProperties(Configuration conf)
throws SentrySiteConfigurationException, IOException {
Properties prop = new Properties();
@@ -237,6 +249,10 @@ public class SentryStore {
return tm;
}
+ public CounterWait getCounterWait() {
+ return counterWait;
+ }
+
// ensure that the backend DB schema is set
void verifySentryStoreSchema(boolean checkVersion) throws Exception {
if (!checkVersion) {
@@ -420,6 +436,18 @@ public class SentryStore {
}
/**
+ * @return number of threads waiting for HMS notifications to be processed
+ */
+ public Gauge<Integer> getHMSWaitersCountGauge() {
+ return new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return counterWait.waitersCount();
+ }
+ };
+ }
+
+ /**
* Lets the test code know how many privs are in the db, so that we know
* if they are in fact being cleaned up when not being referenced any more.
* @return The number of rows in the db priv table.
http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java
index 3f7542c..a359a04 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java
@@ -121,6 +121,7 @@ public final class SentryMetrics {
addGauge(SentryStore.class, "privilege_count",
sentryStore.getPrivilegeCountGauge());
addGauge(SentryStore.class, "group_count", sentryStore.getGroupCountGauge());
+ addGauge(SentryStore.class, "hms.waiters", sentryStore.getHMSWaitersCountGauge());
gaugesAdded = true;
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index 30e91ae..2ba3d38 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -60,6 +60,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
+import static com.codahale.metrics.MetricRegistry.name;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
@@ -82,7 +84,10 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
private final SentryStore sentryStore;
private final NotificationHandlerInvoker notificationHandlerInvoker;
private final ImmutableSet<String> adminGroups;
- SentryMetrics sentryMetrics;
+ private SentryMetrics sentryMetrics;
+ private final Timer hmsWaitTimer =
+ SentryMetrics.getInstance().
+ getTimer(name(SentryPolicyStoreProcessor.class, "hms", "wait"));
private List<SentryPolicyStorePlugin> sentryPlugins = new LinkedList<SentryPolicyStorePlugin>();
@@ -1142,7 +1147,13 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
}
@Override
- public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request) throws TException {
- throw new UnsupportedOperationException("sentry_sync_notifications");
+ public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request)
+ throws TException {
+ try (Timer.Context timerContext = hmsWaitTimer.time()) {
+ // Wait until Sentry Server processes specified HMS Notification ID.
+ TSentrySyncIDResponse response = new TSentrySyncIDResponse();
+ response.setId(sentryStore.getCounterWait().waitFor(request.getId()));
+ return response;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
new file mode 100644
index 0000000..f593bff
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import org.apache.http.annotation.ThreadSafe;
+
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Waiting for counter to reach certain value.
+ * The counter starts from zero and its value increases over time.
+ * The class allows for multiple consumers waiting until the value of the
+ * counter reaches some value interesting to them.
+ * Consumers call {@link #waitFor(long)} which may either return
+ * immediately if the counter reached the specified value, or block
+ * until this value is reached.
+ * <p>
+ * All waiters should be waken up when the counter becomes equal or higher
+ * then the value they are waiting for.
+ * <p>
+ * The counter is updated by a single updater that should only increase the
+ * counter value.
+ * The updater calls the {@link #update(long)} method to update the counter
+ * value and this should wake up all threads waiting for any value smaller or
+ * equal to the new one.
+ * <p>
+ * The class is thread-safe.
+ * It is designed for use by multiple waiter threads and a single
+ * updater thread, but it will work correctly even in the presence of multiple
+ * updater threads.
+ */
+@ThreadSafe
+public final class CounterWait {
+ // Implementation notes.
+ //
+ // The implementation is based on:
+ //
+ // 1) Using an atomic counter value which guarantees consistency.
+ // Since everyone needs only to know when the counter value reached the
+ // certain value and the counter may only increase its value,
+ // it is safe to update the counter by another thread after its value
+ // was read.
+ //
+ // 2) Priority queue of waiters, sorted by their expected values. The smallest
+ // value is always at the top of the queue. The priority queue itself
+ // is thread-safe, so no locks are needed to protect access to it.
+ //
+ // Each waiter is implemented using a binary semaphore.
+ // This solves the problem of a wakeup that happens before the sleep -
+ // in this case the acquire() doesn't block and returns immediately.
+ //
+ // NOTE: We use PriorityBlockingQueue for waiters because it is thread-safe,
+ // we are not using its blocking queue semantics.
+
+ /** Counter value. May only increase. */
+ private final AtomicLong currentId = new AtomicLong(0);
+
+ /**
+ * Waiters sorted by the value of the counter they are waiting for.
+ * Note that {@link PriorityBlockingQueue} is thread-safe.
+ * We are not using this as a blocking queue, but as a synchronized
+ * PriorityQueue.
+ */
+ private final PriorityBlockingQueue<ValueEvent> waiters =
+ new PriorityBlockingQueue<>();
+
+ /**
+ * Update the counter value and wake up all threads waiting for this
+ * value or any value below it.
+ * <p>
+ * The counter value should only increase.
+ * An attempt to decrease the value is raising
+ * {@link IllegalArgumentException}.
+ * The usual case is to have a single updater thread, but we enforce this
+ * by synchronizing the call.
+ *
+ * @param newValue the new counter value
+ */
+ public synchronized void update(long newValue) {
+ // Make sure the counter is never decremented
+ if (newValue < currentId.get()) {
+ throw new IllegalArgumentException("new counter value " +
+ String.valueOf(newValue) +
+ "is smaller then the previous one " + currentId);
+ }
+ currentId.set(newValue);
+
+ // Wake up any threads waiting for a counter to reach this value.
+ wakeup(newValue);
+ }
+
+
+ /**
+ * Wait for specified counter value.
+ * Returns immediately if the value is reached or blocks until the value
+ * is reached.
+ * Multiple threads can call the method concurrently.
+ *
+ * @param value requested counter value
+ * @return current counter value that should be no smaller then the requested
+ * value
+ */
+ public long waitFor(long value) {
+ // Fast path - counter value already reached, no need to block
+ if (value <= currentId.get()) {
+ return currentId.get();
+ }
+
+ // Enqueue the waiter for this value
+ ValueEvent eid = new ValueEvent(value);
+ waiters.put(eid);
+
+ // It is possible that between the fast path check and the time the
+ // value event is enqueued, the counter value already reached the requested
+ // value. In this case we return immediately.
+ if (value <= currentId.get()) {
+ return currentId.get();
+ }
+
+ // At this point we may be sure that by the time the event was enqueued,
+ // the counter was below the requested value. This means that update()
+ // is guaranteed to wake us up when the counter reaches the requested value.
+ // The wake up may actually happen before we start waiting, in this case
+ // the event's blocking queue will be non-empty and the waitFor() below
+ // will not block, so it is safe to wake up before the wait.
+ // So sit tight and wait patiently.
+ eid.waitFor();
+ return currentId.get();
+ }
+
+ /**
+ * Wake up any threads waiting for a counter to reach specified value
+ * Peek at the top of the queue. If the queue is empty or the top value
+ * exceeds the current value, we are done. Otherwise wakeup the top thread,
+ * remove the corresponding waiter and continue.
+ * <p>
+ * Note that the waiter may be removed under our nose by
+ * {@link #waitFor(long)} method, but this is Ok - in this case
+ * waiters.remove() will just return false.
+ *
+ * @param value current counter value
+ */
+ private void wakeup(long value) {
+ while (true) {
+ // Get the top of the waiters queue or null if it is empty
+ ValueEvent e = waiters.poll();
+ if (e == null) {
+ // Queue is empty - return.
+ return;
+ }
+ // No one to wake up, return event to the queue and exit
+ if (e.getValue() > value) {
+ waiters.add(e);
+ return;
+ }
+ // Due for wake-up call
+ e.wakeup();
+ }
+ }
+
+ // Useful for debugging
+ @Override
+ public String toString() {
+ return "CounterWait{" + "currentId=" + currentId +
+ ", waiters=" + waiters + "}";
+ }
+
+ /**
+ * Return number of waiters. This is mostly useful for metrics/debugging
+ *
+ * @return number of sleeping waiters
+ */
+ public int waitersCount() {
+ return waiters.size();
+ }
+
+ /**
+ * Representation of the waiting event.
+ * The waiting event consists of the expected value and a binary semaphore.
+ * <p>
+ * Each thread waiting for the given value, creates a ValueEvent and tries
+ * to acquire a semaphore. This blocks until the semaphore is released.
+ * <p>
+ * ValueEvents are stored in priority queue sorted by value, so they should be
+ * comparable by the value.
+ */
+ private static class ValueEvent implements Comparable<ValueEvent> {
+ /** Value waited for. */
+ private final long value;
+ /** Binary semaphore to synchronize waiters */
+ private final Semaphore semaphore = new Semaphore(1);
+
+ /**
+ * Instantiates a new Value event.
+ *
+ * @param v the expected value
+ */
+ ValueEvent(long v) {
+ this.value = v;
+ // Acquire the semaphore. Subsequent calls to waitFor() will block until
+ // wakeup() releases the semaphore.
+ semaphore.acquireUninterruptibly(); // Will not block
+ }
+
+ /** Wait until signaled. May return immediately if already signalled. */
+ void waitFor() {
+ semaphore.acquireUninterruptibly();
+ }
+
+ /** @return the value we are waiting for */
+ long getValue() {
+ return value;
+ }
+
+ /** Wakeup the waiting thread. */
+ void wakeup() {
+ semaphore.release();
+ }
+
+ /**
+ * Compare objects by value
+ */
+ @Override
+ public int compareTo(final ValueEvent o) {
+ return value == o.value ? 0
+ : value < o.value ? -1
+ : 1;
+ }
+
+ /**
+ * Use identity comparison of objects
+ */
+ @Override
+ public boolean equals(final Object o) {
+ return (this == o);
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (value ^ (value >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 8b07f5b..f3f51da 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -345,6 +345,7 @@ public class HMSFollower implements Runnable {
void processNotificationEvents(List<NotificationEvent> events) throws
SentryInvalidHMSEventException, SentryInvalidInputException {
SentryJSONMessageDeserializer deserializer = new SentryJSONMessageDeserializer();
+ final CounterWait counterWait = sentryStore.getCounterWait();
for (NotificationEvent event : events) {
String dbName, tableName, oldLocation, newLocation, location;
@@ -479,7 +480,13 @@ public class HMSFollower implements Runnable {
//TODO: Handle HDFS plugin
break;
}
- currentEventID = event.getEventId();
+ currentEventID = event.getEventId();
+ // Wake up any HMS waiters that are waiting for this ID.
+ // counterWait should never be null, but tests mock SentryStore and a mocked one
+ // doesn't have it.
+ if (counterWait != null) {
+ counterWait.update(currentEventID);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
new file mode 100644
index 0000000..a700178
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import junit.framework.TestCase;
+
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Test for CounterWait class
+ */
+public class TestCounterWait extends TestCase {
+ // Used to verify that wakeups happen in the right order
+ private BlockingDeque<Long> outSyncQueue = new LinkedBlockingDeque<>();
+ // Number of waiters to test for
+ private int nthreads = 20;
+
+ public void testWaitFor() throws Exception {
+ // Create a thread for each waiter
+ ExecutorService executor = Executors.newFixedThreadPool(nthreads);
+
+ final CounterWait waiter = new CounterWait();
+
+ // Initial value is zero, so this shouldn't block
+ assertEquals(0, waiter.waitFor(0));
+
+ // Create a pair of threads waiting for each value in [1, nthreads / 2]
+ // We use pair of threads per value to verify that both are waken up
+ for (int i = 0; i < nthreads; i++) {
+ final int finalI = i + 2;
+ final int val = finalI / 2;
+ executor.execute(new Runnable() {
+ public void run() {
+ long r = waiter.waitFor(val); // blocks
+ outSyncQueue.add(r); // Once we wake up, post result
+ }
+ }
+ );
+ }
+
+ // Wait until all threads are asleep.
+ while(waiter.waitersCount() < nthreads) {
+ sleep(20);
+ }
+
+ // All threads should be blocked, so outSyncQueue should be empty
+ assertTrue(outSyncQueue.isEmpty());
+
+ // Post a counter update for each value in [ 1, nthreads / 2 ]
+ // After eac update two threads should be waken up and the corresponding pair of
+ // values should appear in the outSyncQueue.
+ for (int i = 0; i < nthreads / 2; i++) {
+ waiter.update(i + 1);
+ long r = outSyncQueue.takeFirst();
+ assertEquals(r, i + 1);
+ r = outSyncQueue.takeFirst();
+ assertEquals(r, i + 1);
+ assertTrue(outSyncQueue.isEmpty());
+ }
+
+ // We are done
+ executor.shutdown();
+ }
+
+ private void sleep(long ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ }
+ }
+}