You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by co...@apache.org on 2017/11/16 09:56:46 UTC
[09/32] sentry git commit: SENTRY-2031: Add trigger mechanism for
Sentry to pull full path snapshot from HMS (Vadim Spector,
reviewed by Sergio Pena)
SENTRY-2031: Add trigger mechanism for Sentry to pull full path snapshot from HMS (Vadim Spector, reviewed by Sergio Pena)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/a001e445
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/a001e445
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/a001e445
Branch: refs/heads/akolb-cli
Commit: a001e445598f62bbf1551282f64d3d600c4775a3
Parents: 5aea068
Author: Vadim Spector <vs...@cloudera.com>
Authored: Mon Nov 6 10:44:32 2017 -0800
Committer: Vadim Spector <vs...@cloudera.com>
Committed: Mon Nov 6 10:44:32 2017 -0800
----------------------------------------------------------------------
.../apache/sentry/hdfs/ServiceConstants.java | 2 +-
.../sentry/service/thrift/HMSFollower.java | 32 ++++++-
.../sentry/service/thrift/TestHMSFollower.java | 98 ++++++++++++++++++++
3 files changed, 130 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/a001e445/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index f7412a3..a9afb15 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -44,7 +44,7 @@ public class ServiceConstants {
public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-tables-per-rpc";
public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT = 100;
static final String SENTRY_SERVICE_FULL_UPDATE_SIGNAL = "sentry.hdfs.sync.full-update-signal";
- static final String SENTRY_SERVICE_FULL_UPDATE_PUBSUB = "sentry.hdfs.sync.full-update-pubsub";
+ public static final String SENTRY_SERVICE_FULL_UPDATE_PUBSUB = "sentry.hdfs.sync.full-update-pubsub";
public static final String SENTRY_HDFS_INTEGRATION_PATH_PREFIXES = "sentry.hdfs.integration.path.prefixes";
public static final String[] SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT =
http://git-wip-us.apache.org/repos/asf/sentry/blob/a001e445/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 0861132..c4cc918 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -18,6 +18,8 @@
package org.apache.sentry.service.thrift;
+import org.apache.sentry.core.common.utils.PubSub;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME;
import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED;
@@ -26,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jdo.JDODataStoreException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -41,10 +44,12 @@ import org.slf4j.LoggerFactory;
* update permissions stored in Sentry using SentryStore and also update the < obj,path > state
* stored for HDFS-Sentry sync.
*/
-public class HMSFollower implements Runnable, AutoCloseable {
+public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
+ private static final String FULL_UPDATE_TRIGGER = "FULL UPDATE TRIGGER: ";
private static boolean connectedToHms = false;
+
private SentryHMSClient client;
private final Configuration authzConf;
private final SentryStore sentryStore;
@@ -52,6 +57,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
private boolean readyToServe;
private final HiveNotificationFetcher notificationFetcher;
private final boolean hdfsSyncEnabled;
+ private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false);
private final LeaderStatusMonitor leaderMonitor;
@@ -99,6 +105,13 @@ public class HMSFollower implements Runnable, AutoCloseable {
client = new SentryHMSClient(authzConf, hiveConnectionFactory);
hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); // no cache to test different settings for hdfs sync
notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory);
+
+ // subscribe to full update notification
+ if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, false)) {
+ LOGGER.info(FULL_UPDATE_TRIGGER + "subscribing to topic " + PubSub.Topic.HDFS_SYNC_HMS.getName());
+ PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this);
+ }
+
}
@VisibleForTesting
@@ -243,6 +256,13 @@ public class HMSFollower implements Runnable, AutoCloseable {
return true;
}
+ // check if forced full update is required, reset update flag to false
+ // to only do it once per forced full update request.
+ if (fullUpdateHMS.compareAndSet(true, false)) {
+ LOGGER.info(FULL_UPDATE_TRIGGER + "initiating full HMS snapshot request");
+ return true;
+ }
+
return false;
}
@@ -435,4 +455,14 @@ public class HMSFollower implements Runnable, AutoCloseable {
counterWait.update(eventId);
}
+
+ /**
+ * PubSub.Subscriber callback: checks the topic is HDFS_SYNC_HMS, logs the message, and sets the fullUpdateHMS flag.
+ */
+ @Override
+ public void onMessage(PubSub.Topic topic, String message) {
+ Preconditions.checkArgument(topic == PubSub.Topic.HDFS_SYNC_HMS, "Unexpected topic %s instead of %s", topic, PubSub.Topic.HDFS_SYNC_HMS);
+ LOGGER.info(FULL_UPDATE_TRIGGER + "Received [{}, {}] notification", topic, message);
+ fullUpdateHMS.set(true);
+ }
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/a001e445/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
index 7d64375..bbcf093 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
@@ -49,10 +49,13 @@ import org.apache.hive.hcatalog.messaging.HCatEventMessage.EventType;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
+import org.apache.sentry.core.common.utils.PubSub;
import org.apache.sentry.hdfs.UniquePathsUpdate;
import org.apache.sentry.provider.db.service.persistent.PathsImage;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
+import static org.apache.sentry.hdfs.ServiceConstants.ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB;
+
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -79,6 +82,7 @@ public class TestHMSFollower {
hiveConnectionFactory = new HiveSimpleConnectionFactory(configuration, new HiveConf());
hiveConnectionFactory.init();
configuration.set("sentry.hive.sync.create", "true");
+ configuration.set(SENTRY_SERVICE_FULL_UPDATE_PUBSUB, "true");
// enable HDFS sync, so perm and path changes will be saved into DB
configuration.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory");
@@ -141,6 +145,100 @@ public class TestHMSFollower {
}
@Test
+ public void testPersistAFullSnapshotWhenFullSnapshotTrigger() throws Exception {
+ /*
+ * TEST CASE
+ *
+ * Simulates (by using mocks) the following:
+ *
+ * HMS client always returns the paths image with the eventId == 1.
+ *
+ * On the 1st run: Sentry has not processed any notifications, so this
+ * should trigger a new full HMS snapshot request with the eventId = 1
+ *
+ * On the 2nd run: Sentry store returns the latest eventId == 1,
+ * which matches the eventId returned by HMS client. Because of the match,
+ * no full update is triggered.
+ *
+ * On the 3rd run: before the run, the full update flag in HMSFollower is set
+ * via the publish-subscribe mechanism.
+ * Sentry store still returns the latest eventId == 1,
+ * which matches the eventId returned by HMS client. Because of the match,
+ * no full update would normally be triggered. However, because the trigger
+ * is set, a new full HMS snapshot will be requested.
+ *
+ * On the 4th run: Sentry store returns the latest eventId == 1,
+ * which matches the eventId returned by HMS client. Because of the match,
+ * no full update is triggered. This checks that the forced trigger set
+ * for run 3 only works once.
+ *
+ */
+
+ final long SENTRY_PROCESSED_EVENT_ID = SentryStore.EMPTY_NOTIFICATION_ID;
+ final long HMS_PROCESSED_EVENT_ID = 1L;
+
+ // Mock that returns a full snapshot
+ Map<String, Collection<String>> snapshotObjects = new HashMap<>();
+ snapshotObjects.put("db", Sets.newHashSet("/db"));
+ snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
+ PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
+
+ // Mock that returns the current HMS notification ID
+ when(hmsClientMock.getCurrentNotificationEventId())
+ .thenReturn(new CurrentNotificationEventId(fullSnapshot.getId()));
+
+ SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
+ when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
+
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+ hmsConnectionMock, hiveInstance);
+ hmsFollower.setSentryHmsClient(sentryHmsClient);
+
+ // 1st run should get a full snapshot because AuthzPathsMapping is empty
+ when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+ when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true);
+ when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
+ hmsFollower.run();
+ verify(sentryStore, times(1)).persistFullPathsImage(
+ fullSnapshot.getPathImage(), fullSnapshot.getId());
+ // Saving notificationID is done in the same transaction as saving the full snapshot
+ verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId());
+
+ reset(sentryStore);
+
+ // 2nd run should not get a snapshot because it is already processed
+ when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
+ when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+ hmsFollower.run();
+ verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
+ verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+
+ reset(sentryStore);
+
+ // 3rd run would not get a snapshot because it is already processed,
+ // but because of the full update trigger it will, as in the first run
+ PubSub.getInstance().publish(PubSub.Topic.HDFS_SYNC_HMS, "message");
+
+ when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
+ when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+ hmsFollower.run();
+ verify(sentryStore, times(1)).persistFullPathsImage(
+ fullSnapshot.getPathImage(), fullSnapshot.getId());
+ verify(sentryStore, times(0)).persistLastProcessedNotificationID(fullSnapshot.getId());
+
+ reset(sentryStore);
+
+ // 4th run should not get a snapshot because it is already processed and the publish-subscribe
+ // trigger is only supposed to work once. This is exactly like the 2nd run.
+ when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
+ when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+ hmsFollower.run();
+ verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong());
+ verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+
+ }
+
+ @Test
public void testPersistAFullSnapshotWhenLastHmsNotificationIsLowerThanLastProcessed()
throws Exception {
/*