You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by co...@apache.org on 2017/11/16 09:56:46 UTC

[09/32] sentry git commit: SENTRY-2031: Add trigger mechanism for Sentry to pull full path snapshot from HMS (Vadim Spector, reviewed by Sergio Pena)

SENTRY-2031: Add trigger mechanism for Sentry to pull full path snapshot from HMS (Vadim Spector, reviewed by Sergio Pena)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/a001e445
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/a001e445
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/a001e445

Branch: refs/heads/akolb-cli
Commit: a001e445598f62bbf1551282f64d3d600c4775a3
Parents: 5aea068
Author: Vadim Spector <vs...@cloudera.com>
Authored: Mon Nov 6 10:44:32 2017 -0800
Committer: Vadim Spector <vs...@cloudera.com>
Committed: Mon Nov 6 10:44:32 2017 -0800

----------------------------------------------------------------------
 .../apache/sentry/hdfs/ServiceConstants.java    |  2 +-
 .../sentry/service/thrift/HMSFollower.java      | 32 ++++++-
 .../sentry/service/thrift/TestHMSFollower.java  | 98 ++++++++++++++++++++
 3 files changed, 130 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/a001e445/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index f7412a3..a9afb15 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -44,7 +44,7 @@ public class ServiceConstants {
     public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-tables-per-rpc";
     public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT = 100;
     static final String SENTRY_SERVICE_FULL_UPDATE_SIGNAL = "sentry.hdfs.sync.full-update-signal";
-    static final String SENTRY_SERVICE_FULL_UPDATE_PUBSUB = "sentry.hdfs.sync.full-update-pubsub";
+    public static final String SENTRY_SERVICE_FULL_UPDATE_PUBSUB = "sentry.hdfs.sync.full-update-pubsub";
 
     public static final String SENTRY_HDFS_INTEGRATION_PATH_PREFIXES = "sentry.hdfs.integration.path.prefixes";
     public static final String[] SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT =

http://git-wip-us.apache.org/repos/asf/sentry/blob/a001e445/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 0861132..c4cc918 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -18,6 +18,8 @@
 
 package org.apache.sentry.service.thrift;
 
+import org.apache.sentry.core.common.utils.PubSub;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
 
 import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME;
 import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED;
@@ -26,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jdo.JDODataStoreException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -41,10 +44,12 @@ import org.slf4j.LoggerFactory;
  * update permissions stored in Sentry using SentryStore and also update the &lt obj,path &gt state
  * stored for HDFS-Sentry sync.
  */
-public class HMSFollower implements Runnable, AutoCloseable {
+public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
+  private static final String FULL_UPDATE_TRIGGER = "FULL UPDATE TRIGGER: ";
   private static boolean connectedToHms = false;
+
   private SentryHMSClient client;
   private final Configuration authzConf;
   private final SentryStore sentryStore;
@@ -52,6 +57,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
   private boolean readyToServe;
   private final HiveNotificationFetcher notificationFetcher;
   private final boolean hdfsSyncEnabled;
+  private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false);
 
   private final LeaderStatusMonitor leaderMonitor;
 
@@ -99,6 +105,13 @@ public class HMSFollower implements Runnable, AutoCloseable {
     client = new SentryHMSClient(authzConf, hiveConnectionFactory);
     hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); // no cache to test different settings for hdfs sync
     notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory);
+
+    // subscribe to full update notification
+    if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, false)) {
+      LOGGER.info(FULL_UPDATE_TRIGGER + "subscribing to topic " + PubSub.Topic.HDFS_SYNC_HMS.getName());
+      PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this);
+    }
+
   }
 
   @VisibleForTesting
@@ -243,6 +256,13 @@ public class HMSFollower implements Runnable, AutoCloseable {
       return true;
     }
 
+    // check if forced full update is required, reset update flag to false
+    // to only do it once per forced full update request.
+    if (fullUpdateHMS.compareAndSet(true, false)) {
+      LOGGER.info(FULL_UPDATE_TRIGGER + "initiating full HMS snapshot request");
+      return true;
+    }
+
     return false;
   }
 
@@ -435,4 +455,14 @@ public class HMSFollower implements Runnable, AutoCloseable {
 
     counterWait.update(eventId);
   }
