You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ak...@apache.org on 2017/03/04 05:55:45 UTC

sentry git commit: SENTRY-1601 Implement HMS Notification barrier on the server side (Alex Kolbasov, Reviewed by: Misha Dmitriev, Vadim Spector)

Repository: sentry
Updated Branches:
  refs/heads/sentry-ha-redesign 1c6ba5ebe -> aecf3f281


SENTRY-1601 Implement HMS Notification barrier on the server side (Alex Kolbasov, Reviewed by: Misha Dmitriev, Vadim Spector)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/aecf3f28
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/aecf3f28
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/aecf3f28

Branch: refs/heads/sentry-ha-redesign
Commit: aecf3f281e4c87feb4063d87e02373f4ae747a1b
Parents: 1c6ba5e
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Fri Mar 3 21:54:55 2017 -0800
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Fri Mar 3 21:54:55 2017 -0800

----------------------------------------------------------------------
 .../db/service/persistent/SentryStore.java      |  28 ++
 .../db/service/thrift/SentryMetrics.java        |   1 +
 .../thrift/SentryPolicyStoreProcessor.java      |  17 +-
 .../sentry/service/thrift/CounterWait.java      | 266 +++++++++++++++++++
 .../sentry/service/thrift/HMSFollower.java      |   9 +-
 .../sentry/service/thrift/TestCounterWait.java  |  90 +++++++
 6 files changed, 407 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index c1186ba..38f68cd 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -74,6 +74,7 @@ import org.apache.sentry.provider.db.service.thrift.TSentryMappingData;
 import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
 import org.apache.sentry.provider.db.service.thrift.TSentryPrivilegeMap;
 import org.apache.sentry.provider.db.service.thrift.TSentryRole;
+import org.apache.sentry.service.thrift.CounterWait;
 import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
 import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
 import org.datanucleus.store.rdbms.exceptions.MissingTableException;