+
+  /**
+   * PubSub.Subscriber callback API
+   */
+  @Override
+  public void onMessage(PubSub.Topic topic, String message) {
+    Preconditions.checkArgument(topic == PubSub.Topic.HDFS_SYNC_HMS, "Unexpected topic %s instead of %s", topic, PubSub.Topic.HDFS_SYNC_HMS);
+    LOGGER.info(FULL_UPDATE_TRIGGER + "Received [{}, {}] notification", topic, message);
+    fullUpdateHMS.set(true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/a001e445/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
index 7d64375..bbcf093 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
@@ -49,10 +49,13 @@ import org.apache.hive.hcatalog.messaging.HCatEventMessage.EventType;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars;
 import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
+import org.apache.sentry.core.common.utils.PubSub;
 import org.apache.sentry.hdfs.UniquePathsUpdate;
 import org.apache.sentry.provider.db.service.persistent.PathsImage;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
 import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
+import static org.apache.sentry.hdfs.ServiceConstants.ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -79,6 +82,7 @@ public class TestHMSFollower {
     hiveConnectionFactory = new HiveSimpleConnectionFactory(configuration, new HiveConf());
     hiveConnectionFactory.init();
     configuration.set("sentry.hive.sync.create", "true");
+    configuration.set(SENTRY_SERVICE_FULL_UPDATE_PUBSUB, "true");
 
     // enable HDFS sync, so perm and path changes will be saved into DB
     configuration.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory");
@@ -141,6 +145,100 @@ public class TestHMSFollower {
   }
 
   @Test
+  public void testPersistAFullSnapshotWhenFullSnapshotTrigger() throws Exception {
+    /*
+     * TEST CASE
+     *
+     * Simulates (by using mocks) the following:
+     *
+     * HMS client always returns the paths image with the eventId == 1.
+     *
+     * On the 1st run:  Sentry has not processed any notifications, so this
+     * should trigger a new full HMS snapshot request with the eventId = 1
+     *
+     * On the 2nd run: Sentry store returns the latest eventId == 1,
+     * which matches the eventId returned by HMS client. Because of the match,
+     * no full update is triggered.
+     *
+     * On the 3rd run: before the run, the full update flag in HMSFollower is set
+     * via the publish-subscribe mechanism.
+     * Sentry store still returns the latest eventId == 1,
+     * which matches the eventId returned by HMS client. Because of the match,
+     * no full update would normally be triggered. However, because the trigger
+     * is set, a new full HMS snapshot is requested.
+     *
+     * On the 4th run: Sentry store returns the latest eventId == 1,
+     * which matches the eventId returned by HMS client. Because of the match,
+     * no full update is triggered. This checks that the forced trigger set
+     * for run 3 only works once.
+     *
+     */
+
+    final long SENTRY_PROCESSED_EVENT_ID = SentryStore.EMPTY_NOTIFICATION_ID;
+    final long HMS_PROCESSED_EVENT_ID = 1L;
+
+    // Mock that returns a full snapshot
+    Map<String, Collection<String>> snapshotObjects = new HashMap<>();
+    snapshotObjects.put("db", Sets.newHashSet("/db"));
+    snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
+    PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
+
+    // Mock that returns the current HMS notification ID
+    when(hmsClientMock.getCurrentNotificationEventId())
+        .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId()));
+
+    SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
+    when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
+
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hmsConnectionMock, hiveInstance);
+    hmsFollower.setSentryHmsClient(sentryHmsClient);
+
+    // 1st run should get a full snapshot because AuthzPathsMapping is empty
+    when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+    when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true);
+    when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
+    hmsFollower.run();
+    verify(sentryStore, times(1)).persistFullPathsImage(
+        fullSnapshot.getPathImage(), fullSnapshot.getId());
+    // The notification ID is saved in the same transaction as the full snapshot
+    verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId());
+
+    reset(sentryStore);
+
+    // 2nd run should not get a snapshot because it is already processed
+    when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
+    when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+    hmsFollower.run();
+    verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
+    verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+
+    reset(sentryStore);
+
+    // 3rd run should not get a snapshot because it is already processed,
+    // but because of the full update trigger it will, as in the first run
+    PubSub.getInstance().publish(PubSub.Topic.HDFS_SYNC_HMS, "message");
+
+    when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
+    when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+    hmsFollower.run();
+    verify(sentryStore, times(1)).persistFullPathsImage(
+        fullSnapshot.getPathImage(), fullSnapshot.getId());
+    verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId());
+
+    reset(sentryStore);
+
+    // 4th run should not get a snapshot because it is already processed and the publish-subscribe
+    // trigger is only supposed to work once. This is exactly like the 2nd run.
+    when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
+    when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+    hmsFollower.run();
+    verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
+    verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+
+  }
+
+  @Test
   public void testPersistAFullSnapshotWhenLastHmsNotificationIsLowerThanLastProcessed()
       throws Exception {
     /*