@@ -143,6 +144,17 @@ public class SentryStore {
   private Thread privCleanerThread = null;
   private final TransactionManager tm;
 
+  /**
+   * counterWait is used to synchronize notifications between Thrift and HMSFollower.
+   * Technically it doesn't belong here, but the only thing that connects HMSFollower
+   * and Thrift API is SentryStore. An alternative could be a singleton CounterWait or
+   * some factory that returns CounterWait instances keyed by name, but this complicates
+   * things unnecessarily.
+   * <p>
+   * Keeping it here isn't ideal but serves the purpose until we find a better home.
+   */
+  private final CounterWait counterWait = new CounterWait();
+
   public static Properties getDataNucleusProperties(Configuration conf)
           throws SentrySiteConfigurationException, IOException {
     Properties prop = new Properties();
@@ -237,6 +249,10 @@ public class SentryStore {
     return tm;
   }
 
+  public CounterWait getCounterWait() {
+    return counterWait;
+  }
+
   // ensure that the backend DB schema is set
   void verifySentryStoreSchema(boolean checkVersion) throws Exception {
     if (!checkVersion) {
@@ -420,6 +436,18 @@ public class SentryStore {
   }
 
   /**
+   * @return number of threads waiting for HMS notifications to be processed
+   */
+  public Gauge<Integer> getHMSWaitersCountGauge() {
+    return new Gauge<Integer>() {
+      @Override
+      public Integer getValue() {
+        return counterWait.waitersCount();
+      }
+    };
+  }
+
+  /**
    * Lets the test code know how many privs are in the db, so that we know
    * if they are in fact being cleaned up when not being referenced any more.
    * @return The number of rows in the db priv table.

http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java
index 3f7542c..a359a04 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryMetrics.java
@@ -121,6 +121,7 @@ public final class SentryMetrics {
       addGauge(SentryStore.class, "privilege_count",
               sentryStore.getPrivilegeCountGauge());
       addGauge(SentryStore.class, "group_count", sentryStore.getGroupCountGauge());
+      addGauge(SentryStore.class, "hms.waiters", sentryStore.getHMSWaitersCountGauge());
       gaugesAdded = true;
     }
   }

http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index 30e91ae..2ba3d38 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -60,6 +60,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Timer;
+import static com.codahale.metrics.MetricRegistry.name;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -82,7 +84,10 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
   private final SentryStore sentryStore;
   private final NotificationHandlerInvoker notificationHandlerInvoker;
   private final ImmutableSet<String> adminGroups;
-  SentryMetrics sentryMetrics;
+  private SentryMetrics sentryMetrics;
+  private final Timer hmsWaitTimer =
+          SentryMetrics.getInstance().
+                  getTimer(name(SentryPolicyStoreProcessor.class, "hms", "wait"));
 
   private List<SentryPolicyStorePlugin> sentryPlugins = new LinkedList<SentryPolicyStorePlugin>();
 
@@ -1142,7 +1147,13 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
   }
 
   @Override
-  public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request) throws TException {
-    throw new UnsupportedOperationException("sentry_sync_notifications");
+  public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request)
+          throws TException {
+    try (Timer.Context timerContext = hmsWaitTimer.time()) {
+      // Wait until Sentry Server processes specified HMS Notification ID.
+      TSentrySyncIDResponse response = new TSentrySyncIDResponse();
+      response.setId(sentryStore.getCounterWait().waitFor(request.getId()));
+      return response;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
new file mode 100644
index 0000000..f593bff
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import org.apache.http.annotation.ThreadSafe;
+
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Waiting for counter to reach certain value.
+ * The counter starts from zero and its value increases over time.
+ * The class allows for multiple consumers waiting until the value of the
+ * counter reaches some value interesting to them.
+ * Consumers call {@link #waitFor(long)} which may either return
+ * immediately if the counter reached the specified value, or block
+ * until this value is reached.
+ * <p>
+ * All waiters should be woken up when the counter becomes equal to or higher
+ * than the value they are waiting for.
+ * <p>
+ * The counter is updated by a single updater that should only increase the
+ * counter value.
+ * The updater calls the {@link #update(long)} method to update the counter
+ * value and this should wake up all threads waiting for any value smaller or
+ * equal to the new one.
+ * <p>
+ * The class is thread-safe.
+ * It is designed for use by multiple waiter threads and a single
+ * updater thread, but it will work correctly even in the presence of multiple
+ * updater threads.
+ */
+@ThreadSafe
+public final class CounterWait {
+  // Implementation notes.
+  //
+  // The implementation is based on:
+  //
+  // 1) Using an atomic counter value which guarantees consistency.
+  //    Since everyone needs only to know when the counter value reached the
+  //    certain value and the counter may only increase its value,
+  //    it is safe to update the counter by another thread after its value
+  //    was read.
+  //
+  // 2) Priority queue of waiters, sorted by their expected values. The smallest
+  //    value is always at the top of the queue. The priority queue itself
+  //    is thread-safe, so no locks are needed to protect access to it.
+  //
+  // Each waiter is implemented using a binary semaphore.
+  // This solves the problem of a wakeup that happens before the sleep -
+  // in this case the acquire() doesn't block and returns immediately.
+  //
+  // NOTE: We use PriorityBlockingQueue for waiters because it is thread-safe,
+  //       we are not using its blocking queue semantics.
+
+  /** Counter value. May only increase. */
+  private final AtomicLong currentId = new AtomicLong(0);
+
+  /**
+   * Waiters sorted by the value of the counter they are waiting for.
+   * Note that {@link PriorityBlockingQueue} is thread-safe.
+   * We are not using this as a blocking queue, but as a synchronized
+   * PriorityQueue.
+   */
+  private final PriorityBlockingQueue<ValueEvent> waiters =
+          new PriorityBlockingQueue<>();
+
+  /**
+   * Update the counter value and wake up all threads waiting for this
+   * value or any value below it.
+   * <p>
+   * The counter value should only increase.
+   * An attempt to decrease the value is raising
+   * {@link IllegalArgumentException}.
+   * The usual case is to have a single updater thread, but we enforce this
+   * by synchronizing the call.
+   *
+   * @param newValue the new counter value
+   */
+  public synchronized void update(long newValue) {
+    // Make sure the counter is never decremented
+    if (newValue < currentId.get()) {
+      throw new IllegalArgumentException("new counter value " +
+              String.valueOf(newValue) +
+              "is smaller then the previous one " + currentId);
+    }
+    currentId.set(newValue);
+
+    // Wake up any threads waiting for a counter to reach this value.
+    wakeup(newValue);
+  }
+
+
+  /**
+   * Wait for specified counter value.
+   * Returns immediately if the value is reached or blocks until the value
+   * is reached.
+   * Multiple threads can call the method concurrently.
+   *
+   * @param value requested counter value
+   * @return current counter value that should be no smaller then the requested
+   * value
+   */
+  public long waitFor(long value) {
+    // Fast path - counter value already reached, no need to block
+    if (value <= currentId.get()) {
+      return currentId.get();
+    }
+
+    // Enqueue the waiter for this value
+    ValueEvent eid = new ValueEvent(value);
+    waiters.put(eid);
+
+    // It is possible that between the fast path check and the time the
+    // value event is enqueued, the counter value already reached the requested
+    // value. In this case we return immediately.
+    if (value <= currentId.get()) {
+      return currentId.get();
+    }
+
+    // At this point we may be sure that by the time the event was enqueued,
+    // the counter was below the requested value. This means that update()
+    // is guaranteed to wake us up when the counter reaches the requested value.
+    // The wake up may actually happen before we start waiting, in this case
+    // the event's blocking queue will be non-empty and the waitFor() below
+    // will not block, so it is safe to wake up before the wait.
+    // So sit tight and wait patiently.
+    eid.waitFor();
+    return currentId.get();
+  }
+
+  /**
+   * Wake up any threads waiting for a counter to reach specified value.
+   * Repeatedly poll the head of the queue (the waiter with the smallest
+   * expected value). If the queue is empty, we are done. If the head waits for
+   * a value above the given one, put it back and stop. Otherwise signal the
+   * waiter, which poll() has already removed from the queue, and continue.
+   * <p>
+   * A waiter that is not yet due is briefly absent from the queue between
+   * poll() and add(), so {@link #waitersCount()} may dip momentarily.
+   *
+   * @param value current counter value
+   */
+  private void wakeup(long value) {
+    while (true) {
+      // Remove and return the head of the waiters queue, or null if it is empty
+      ValueEvent e = waiters.poll();
+      if (e == null) {
+        // Queue is empty - return.
+        return;
+      }
+      // Head is not due yet: return the event to the queue and exit
+      if (e.getValue() > value) {
+        waiters.add(e);
+        return;
+      }
+      // Due for wake-up call
+      e.wakeup();
+    }
+  }
+
+  // Useful for debugging
+  @Override
+  public String toString() {
+    return "CounterWait{" + "currentId=" + currentId +
+            ", waiters=" + waiters + "}";
+  }
+
+  /**
+   * Return the number of queued waiters. Mostly useful for metrics/debugging.
+   *
+   * @return current size of the waiters queue; a point-in-time snapshot
+   */
+  public int waitersCount() {
+    return waiters.size();
+  }
+
+  /**
+   * Representation of the waiting event.
+   * The waiting event consists of the expected value and a binary semaphore.
+   * <p>
+   * Each thread waiting for the given value, creates a ValueEvent and tries
+   * to acquire a semaphore. This blocks until the semaphore is released.
+   * <p>
+   * ValueEvents are stored in priority queue sorted by value, so they should be
+   * comparable by the value.
+   */
+  private static class ValueEvent implements Comparable<ValueEvent> {
+    /** Value waited for. */
+    private final long value;
+    /** Binary semaphore to synchronize waiters */
+    private final Semaphore semaphore = new Semaphore(1);
+
+    /**
+     * Instantiates a new Value event.
+     *
+     * @param v the expected value
+     */
+    ValueEvent(long v) {
+      this.value = v;
+      // Acquire the semaphore. Subsequent calls to waitFor() will block until
+      // wakeup() releases the semaphore.
+      semaphore.acquireUninterruptibly(); // Will not block
+    }
+
+    /** Wait until signaled. May return immediately if already signalled. */
+    void waitFor() {
+      semaphore.acquireUninterruptibly();
+    }
+
+    /** @return the value we are waiting for */
+    long getValue() {
+      return value;
+    }
+
+    /** Wakeup the waiting thread. */
+    void wakeup() {
+      semaphore.release();
+    }
+
+    /**
+     * Compare objects by value
+     */
+    @Override
+    public int compareTo(final ValueEvent o) {
+      return value == o.value ? 0
+              : value < o.value ? -1
+              : 1;
+    }
+
+    /**
+     * Use identity comparison of objects
+     */
+    @Override
+    public boolean equals(final Object o) {
+      return (this == o);
+    }
+
+    @Override
+    public int hashCode() {
+      return (int) (value ^ (value >>> 32));
+    }
+
+    @Override
+    public String toString() {
+      return String.valueOf(value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 8b07f5b..f3f51da 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -345,6 +345,7 @@ public class HMSFollower implements Runnable {
   void processNotificationEvents(List<NotificationEvent> events) throws
       SentryInvalidHMSEventException, SentryInvalidInputException {
     SentryJSONMessageDeserializer deserializer = new SentryJSONMessageDeserializer();
+    final CounterWait counterWait = sentryStore.getCounterWait();
 
     for (NotificationEvent event : events) {
       String dbName, tableName, oldLocation, newLocation, location;
@@ -479,7 +480,13 @@ public class HMSFollower implements Runnable {
           //TODO: Handle HDFS plugin
           break;
       }
-    currentEventID = event.getEventId();
+      currentEventID = event.getEventId();
+      // Wake up any HMS waiters that are waiting for this ID.
+      // counterWait should never be null, but tests mock SentryStore and a mocked one
+      // doesn't have it.
+      if (counterWait != null) {
+        counterWait.update(currentEventID);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/aecf3f28/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
new file mode 100644
index 0000000..a700178
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestCounterWait.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import junit.framework.TestCase;
+
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Test for CounterWait class.
+ * Verifies that waiters are released in counter order, two per value.
+ */
+public class TestCounterWait extends TestCase {
+  // Used to verify that wakeups happen in the right order
+  private final BlockingDeque<Long> outSyncQueue = new LinkedBlockingDeque<>();
+  // Number of waiters to test for
+  private final int nthreads = 20;
+
+  /**
+   * Blocks a pair of threads on every value in [1, nthreads / 2] and checks
+   * that each counter update releases exactly the pair waiting for it.
+   */
+  public void testWaitFor() throws Exception {
+    // Create a thread for each waiter
+    ExecutorService executor = Executors.newFixedThreadPool(nthreads);
+    try {
+      final CounterWait waiter = new CounterWait();
+
+      // Initial value is zero, so this shouldn't block
+      assertEquals(0, waiter.waitFor(0));
+
+      // Create a pair of threads waiting for each value in [1, nthreads / 2]
+      // We use pair of threads per value to verify that both are woken up
+      for (int i = 0; i < nthreads; i++) {
+        final int val = (i + 2) / 2;
+        executor.execute(new Runnable() {
+          @Override
+          public void run() {
+            long r = waiter.waitFor(val); // blocks
+            outSyncQueue.add(r); // Once we wake up, post result
+          }
+        });
+      }
+
+      // Wait until all threads are enqueued as waiters. An interrupt is
+      // propagated to the caller instead of being swallowed.
+      while (waiter.waitersCount() < nthreads) {
+        Thread.sleep(20);
+      }
+
+      // All threads should be blocked, so outSyncQueue should be empty
+      assertTrue(outSyncQueue.isEmpty());
+
+      // Post a counter update for each value in [ 1, nthreads / 2 ]
+      // After each update two threads should be woken up and the corresponding
+      // pair of values should appear in the outSyncQueue.
+      for (int i = 0; i < nthreads / 2; i++) {
+        final long expected = i + 1;
+        waiter.update(expected);
+        // JUnit convention: expected value first, actual value second
+        assertEquals(expected, (long) outSyncQueue.takeFirst());
+        assertEquals(expected, (long) outSyncQueue.takeFirst());
+        assertTrue(outSyncQueue.isEmpty());
+      }
+    } finally {
+      // Shut the pool down even when an assertion fails, so that idle pool
+      // threads do not outlive the test
+      executor.shutdown();
+    }
+  }
+}