You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by sp...@apache.org on 2018/05/31 03:32:16 UTC
[38/86] sentry git commit: Revert "SENTRY-2208: Refactor out Sentry
service into own module from sentry-provider-db (Anthony Young-Garner,
reviewed by Sergio Pena, Steve Moist, Na Li)"
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
new file mode 100644
index 0000000..2505da9
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.sentry.service.thrift.JaasConfiguration;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ThreadFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.sentry.service.common.ServiceConstants.ServerConfig.*;
+
+/**
+ * HAContext stores the global ZooKeeper related context.
+ * <p>
+ * This class is a singleton - only one ZooKeeper context is maintained.
+ */
+public final class HAContext implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HAContext.class);
+ private static HAContext serverHAContext = null;
+ private static boolean aclUnChecked = true;
+
+ private static final String SENTRY_ZK_JAAS_NAME = "SentryClient";
+ private static final String SHUTDOWN_THREAD_NAME = "ha-context-shutdown";
+ private final String zookeeperQuorum;
+ private final String namespace;
+
+ private final boolean zkSecure;
+ private final List<ACL> saslACL;
+
+ private final CuratorFramework curatorFramework;
+
+ private HAContext(Configuration conf) throws IOException {
+ this.zookeeperQuorum = conf.get(SENTRY_HA_ZOOKEEPER_QUORUM, "");
+ int retriesMaxCount = conf.getInt(SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT,
+ SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT_DEFAULT);
+ int sleepMsBetweenRetries = conf.getInt(SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS,
+ SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS_DEFAULT);
+ String ns = conf.get(SENTRY_HA_ZOOKEEPER_NAMESPACE, SENTRY_HA_ZOOKEEPER_NAMESPACE_DEFAULT);
+ // Namespace shouldn't start with slash.
+ // If config namespace starts with slash, remove it first
+ this.namespace = ns.startsWith("/") ? ns.substring(1) : ns;
+
+ this.zkSecure = conf.getBoolean(SENTRY_HA_ZOOKEEPER_SECURITY,
+ SENTRY_HA_ZOOKEEPER_SECURITY_DEFAULT);
+ this.validateConf();
+ ACLProvider aclProvider;
+ if (zkSecure) {
+ LOGGER.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs");
+ this.setJaasConfiguration(conf);
+ System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
+ SENTRY_ZK_JAAS_NAME);
+ saslACL = Lists.newArrayList();
+ saslACL.add(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal(conf,
+ PRINCIPAL))));
+ saslACL.add(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal(conf,
+ SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL))));
+ aclProvider = new SASLOwnerACLProvider();
+ String allowConnect = conf.get(ALLOW_CONNECT);
+
+ if (!Strings.isNullOrEmpty(allowConnect)) {
+ for (String principal : allowConnect.split("\\s*,\\s*")) {
+ LOGGER.info("Adding acls for {}", principal);
+ saslACL.add(new ACL(Perms.ALL, new Id("sasl", principal)));
+ }
+ }
+ } else {
+ saslACL = null;
+ LOGGER.info("Connecting to ZooKeeper without authentication");
+ aclProvider = new DefaultACLProvider();
+ }
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(sleepMsBetweenRetries, retriesMaxCount);
+ this.curatorFramework = CuratorFrameworkFactory.builder()
+ .namespace(this.namespace)
+ .connectString(this.zookeeperQuorum)
+ .retryPolicy(retryPolicy)
+ .aclProvider(aclProvider)
+ .build();
+ }
+
+ private void start() {
+ if (curatorFramework.getState() != CuratorFrameworkState.STARTED) {
+ curatorFramework.start();
+ }
+ }
+
+ /**
+ * Create a singleton instance of ZooKeeper context (if needed) and return it.
+ * The instance returned is already running.
+ *
+ * @param conf Configuration, The following keys are used:
+ * <ul>
+ * <li>SENTRY_HA_ZOOKEEPER_QUORUM</li>
+ * <li>SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT</li>
+ * <li>SENTRY_HA_ZOOKEEPER_SLEEP_BETWEEN_RETRIES_MS</li>
+ * <li>SENTRY_HA_ZOOKEEPER_NAMESPACE</li>
+ * <li>SENTRY_HA_ZOOKEEPER_SECURITY</li>
+ * <li>LOGIN_CONTEXT_NAME_KEY</li>
+ * <li>PRINCIPAL</li>
+ * <li>SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL</li>
+ * <li>ALLOW_CONNECT</li>
+ * <li>SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE</li>
+ * <li>SERVER_HA_ZOOKEEPER_CLIENT_KEYTAB</li>
+ * <li>RPC_ADDRESS</li>
+ * </ul>
+ * @return Global ZooKeeper context.
+ * @throws Exception
+ */
+ static synchronized HAContext getHAContext(Configuration conf) throws IOException {
+ if (serverHAContext != null) {
+ return serverHAContext;
+ }
+ serverHAContext = new HAContext(conf);
+
+ serverHAContext.start();
+ ThreadFactory haContextShutdownThreadFactory = new ThreadFactoryBuilder()
+ .setDaemon(false)
+ .setNameFormat(SHUTDOWN_THREAD_NAME)
+ .build();
+ Runtime.getRuntime()
+ .addShutdownHook(haContextShutdownThreadFactory
+ .newThread(new Runnable() {
+ @Override
+ public void run() {
+ LOGGER.info("ShutdownHook closing curator framework");
+ try {
+ if (serverHAContext != null) {
+ serverHAContext.close();
+ }
+ } catch (Throwable t) {
+ LOGGER.error("Error stopping curator framework", t);
+ }
+ }
+ }));
+ return serverHAContext;
+ }
+
+ /**
+ * HA context for server which verifies the ZK ACLs on namespace
+ *
+ * @param conf Configuration - see {@link #getHAContext(Configuration)}
+ * @return Server ZK context
+ * @throws Exception
+ */
+ public static HAContext getHAServerContext(Configuration conf) throws Exception {
+ HAContext serverContext = getHAContext(conf);
+ serverContext.checkAndSetACLs();
+ return serverContext;
+ }
+
+ /**
+ * Reset existing HA context.
+ * Should be only used by tests to provide different configurations.
+ */
+ public static void resetHAContext() {
+ HAContext oldContext = serverHAContext;
+ if (oldContext != null) {
+ try {
+ oldContext.close();
+ } catch (Exception e) {
+ LOGGER.error("Failed to close HACOntext", e);
+ }
+ }
+ serverHAContext = null;
+ }
+
+
+ private void validateConf() {
+ checkNotNull(zookeeperQuorum, "Zookeeper Quorum should not be null.");
+ checkNotNull(namespace, "Zookeeper namespace should not be null.");
+ }
+
+ private static String getServicePrincipal(Configuration conf, String confProperty) {
+ String principal = checkNotNull(conf.get(confProperty));
+ checkArgument(!principal.isEmpty(), "Server principal is empty.");
+ return principal.split("[/@]")[0];
+ }
+
+ private void checkAndSetACLs() throws Exception {
+ if (zkSecure && aclUnChecked) {
+ // If znodes were previously created without security enabled, and now it is, we need to go
+ // through all existing znodes and set the ACLs for them. This is done just once at the startup
+ // We can't get the namespace znode through curator; have to go through zk client
+ String newNamespace = "/" + curatorFramework.getNamespace();
+ if (curatorFramework.getZookeeperClient().getZooKeeper().exists(newNamespace, null) != null) {
+ List<ACL> acls = curatorFramework.getZookeeperClient().getZooKeeper().getACL(newNamespace, new Stat());
+ if (acls.isEmpty() || !acls.get(0).getId().getScheme().equals("sasl")) {
+ LOGGER.info("'sasl' ACLs not set; setting...");
+ List<String> children = curatorFramework.getZookeeperClient().getZooKeeper().getChildren(newNamespace,
+ null);
+ for (String child : children) {
+ this.checkAndSetACLs("/" + child);
+ }
+ curatorFramework.getZookeeperClient().getZooKeeper().setACL(newNamespace, saslACL, -1);
+ }
+ }
+ aclUnChecked = false;
+ }
+ }
+
+ private void checkAndSetACLs(String path) throws Exception {
+ LOGGER.info("Setting acls on {}", path);
+ List<String> children = curatorFramework.getChildren().forPath(path);
+ for (String child : children) {
+ this.checkAndSetACLs(path + "/" + child);
+ }
+ curatorFramework.setACL().withACL(saslACL).forPath(path);
+ }
+
+ // This gets ignored during most tests, see ZKXTestCaseWithSecurity#setupZKServer()
+ private void setJaasConfiguration(Configuration conf) throws IOException {
+ if ("false".equalsIgnoreCase(conf.get(
+ SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE,
+ SERVER_HA_ZOOKEEPER_CLIENT_TICKET_CACHE_DEFAULT))) {
+ String keytabFile = conf.get(SERVER_HA_ZOOKEEPER_CLIENT_KEYTAB);
+ checkArgument(!keytabFile.isEmpty(), "Keytab File is empty.");
+ String principal = conf.get(SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL);
+ principal = SecurityUtil.getServerPrincipal(principal,
+ conf.get(RPC_ADDRESS, RPC_ADDRESS_DEFAULT));
+ checkArgument(!principal.isEmpty(), "Kerberos principal is empty.");
+
+ // This is equivalent to writing a jaas.conf file and setting the system property,
+ // "java.security.auth.login.config", to point to it (but this way we don't have to write
+ // a file, and it works better for the tests)
+ JaasConfiguration.addEntryForKeytab(SENTRY_ZK_JAAS_NAME, principal, keytabFile);
+ } else {
+ // Create jaas conf for ticket cache
+ JaasConfiguration.addEntryForTicketCache(SENTRY_ZK_JAAS_NAME);
+ }
+ javax.security.auth.login.Configuration.setConfiguration(JaasConfiguration.getInstance());
+ }
+
+ /**
+ * Create a new Curator leader szselector
+ * @param path Zookeeper path
+ * @param listener Curator listener for leader selection changes
+ * @return an instance of leader selector associated with the running curator framework
+ */
+ public LeaderSelector newLeaderSelector(String path, LeaderSelectorListener listener) {
+ return new LeaderSelector(this.curatorFramework, path, listener);
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.curatorFramework.close();
+ }
+
+ private class SASLOwnerACLProvider implements ACLProvider {
+ @Override
+ public List<ACL> getDefaultAcl() {
+ return saslACL;
+ }
+
+ @Override
+ public List<ACL> getAclForPath(String path) {
+ return saslACL;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
new file mode 100644
index 0000000..42770df
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java
@@ -0,0 +1,530 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import org.apache.sentry.core.common.utils.PubSub;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+
+import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME;
+import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jdo.JDODataStoreException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.thrift.TException;
+import org.apache.sentry.service.thrift.SentryHMSClient;
+import org.apache.sentry.service.thrift.HiveConnectionFactory;
+import org.apache.sentry.service.thrift.HiveNotificationFetcher;
+import org.apache.sentry.api.common.SentryServiceUtil;
+import org.apache.sentry.service.thrift.SentryStateBank;
+import org.apache.sentry.service.thrift.SentryServiceState;
+import org.apache.sentry.service.thrift.HMSFollowerState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HMSFollower is the thread which follows the Hive MetaStore state changes from Sentry.
+ * It gets the full update and notification logs from HMS and applies it to
+ * update permissions stored in Sentry using SentryStore and also update the < obj,path > state
+ * stored for HDFS-Sentry sync.
+ */
+public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
+ private static final String FULL_UPDATE_TRIGGER = "FULL UPDATE TRIGGER: ";
+ private static boolean connectedToHms = false;
+
+ private SentryHMSClient client;
+ private final Configuration authzConf;
+ private final SentryStore sentryStore;
+ private final NotificationProcessor notificationProcessor;
+ private boolean readyToServe;
+ private final HiveNotificationFetcher notificationFetcher;
+ private final boolean hdfsSyncEnabled;
+ private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false);
+
+ private final LeaderStatusMonitor leaderMonitor;
+
+ /**
+ * Current generation of HMS snapshots. HMSFollower is single-threaded, so no need
+ * to protect against concurrent modification.
+ */
+ private long hmsImageId = SentryStore.EMPTY_PATHS_SNAPSHOT_ID;
+
+ /**
+ * Configuring Hms Follower thread.
+ *
+ * @param conf sentry configuration
+ * @param store sentry store
+ * @param leaderMonitor singleton instance of LeaderStatusMonitor
+ */
+ public HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
+ HiveConnectionFactory hiveConnectionFactory) {
+ this(conf, store, leaderMonitor, hiveConnectionFactory, null);
+ }
+
+ /**
+ * Constructor should be used only for testing purposes.
+ *
+ * @param conf sentry configuration
+ * @param store sentry store
+ * @param leaderMonitor
+ * @param authServerName Server that sentry is Authorizing
+ */
+ @VisibleForTesting
+ public HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
+ HiveConnectionFactory hiveConnectionFactory, String authServerName) {
+ LOGGER.info("HMSFollower is being initialized");
+ readyToServe = false;
+ authzConf = conf;
+ this.leaderMonitor = leaderMonitor;
+ sentryStore = store;
+
+ if (authServerName == null) {
+ authServerName = conf.get(AUTHZ_SERVER_NAME.getVar(),
+ conf.get(AUTHZ_SERVER_NAME_DEPRECATED.getVar(), AUTHZ_SERVER_NAME_DEPRECATED.getDefault()));
+ }
+
+ notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf);
+ client = new SentryHMSClient(authzConf, hiveConnectionFactory);
+ hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); // no cache to test different settings for hdfs sync
+ notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory);
+
+ // subscribe to full update notification
+ if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, false)) {
+ LOGGER.info(FULL_UPDATE_TRIGGER + "subscribing to topic " + PubSub.Topic.HDFS_SYNC_HMS.getName());
+ PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this);
+ }
+ if(!hdfsSyncEnabled) {
+ try {
+ // Clear all the HMS metadata learned so far and learn it fresh when the feature
+ // is enabled back.
+ store.clearHmsPathInformation();
+ } catch (Exception ex) {
+ LOGGER.error("Failed to clear HMS path info", ex);
+ LOGGER.error("Please manually clear data from SENTRY_PATH_CHANGE/AUTHZ_PATH/AUTHZ_PATHS_MAPPING tables." +
+ "If not, HDFS ACL's will be inconsistent when HDFS sync feature is enabled back.");
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public static boolean isConnectedToHms() {
+ return connectedToHms;
+ }
+
+ @VisibleForTesting
+ void setSentryHmsClient(SentryHMSClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public void close() {
+ if (client != null) {
+ // Close any outstanding connections to HMS
+ try {
+ client.disconnect();
+ SentryStateBank.disableState(HMSFollowerState.COMPONENT,HMSFollowerState.CONNECTED);
+ } catch (Exception failure) {
+ LOGGER.error("Failed to close the Sentry Hms Client", failure);
+ }
+ }
+
+ notificationFetcher.close();
+ }
+
+ @Override
+ public void run() {
+ SentryStateBank.enableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED);
+ long lastProcessedNotificationId;
+ try {
+ try {
+ // Initializing lastProcessedNotificationId based on the latest persisted notification ID.
+ lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID();
+ } catch (Exception e) {
+ LOGGER.error("Failed to get the last processed notification id from sentry store, "
+ + "Skipping the processing", e);
+ return;
+ }
+ // Wake any clients connected to this service waiting for HMS already processed notifications.
+ wakeUpWaitingClientsForSync(lastProcessedNotificationId);
+ // Only the leader should listen to HMS updates
+ if (!isLeader()) {
+ // Close any outstanding connections to HMS
+ close();
+ return;
+ }
+ syncupWithHms(lastProcessedNotificationId);
+ } finally {
+ SentryStateBank.disableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED);
+ }
+ }
+
+ private boolean isLeader() {
+ return (leaderMonitor == null) || leaderMonitor.isLeader();
+ }
+
+ @VisibleForTesting
+ String getAuthServerName() {
+ return notificationProcessor.getAuthServerName();
+ }
+
+ /**
+ * Processes new Hive Metastore notifications.
+ *
+ * <p>If no notifications are processed yet, then it
+ * does a full initial snapshot of the Hive Metastore followed by new notifications updates that
+ * could have happened after it.
+ *
+ * <p>Clients connections waiting for an event notification will be
+ * woken up afterwards.
+ */
+ void syncupWithHms(long notificationId) {
+ try {
+ client.connect();
+ connectedToHms = true;
+ SentryStateBank.enableState(HMSFollowerState.COMPONENT,HMSFollowerState.CONNECTED);
+ } catch (Throwable e) {
+ LOGGER.error("HMSFollower cannot connect to HMS!!", e);
+ return;
+ }
+
+ try {
+ if (hdfsSyncEnabled) {
+ // Before getting notifications, checking if a full HMS snapshot is required.
+ if (isFullSnapshotRequired(notificationId)) {
+ createFullSnapshot();
+ return;
+ }
+ } else if (isSentryOutOfSync(notificationId)) {
+ // Out-of-sync, fetching all the notifications
+ // in HMS NOTIFICATION_LOG.
+ sentryStore.setLastProcessedNotificationID(0L);
+ notificationId = 0L;
+ }
+
+ Collection<NotificationEvent> notifications =
+ notificationFetcher.fetchNotifications(notificationId);
+
+ // After getting notifications, check if HMS did some clean-up and notifications
+ // are out-of-sync with Sentry.
+ if (hdfsSyncEnabled &&
+ areNotificationsOutOfSync(notifications, notificationId)) {
+ // Out-of-sync, taking a HMS full snapshot.
+ createFullSnapshot();
+ return;
+ }
+
+ if (!readyToServe) {
+ // Allow users and/or applications who look into the Sentry console output to see
+ // when Sentry is ready to serve.
+ System.out.println("Sentry HMS support is ready");
+ readyToServe = true;
+ }
+
+ // Continue with processing new notifications if no snapshots are done.
+ processNotifications(notifications);
+ } catch (TException e) {
+ LOGGER.error("An error occurred while fetching HMS notifications: ", e);
+ close();
+ } catch (Throwable t) {
+ // catching errors to prevent the executor to halt.
+ LOGGER.error("Exception in HMSFollower! Caused by: " + t.getMessage(), t);
+
+ close();
+ }
+ }
+
+ /**
+ * Checks if a new full HMS snapshot request is needed by checking if:
+ * <ul>
+ * <li>Sentry HMS Notification table is EMPTY</li>
+ * <li>HDFSSync is enabled and Sentry Authz Snapshot table is EMPTY</li>
+ * <li>The current notification Id on the HMS is less than the
+ * latest processed by Sentry.</li>
+ * <li>Full Snapshot Signal is detected</li>
+ * </ul>
+ *
+ * @param latestSentryNotificationId The notification Id to check against the HMS
+ * @return True if a full snapshot is required; False otherwise.
+ * @throws Exception If an error occurs while checking the SentryStore or the HMS client.
+ */
+ private boolean isFullSnapshotRequired(long latestSentryNotificationId) throws Exception {
+ if (sentryStore.isHmsNotificationEmpty()) {
+ LOGGER.debug("Sentry Store has no HMS Notifications. Create Full HMS Snapshot. "
+ + "latest sentry notification Id = {}", latestSentryNotificationId);
+ return true;
+ }
+
+ // Once HDFS sync is enabled, and if MAuthzPathsMapping
+ // table is still empty, we need to request a full snapshot
+ if (sentryStore.isAuthzPathsSnapshotEmpty()) {
+ LOGGER.debug("HDFSSync is enabled and MAuthzPathsMapping table is empty." +
+ " Need to request a full snapshot");
+ return true;
+ }
+
+ if(isSentryOutOfSync(latestSentryNotificationId)) {
+ return true;
+ }
+
+ // check if forced full update is required, reset update flag to false
+ // to only do it once per forced full update request.
+ if (fullUpdateHMS.compareAndSet(true, false)) {
+ LOGGER.info(FULL_UPDATE_TRIGGER + "initiating full HMS snapshot request");
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Checks the last notification processed by sentry and the current event-id of
+ * HMS to see if sentry is out of sync.
+ *
+ * @param latestSentryNotificationId The notification Id to check against the HMS
+ * @return True, sentry is out-of-sync, False otherwise
+ * @throws Exception If an error occurs while fetching the current notification from HMS
+ */
+ private boolean isSentryOutOfSync(long latestSentryNotificationId) throws Exception {
+ long currentHmsNotificationId = notificationFetcher.getCurrentNotificationId();
+ if (currentHmsNotificationId < latestSentryNotificationId) {
+ LOGGER.info("The current notification ID on HMS = {} is less than the latest processed Sentry "
+ + "notification ID = {}. Sentry, Out-of-sync",
+ currentHmsNotificationId, latestSentryNotificationId);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Checks if the HMS and Sentry processed notifications are out-of-sync.
+ * This could happen because the HMS did some clean-up of old notifications
+ * and Sentry was not requesting notifications during that time.
+ *
+ * @param events All new notifications to check for an out-of-sync.
+ * @param latestProcessedId The latest notification processed by Sentry to check against the
+ * list of notifications events.
+ * @return True if an out-of-sync is found; False otherwise.
+ */
+ private boolean areNotificationsOutOfSync(Collection<NotificationEvent> events,
+ long latestProcessedId) {
+ if (events.isEmpty()) {
+ return false;
+ }
+
+ /*
+ * If the sequence of notifications has a gap, then an out-of-sync might
+ * have happened due to the following issue:
+ *
+ * - HDFS sync was disabled or Sentry was shutdown for a time period longer than
+ * the HMS notification clean-up thread causing old notifications to be deleted.
+ *
+ * HMS notifications may contain both gaps in the sequence and duplicates
+ * (the same ID repeated more then once for different events).
+ *
+ * To accept duplicates (see NotificationFetcher for more info), then a gap is found
+ * if the 1st notification received is higher than the current ID processed + 1.
+ * i.e.
+ * 1st ID = 3, latest ID = 3 (duplicate found but no gap detected)
+ * 1st ID = 4, latest ID = 3 (consecutive ID found but no gap detected)
+ * 1st ID = 5, latest ID = 3 (a gap is detected)
+ */
+
+ List<NotificationEvent> eventList = (List<NotificationEvent>) events;
+ long firstNotificationId = eventList.get(0).getEventId();
+
+ if (firstNotificationId > (latestProcessedId + 1)) {
+ LOGGER.info("First HMS event notification Id = {} is greater than latest Sentry processed"
+ + "notification Id = {} + 1. Need to request a full HMS snapshot.", firstNotificationId, latestProcessedId);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Request for full snapshot and persists it if there is no snapshot available in the sentry
+ * store. Also, wakes-up any waiting clients.
+ *
+ * @return ID of last notification processed.
+ * @throws Exception if there are failures
+ */
+ private long createFullSnapshot() throws Exception {
+ LOGGER.debug("Attempting to take full HMS snapshot");
+ Preconditions.checkState(!SentryStateBank.isEnabled(SentryServiceState.COMPONENT,
+ SentryServiceState.FULL_UPDATE_RUNNING),
+ "HMSFollower shown loading full snapshot when it should not be.");
+ try {
+ // Set that the full update is running
+ SentryStateBank
+ .enableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
+
+ PathsImage snapshotInfo = client.getFullSnapshot();
+ if (snapshotInfo.getPathImage().isEmpty()) {
+ LOGGER.debug("Received empty path image from HMS while taking a full snapshot");
+ return snapshotInfo.getId();
+ }
+
+ // Check we're still the leader before persisting the new snapshot
+ if (!isLeader()) {
+ LOGGER.info("Not persisting full snapshot since not a leader");
+ return SentryStore.EMPTY_NOTIFICATION_ID;
+ }
+ try {
+ if (hdfsSyncEnabled) {
+ LOGGER.info("Persisting full snapshot for notification Id = {}", snapshotInfo.getId());
+ sentryStore.persistFullPathsImage(snapshotInfo.getPathImage(), snapshotInfo.getId());
+ } else {
+ // We need to persist latest notificationID for next poll
+ LOGGER.info("HDFSSync is disabled. Not Persisting full snapshot, "
+ + "but only setting last processed notification Id = {}", snapshotInfo.getId());
+ sentryStore.setLastProcessedNotificationID(snapshotInfo.getId());
+ }
+ } catch (Exception failure) {
+ LOGGER.error("Received exception while persisting HMS path full snapshot ");
+ throw failure;
+ }
+ // Wake up any HMS waiters that could have been put on hold before getting the
+ // eventIDBefore value.
+ wakeUpWaitingClientsForSync(snapshotInfo.getId());
+ // HMSFollower connected to HMS and it finished full snapshot if that was required
+ // Log this message only once
+ LOGGER.info("Sentry HMS support is ready");
+ return snapshotInfo.getId();
+ } catch(Exception failure) {
+ LOGGER.error("Received exception while creating HMS path full snapshot ");
+ throw failure;
+ } finally {
+ SentryStateBank
+ .disableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
+ }
+ }
+
+ /**
+ * Process the collection of notifications and wake up any waiting clients.
+ * Also, persists the notification ID regardless of processing result.
+ *
+ * @param events list of event to be processed
+ * @throws Exception if the complete notification list is not processed because of JDO Exception
+ */
+ public void processNotifications(Collection<NotificationEvent> events) throws Exception {
+ boolean isNotificationProcessed;
+ if (events.isEmpty()) {
+ return;
+ }
+
+ for (NotificationEvent event : events) {
+ isNotificationProcessed = false;
+ try {
+ // Only the leader should process the notifications
+ if (!isLeader()) {
+ LOGGER.debug("Not processing notifications since not a leader");
+ return;
+ }
+ isNotificationProcessed = notificationProcessor.processNotificationEvent(event);
+ } catch (Exception e) {
+ if (e.getCause() instanceof JDODataStoreException) {
+ LOGGER.info("Received JDO Storage Exception, Could be because of processing "
+ + "duplicate notification");
+ if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) {
+ // Rest of the notifications need not be processed.
+ LOGGER.error("Received event with Id: {} which is smaller then the ID "
+ + "persisted in store", event.getEventId());
+ break;
+ }
+ } else {
+ LOGGER.error("Processing the notification with ID:{} failed with exception {}",
+ event.getEventId(), e);
+ }
+ }
+ if (!isNotificationProcessed) {
+ try {
+ // Update the notification ID in the persistent store even when the notification is
+ // not processed as the content in in the notification is not valid.
+ // Continue processing the next notification.
+ LOGGER.debug("Explicitly Persisting Notification ID = {} ", event.getEventId());
+ sentryStore.persistLastProcessedNotificationID(event.getEventId());
+ } catch (Exception failure) {
+ LOGGER.error("Received exception while persisting the notification ID = {}", event.getEventId());
+ throw failure;
+ }
+ }
+ // Wake up any HMS waiters that are waiting for this ID.
+ wakeUpWaitingClientsForSync(event.getEventId());
+ }
+ }
+
+ /**
+ * Wakes up HMS waiters waiting for a specific event notification.<p>
+ *
+ * Verify that HMS image id didn't change since the last time we looked.
+ * If id did, it is possible that notifications jumped backward, so reset
+ * the counter to the current value.
+ *
+ * @param eventId Id of a notification
+ */
+ private void wakeUpWaitingClientsForSync(long eventId) {
+ CounterWait counterWait = sentryStore.getCounterWait();
+
+ LOGGER.debug("wakeUpWaitingClientsForSync: eventId = {}, hmsImageId = {}", eventId, hmsImageId);
+ // Wake up any HMS waiters that are waiting for this ID.
+ // counterWait should never be null, but tests mock SentryStore and a mocked one
+ // doesn't have it.
+ if (counterWait == null) {
+ return;
+ }
+
+ long lastHMSSnapshotId = hmsImageId;
+ try {
+ // Read actual HMS image ID
+ lastHMSSnapshotId = sentryStore.getLastProcessedImageID();
+ LOGGER.debug("wakeUpWaitingClientsForSync: lastHMSSnapshotId = {}", lastHMSSnapshotId);
+ } catch (Exception e) {
+ counterWait.update(eventId);
+ LOGGER.error("Failed to get the last processed HMS image id from sentry store");
+ return;
+ }
+
+ // Reset the counter if the persisted image ID is greater than current image ID
+ if (lastHMSSnapshotId > hmsImageId) {
+ counterWait.reset(eventId);
+ hmsImageId = lastHMSSnapshotId;
+ LOGGER.debug("wakeUpWaitingClientsForSync: reset counterWait with eventId = {}, new hmsImageId = {}", eventId, hmsImageId);
+ }
+
+ LOGGER.debug("wakeUpWaitingClientsForSync: update counterWait with eventId = {}, hmsImageId = {}", eventId, hmsImageId);
+ counterWait.update(eventId);
+ }
+
+ /**
+ * PubSub.Subscriber callback API
+ */
+ @Override
+ public void onMessage(PubSub.Topic topic, String message) {
+ Preconditions.checkArgument(topic == PubSub.Topic.HDFS_SYNC_HMS, "Unexpected topic %s instead of %s", topic, PubSub.Topic.HDFS_SYNC_HMS);
+ LOGGER.info(FULL_UPDATE_TRIGGER + "Received [{}, {}] notification", topic, message);
+ fullUpdateHMS.set(true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java
new file mode 100644
index 0000000..c2f1ad0
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.provider.db.service.persistent;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.sentry.service.common.ServiceConstants.ServerConfig.*;
+
+/**
+ * LeaderStatusMonitor participates in the distributed leader election protocol
+ * and allows clients to access the global leaadership status.
+ * <p>
+ * LeaderStatusMonitor is a singleton that uses Curator framework via
+ * {@link HAContext}.The leadership status can be accessed via the
+ * {@link #isLeader()} method.<p>
+ *
+ * Usually leadership re-election is initiated by the Curator framework when one
+ * of the nodes disconnects from ZooKeeper, but LeaderStatusMonitor also supports
+ * voluntary release of the leadership via the {@link #deactivate()} method. This is
+ * intended to be used for debugging purposes.
+ * <p>
+ * The class also simulates leader election in non-HA environments. In such cases its
+ * {@link #isLeader()} method always returns True. The non-HA environment is determined
+ * by the absence of the SENTRY_HA_ZOOKEEPER_QUORUM in the configuration.
+ *
+ * <h2>Implementation notes</h2>
+ *
+ * <h3>Initialization</h3>
+ *
+ * Class initialization is split between the constructor and the {@link #init()} method.
+ * There are two reasons for it:
+ * <ul>
+ * <li>We do not want to pass <strong>this</strong> reference to
+ * {@link HAContext#newLeaderSelector(String, LeaderSelectorListener)}
+ * until it is fully initialized</li>
+ * <li>We do not want to call {@link LeaderSelector#start()} method in constructor</li>
+ * </ul>
+ *
+ * Since LeaderStatusMonitor is a singleton and an instance can only be obtained via the
+ * {@link #getLeaderStatusMonitor(Configuration)} method, we hide this construction split
+ * from the callers.
+ *
+ * <h3>Synchronization</h3>
+ * Singleton synchronization is achieved using the synchronized class builder
+ * {@link #getLeaderStatusMonitor(Configuration)}
+ * <p>
+ * Upon becoming a leader, the code loops in {@link #takeLeadership(CuratorFramework)}
+ * until it receives a deactivation signal from {@link #deactivate()}. This is synchronized
+ * using a {@link #lock} and condition variable {@link #cond}.
+ * <p>
+ * Access to the leadership status {@link #isLeader} is also protected by the {@link #lock}.
+ * This isn't strictly necessary and a volatile field would be sufficient, but since we
+ * already use the {@link #lock} this is more straightforward.
+ */
+@ThreadSafe
+public final class LeaderStatusMonitor
+ extends LeaderSelectorListenerAdapter implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(LeaderStatusMonitor.class);
+
+ private static final String LEADER_SELECTOR_SUFFIX = "leader";
+
+ /** Unique instance of the singleton object */
+ private static LeaderStatusMonitor leaderStatusMonitor = null;
+
+ private final HAContext haContext;
+
+ /** Unique string describing this instance */
+ private final String defaultIncarnationId = generateIncarnationId();
+ private String incarnationId;
+
+ /** True when not using ZooKeeeper */
+ private final boolean isSingleNodeMode;
+
+ /** Lock and condition used to signal the leader to voluntary release leadership */
+ private final Lock lock = new ReentrantLock();
+ /** Condition variable used to synchronize voluntary leadership release */
+ private final Condition cond = lock.newCondition();
+ /** Leadership status - true if leader. */
+ private boolean isLeader = false;
+
+ /** Curator framework leader monitor */
+ private LeaderSelector leaderSelector = null;
+
+ /** The number of times this incarnation has become the leader. */
+ private final AtomicLong leaderCount = new AtomicLong(0);
+
+ /**
+ * Constructor. Initialize state and create HA context if configuration
+ * specifies ZooKeeper servers.
+ * @param conf Configuration. The fields we are interested in are:
+ * <ul>
+ * <li>SENTRY_HA_ZOOKEEPER_QUORUM</li>
+ * </ul>
+ * Configuration is also passed to the
+ * {@link HAContext#newLeaderSelector(String, LeaderSelectorListener)}
+ * which uses more properties.
+ * @throws Exception
+ */
+
+ @VisibleForTesting
+ protected LeaderStatusMonitor(Configuration conf) throws Exception {
+ // Only enable HA configuration if zookeeper is configured
+ String zkServers = conf.get(SENTRY_HA_ZOOKEEPER_QUORUM, "");
+ if (zkServers.isEmpty()) {
+ isSingleNodeMode = true;
+ haContext = null;
+ isLeader = true;
+ incarnationId = "";
+ LOGGER.info("Leader election protocol disabled, assuming single active server");
+ return;
+ }
+ isSingleNodeMode = false;
+ incarnationId = defaultIncarnationId;
+ haContext = HAContext.getHAServerContext(conf);
+
+ LOGGER.info("Created LeaderStatusMonitor(incarnationId={}, "
+ + "zkServers='{}')", incarnationId, zkServers);
+ }
+
+ /**
+ * Tests may need to provide custm incarnation ID
+ * @param conf confguration
+ * @param incarnationId custom incarnation ID
+ * @throws Exception
+ */
+ @VisibleForTesting
+ protected LeaderStatusMonitor(Configuration conf, String incarnationId) throws Exception {
+ this(conf);
+ this.incarnationId = incarnationId;
+ }
+
+ /**
+ * Second half of the constructor links this object with {@link HAContext} and
+ * starts leader election protocol.
+ */
+ @VisibleForTesting
+ protected void init() {
+ if (isSingleNodeMode) {
+ return;
+ }
+
+ leaderSelector = haContext.newLeaderSelector("/" + LEADER_SELECTOR_SUFFIX, this);
+ leaderSelector.setId(incarnationId);
+ leaderSelector.autoRequeue();
+ leaderSelector.start();
+ }
+
+ /**
+ *
+ * @param conf Configuration. See {@link #LeaderStatusMonitor(Configuration)} for details.
+ * @return A global LeaderStatusMonitor instance.
+ * @throws Exception
+ */
+ @SuppressWarnings("LawOfDemeter")
+ public static synchronized LeaderStatusMonitor getLeaderStatusMonitor(Configuration conf)
+ throws Exception {
+ if (leaderStatusMonitor == null) {
+ leaderStatusMonitor = new LeaderStatusMonitor(conf);
+ leaderStatusMonitor.init();
+ }
+ return leaderStatusMonitor;
+ }
+
+ /**
+ * @return number of times this leader was elected. Used for metrics.
+ */
+ public long getLeaderCount() {
+ return leaderCount.get();
+ }
+
+ /**
+ * Shut down the LeaderStatusMonitor and wait for it to transition to
+ * standby.
+ */
+ @Override
+ public void close() {
+ if (leaderSelector != null) {
+ // Shut down our Curator hooks.
+ leaderSelector.close();
+ }
+ }
+
+ /**
+ * Deactivate the current client, if it is active.
+ * In non-HA case this is a no-op.
+ */
+ public void deactivate() {
+ if (isSingleNodeMode) {
+ return;
+ }
+ lock.lock();
+ try {
+ cond.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @return true iff we are the leader.
+ * In non-HA case always returns true
+ */
+ public boolean isLeader() {
+ if (isSingleNodeMode) {
+ return true;
+ }
+ lock.lock();
+ @SuppressWarnings("FieldAccessNotGuarded")
+ boolean leader = isLeader;
+ lock.unlock();
+ return leader;
+ }
+
+ /**
+ * Curator framework callback which is called when we become a leader.
+ * Should return only when we decide to resign.
+ */
+ @Override
+ public void takeLeadership(CuratorFramework client) throws Exception {
+ leaderCount.incrementAndGet();
+ LOGGER.info("Becoming leader in Sentry HA cluster:{}", this);
+ lock.lock();
+ try {
+ isLeader = true;
+ // Wait until we are interrupted or receive a signal
+ cond.await();
+ } catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("takeLeadership call interrupted:" + this, ignored);
+ } finally {
+ isLeader = false;
+ lock.unlock();
+ LOGGER.info("Resigning from leader status in a Sentry HA cluster:{}", this);
+ }
+ }
+
+ /**
+ * Generate ID for the activator. <p>
+ *
+ * Ideally we would like something like host@pid, but Java doesn't provide a good
+ * way to determine pid value, so we use
+ * {@link RuntimeMXBean#getName()} which usually contains host
+ * name and pid.
+ */
+ private static String generateIncarnationId() {
+ return ManagementFactory.getRuntimeMXBean().getName();
+ }
+
+ @Override
+ public String toString() {
+ return isSingleNodeMode?"Leader election disabled":
+ String.format("{isSingleNodeMode=%b, incarnationId=%s, isLeader=%b, leaderCount=%d}",
+ isSingleNodeMode, incarnationId, isLeader, leaderCount.longValue());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java
new file mode 100644
index 0000000..228d37c
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import com.codahale.metrics.Timer;
+import com.codahale.metrics.Timer.Context;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageDeserializer;
+import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
+import org.apache.sentry.core.common.exception.SentryInvalidInputException;
+import org.apache.sentry.core.common.exception.SentryNoSuchObjectException;
+import org.apache.sentry.core.common.utils.PathUtils;
+import org.apache.sentry.hdfs.PathsUpdate;
+import org.apache.sentry.hdfs.PermissionsUpdate;
+import org.apache.sentry.hdfs.SentryMalformedPathException;
+import org.apache.sentry.hdfs.UniquePathsUpdate;
+import org.apache.sentry.hdfs.Updateable.Update;
+import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
+import org.apache.sentry.api.service.thrift.SentryMetrics;
+import org.apache.sentry.api.service.thrift.TSentryAuthorizable;
+import org.apache.sentry.api.common.SentryServiceUtil;
+import org.apache.sentry.hdfs.service.thrift.TPrivilegeEntityType;
+import org.apache.sentry.hdfs.service.thrift.TPrivilegeEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import static com.codahale.metrics.MetricRegistry.name;
+import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
+import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE;
+
+
+
+/**
+ * NotificationProcessor processes various notification events generated from
+ * the Hive MetaStore state change, and applies these changes to the complete
+ * HMS Paths snapshot or delta update stored in Sentry using SentryStore.
+ *
+ * <p>NotificationProcessor should not skip processing notification events for any reason.
+ * If some notification events are to be skipped, appropriate logic should be added in
+ * HMSFollower before invoking NotificationProcessor.
+ */
+final class NotificationProcessor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(NotificationProcessor.class);
+ private final SentryStore sentryStore;
+ private final SentryJSONMessageDeserializer deserializer;
+ private final String authServerName;
+ // These variables can be updated even after object is instantiated, for testing purposes.
+ private boolean syncStoreOnCreate = false;
+ private boolean syncStoreOnDrop = false;
+ private final boolean hdfsSyncEnabled;
+
+ /**
+ * Configuring notification processor.
+ *
+ * @param sentryStore sentry backend store
+ * @param authServerName Server that sentry is authorizing
+ * @param conf sentry configuration
+ */
+ NotificationProcessor(SentryStore sentryStore, String authServerName,
+ Configuration conf) {
+ this.sentryStore = sentryStore;
+ deserializer = new SentryJSONMessageDeserializer();
+ this.authServerName = authServerName;
+ syncStoreOnCreate = Boolean
+ .parseBoolean(conf.get(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getVar(),
+ AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getDefault()));
+ syncStoreOnDrop = Boolean.parseBoolean(conf.get(AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getVar(),
+ AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getDefault()));
+ hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(conf);
+ }
+
+ /**
+ * Split path into components on the "/" character.
+ * The path should not start with "/".
+ * This is consumed by Thrift interface, so the return result should be
+ * {@code List<String>}
+ *
+ * @param path input oath e.g. {@code foo/bar}
+ * @return list of components, e.g. [foo, bar]
+ */
+ private static List<String> splitPath(String path) {
+ return Lists.newArrayList(PathUtils.splitPath(path));
+ }
+
+ /**
+ * Constructs permission update to be persisted for drop event that can be persisted
+ * from thrift object.
+ *
+ * @param authorizable thrift object that is dropped.
+ * @return update to be persisted
+ * @throws SentryInvalidInputException if the required fields are set in argument provided
+ */
+ @VisibleForTesting
+ static Update getPermUpdatableOnDrop(TSentryAuthorizable authorizable)
+ throws SentryInvalidInputException {
+ PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
+ String authzObj = SentryServiceUtil.getAuthzObj(authorizable);
+ update.addPrivilegeUpdate(authzObj)
+ .putToDelPrivileges(new TPrivilegeEntity(TPrivilegeEntityType.ROLE, PermissionsUpdate.ALL_ROLES),
+ PermissionsUpdate.ALL_ROLES);
+ return update;
+ }
+
+ @VisibleForTesting
+ String getAuthServerName() {
+ return authServerName;
+ }
+
+ /**
+ * Constructs permission update to be persisted for rename event that can be persisted from thrift
+ * object.
+ *
+ * @param oldAuthorizable old thrift object
+ * @param newAuthorizable new thrift object
+ * @return update to be persisted
+ * @throws SentryInvalidInputException if the required fields are set in arguments provided
+ */
+ @VisibleForTesting
+ static Update getPermUpdatableOnRename(TSentryAuthorizable oldAuthorizable,
+ TSentryAuthorizable newAuthorizable)
+ throws SentryInvalidInputException {
+ String oldAuthz = SentryServiceUtil.getAuthzObj(oldAuthorizable);
+ String newAuthz = SentryServiceUtil.getAuthzObj(newAuthorizable);
+ PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
+ TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
+ privUpdate.putToAddPrivileges(new TPrivilegeEntity(TPrivilegeEntityType.AUTHZ_OBJ, newAuthz), newAuthz);
+ privUpdate.putToDelPrivileges(new TPrivilegeEntity(TPrivilegeEntityType.AUTHZ_OBJ, oldAuthz), oldAuthz);
+ return update;
+ }
+
+ /**
+ * This function is only used for testing purposes.
+ *
+ * @param value to be set
+ */
+ @SuppressWarnings("SameParameterValue")
+ @VisibleForTesting
+ void setSyncStoreOnCreate(boolean value) {
+ syncStoreOnCreate = value;
+ }
+
+ /**
+ * This function is only used for testing purposes.
+ *
+ * @param value to be set
+ */
+ @SuppressWarnings("SameParameterValue")
+ @VisibleForTesting
+ void setSyncStoreOnDrop(boolean value) {
+ syncStoreOnDrop = value;
+ }
+
+ /**
+ * Processes the event and persist to sentry store.
+ *
+ * @param event to be processed
+ * @return true, if the event is persisted to sentry store. false, if the event is not persisted.
+ * @throws Exception if there is an error processing the event.
+ */
+ boolean processNotificationEvent(NotificationEvent event) throws Exception {
+ LOGGER
+ .debug("Processing event with id:{} and Type:{}", event.getEventId(), event.getEventType());
+
+ // Expose time used for each request time as a metric.
+ // We use lower-case version of the event name.
+ EventType eventType = EventType.valueOf(event.getEventType());
+ Timer timer = SentryMetrics
+ .getInstance()
+ .getTimer(name(HMSFollower.class, eventType.toString().toLowerCase()));
+
+ try (Context ignored = timer.time()) {
+ switch (eventType) {
+ case CREATE_DATABASE:
+ return processCreateDatabase(event);
+ case DROP_DATABASE:
+ return processDropDatabase(event);
+ case CREATE_TABLE:
+ return processCreateTable(event);
+ case DROP_TABLE:
+ return processDropTable(event);
+ case ALTER_TABLE:
+ return processAlterTable(event);
+ case ADD_PARTITION:
+ return processAddPartition(event);
+ case DROP_PARTITION:
+ return processDropPartition(event);
+ case ALTER_PARTITION:
+ return processAlterPartition(event);
+ default:
+ LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(),
+ event.getEventType());
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Processes "create database" notification event, and applies its corresponding
+ * snapshot change as well as delta path update into Sentry DB.
+ *
+ * @param event notification event to be processed.
+ * @throws Exception if encounters errors while persisting the path change
+ */
+ private boolean processCreateDatabase(NotificationEvent event) throws Exception {
+ SentryJSONCreateDatabaseMessage message =
+ deserializer.getCreateDatabaseMessage(event.getMessage());
+ String dbName = message.getDB();
+ String location = message.getLocation();
+ if ((dbName == null) || (location == null)) {
+ LOGGER.warn("Create database event "
+ + "has incomplete information. dbName: {} location: {}",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(location, "null"));
+ return false;
+ }
+
+ if (syncStoreOnCreate) {
+ dropSentryDbPrivileges(dbName, event);
+ }
+
+ if (hdfsSyncEnabled) {
+ List<String> locations = Collections.singletonList(location);
+ addPaths(dbName, locations, event);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Processes "drop database" notification event, and applies its corresponding
+ * snapshot change as well as delta path update into Sentry DB.
+ *
+ * @param event notification event to be processed.
+ * @throws Exception if encounters errors while persisting the path change
+ */
+ private boolean processDropDatabase(NotificationEvent event) throws Exception {
+ SentryJSONDropDatabaseMessage dropDatabaseMessage =
+ deserializer.getDropDatabaseMessage(event.getMessage());
+ String dbName = dropDatabaseMessage.getDB();
+ String location = dropDatabaseMessage.getLocation();
+ if (dbName == null) {
+ LOGGER.warn("Drop database event has incomplete information: dbName = null");
+ return false;
+ }
+ if (syncStoreOnDrop) {
+ dropSentryDbPrivileges(dbName, event);
+ }
+
+ if (hdfsSyncEnabled) {
+ List<String> locations = Collections.singletonList(location);
+ removePaths(dbName, locations, event);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Processes "create table" notification event, and applies its corresponding
+ * snapshot change as well as delta path update into Sentry DB.
+ *
+ * @param event notification event to be processed.
+ * @throws Exception if encounters errors while persisting the path change
+ */
+ private boolean processCreateTable(NotificationEvent event)
+ throws Exception {
+ SentryJSONCreateTableMessage createTableMessage = deserializer
+ .getCreateTableMessage(event.getMessage());
+ String dbName = createTableMessage.getDB();
+ String tableName = createTableMessage.getTable();
+ String location = createTableMessage.getLocation();
+ if ((dbName == null) || (tableName == null) || (location == null)) {
+ LOGGER.warn(String.format("Create table event " + "has incomplete information."
+ + " dbName = %s, tableName = %s, location = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"),
+ StringUtils.defaultIfBlank(location, "null")));
+ return false;
+ }
+ if (syncStoreOnCreate) {
+ dropSentryTablePrivileges(dbName, tableName, event);
+ }
+
+ if (hdfsSyncEnabled) {
+ String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+ List<String> locations = Collections.singletonList(location);
+ addPaths(authzObj, locations, event);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Processes "drop table" notification event. It drops all partitions belongs to
+ * the table as well. And applies its corresponding snapshot change as well
+ * as delta path update into Sentry DB.
+ *
+ * @param event notification event to be processed.
+ * @throws Exception if encounters errors while persisting the path change
+ */
+ private boolean processDropTable(NotificationEvent event) throws Exception {
+ SentryJSONDropTableMessage dropTableMessage = deserializer
+ .getDropTableMessage(event.getMessage());
+ String dbName = dropTableMessage.getDB();
+ String tableName = dropTableMessage.getTable();
+ if ((dbName == null) || (tableName == null)) {
+ LOGGER.warn("Drop table event "
+ + "has incomplete information. dbName: {}, tableName: {}",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"));
+ return false;
+ }
+ if (syncStoreOnDrop) {
+ dropSentryTablePrivileges(dbName, tableName, event);
+ }
+
+ if (hdfsSyncEnabled) {
+ String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+ removeAllPaths(authzObj, event);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Processes "alter table" notification event, and applies its corresponding
+ * snapshot change as well as delta path update into Sentry DB.
+ *
+ * @param event notification event to be processed.
+ * @throws Exception if encounters errors while persisting the path change
+ */
+ private boolean processAlterTable(NotificationEvent event) throws Exception {
+
+ SentryJSONAlterTableMessage alterTableMessage =
+ deserializer.getAlterTableMessage(event.getMessage());
+ String oldDbName = alterTableMessage.getDB();
+ String oldTableName = alterTableMessage.getTable();
+ String newDbName = event.getDbName();
+ String newTableName = event.getTableName();
+ String oldLocation = alterTableMessage.getOldLocation();
+ String newLocation = alterTableMessage.getNewLocation();
+
+ if ((oldDbName == null)
+ || (oldTableName == null)
+ || (newDbName == null)
+ || (newTableName == null)
+ || (oldLocation == null)
+ || (newLocation == null)) {
+ LOGGER.warn(String.format("Alter table notification ignored since event "
+ + "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, "
+ + "newDbName = %s, newTableName = %s, newLocation = %s",
+ StringUtils.defaultIfBlank(oldDbName, "null"),
+ StringUtils.defaultIfBlank(oldTableName, "null"),
+ StringUtils.defaultIfBlank(oldLocation, "null"),
+ StringUtils.defaultIfBlank(newDbName, "null"),
+ StringUtils.defaultIfBlank(newTableName, "null"),
+ StringUtils.defaultIfBlank(newLocation, "null")));
+ return false;
+ }
+
+ if ((oldDbName.equals(newDbName))
+ && (oldTableName.equals(newTableName))
+ && (oldLocation.equals(newLocation))) {
+ LOGGER.debug(String.format("Alter table notification ignored as neither name nor "
+ + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, "
+ + "newLocation = %s", oldDbName + "." + oldTableName, oldLocation,
+ newDbName + "." + newTableName, newLocation));
+ return false;
+ }
+
+ if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) {
+ // Name has changed
+ try {
+ renamePrivileges(oldDbName, oldTableName, newDbName, newTableName);
+ } catch (SentryNoSuchObjectException e) {
+ LOGGER.debug("Rename Sentry privilege ignored as there are no privileges on the table:"
+ + " {}.{}", oldDbName, oldTableName);
+ } catch (Exception e) {
+ LOGGER.info("Could not process Alter table event. Event: {}", event.toString(), e);
+ return false;
+ }
+ }
+
+ if (!hdfsSyncEnabled) {
+ return false;
+ }
+ String oldAuthzObj = oldDbName + "." + oldTableName;
+ String newAuthzObj = newDbName + "." + newTableName;
+ renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, event);
+ return true;
+ }
+
+ /**
+ * Processes "add partition" notification event, and applies its corresponding
+ * snapshot change as well as delta path update into Sentry DB.
+ *
+ * @param event notification event to be processed.
+ * @throws Exception if encounters errors while persisting the path change
+ */
+ private boolean processAddPartition(NotificationEvent event)
+ throws Exception {
+ if (!hdfsSyncEnabled) {
+ return false;
+ }
+
+ SentryJSONAddPartitionMessage addPartitionMessage =
+ deserializer.getAddPartitionMessage(event.getMessage());
+ String dbName = addPartitionMessage.getDB();
+ String tableName = addPartitionMessage.getTable();
+ List<String> locations = addPartitionMessage.getLocations();
+ if ((dbName == null) || (tableName == null) || (locations == null)) {
+ LOGGER.warn(String.format("Create table event has incomplete information. "
+ + "dbName = %s, tableName = %s, locations = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"),
+ locations != null ? locations.toString() : "null"));
+ return false;
+ }
+ String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+ addPaths(authzObj, locations, event);
+ return true;
+ }
+
+ /**
+ * Processes "drop partition" notification event, and applies its corresponding
+ * snapshot change as well as delta path update into Sentry DB.
+ *
+ * @param event notification event to be processed.
+ * @throws Exception if encounters errors while persisting the path change
+ */
+ private boolean processDropPartition(NotificationEvent event)
+ throws Exception {
+ if (!hdfsSyncEnabled) {
+ return false;
+ }
+
+ SentryJSONDropPartitionMessage dropPartitionMessage =
+ deserializer.getDropPartitionMessage(event.getMessage());
+ String dbName = dropPartitionMessage.getDB();
+ String tableName = dropPartitionMessage.getTable();
+ List<String> locations = dropPartitionMessage.getLocations();
+ if ((dbName == null) || (tableName == null) || (locations == null)) {
+ LOGGER.warn(String.format("Drop partition event "
+ + "has incomplete information. dbName = %s, tableName = %s, location = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"),
+ locations != null ? locations.toString() : "null"));
+ return false;
+ }
+ String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+ removePaths(authzObj, locations, event);
+ return true;
+ }
+
+ /**
+ * Processes "alter partition" notification event, and applies its corresponding
+ * snapshot change as well as delta path update into Sentry DB.
+ *
+ * @param event notification event to be processed.
+ * @throws Exception if encounters errors while persisting the path change
+ */
+ private boolean processAlterPartition(NotificationEvent event) throws Exception {
+ if (!hdfsSyncEnabled) {
+ return false;
+ }
+
+ SentryJSONAlterPartitionMessage alterPartitionMessage =
+ deserializer.getAlterPartitionMessage(event.getMessage());
+ String dbName = alterPartitionMessage.getDB();
+ String tableName = alterPartitionMessage.getTable();
+ String oldLocation = alterPartitionMessage.getOldLocation();
+ String newLocation = alterPartitionMessage.getNewLocation();
+
+ if ((dbName == null)
+ || (tableName == null)
+ || (oldLocation == null)
+ || (newLocation == null)) {
+ LOGGER.warn(String.format("Alter partition event "
+ + "has incomplete information. dbName = %s, tableName = %s, "
+ + "oldLocation = %s, newLocation = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"),
+ StringUtils.defaultIfBlank(oldLocation, "null"),
+ StringUtils.defaultIfBlank(newLocation, "null")));
+ return false;
+ }
+
+ if (oldLocation.equals(newLocation)) {
+ LOGGER.debug(String.format("Alter partition notification ignored as"
+ + "location has not changed: AuthzObj = %s, Location = %s", dbName + "."
+ + "." + tableName, oldLocation));
+ return false;
+ }
+
+ String oldAuthzObj = dbName + "." + tableName;
+ renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, event);
+ return true;
+ }
+
+ /**
+ * Adds an authzObj along with a set of paths into the authzObj -> [Paths] mapping
+ * as well as persist the corresponding delta path change to Sentry DB.
+ *
+ * @param authzObj the given authzObj
+ * @param locations a set of paths need to be added
+ * @param event the NotificationEvent object from where authzObj and locations were obtained
+ */
+ private void addPaths(String authzObj, Collection<String> locations, NotificationEvent event)
+ throws Exception {
+ // AuthzObj is case insensitive
+ authzObj = authzObj.toLowerCase();
+
+ UniquePathsUpdate update = new UniquePathsUpdate(event, false);
+ Collection<String> paths = new HashSet<>(locations.size());
+ // addPath and persist into Sentry DB.
+ // Skip update if encounter malformed path.
+ for (String location : locations) {
+ String pathTree = getPath(location);
+ if (pathTree == null) {
+ LOGGER.debug("HMS Path Update ["
+ + "OP : addPath, "
+ + "authzObj : " + authzObj + ", "
+ + "path : " + location + "] - nothing to add" + ", "
+ + "notification event ID: " + event.getEventId() + "]");
+ } else {
+ LOGGER.debug("HMS Path Update ["
+ + "OP : addPath, " + "authzObj : "
+ + authzObj + ", "
+ + "path : " + location + ", "
+ + "notification event ID: " + event.getEventId() + "]");
+ update.newPathChange(authzObj).addToAddPaths(splitPath(pathTree));
+ paths.add(pathTree);
+ }
+ }
+ sentryStore.addAuthzPathsMapping(authzObj, paths, update);
+ }
+
+ /**
+ * Removes a set of paths map to a given authzObj from the authzObj -> [Paths] mapping
+ * as well as persist the corresponding delta path change to Sentry DB.
+ *
+ * @param authzObj the given authzObj
+ * @param locations a set of paths need to be removed
+ * @param event the NotificationEvent object from where authzObj and locations were obtained
+ */
+ private void removePaths(String authzObj, Collection<String> locations, NotificationEvent event)
+ throws Exception {
+ // AuthzObj is case insensitive
+ authzObj = authzObj.toLowerCase();
+
+ UniquePathsUpdate update = new UniquePathsUpdate(event, false);
+ Collection<String> paths = new HashSet<>(locations.size());
+ for (String location : locations) {
+ String pathTree = getPath(location);
+ if (pathTree == null) {
+ LOGGER.debug("HMS Path Update ["
+ + "OP : removePath, "
+ + "authzObj : " + authzObj + ", "
+ + "path : " + location + "] - nothing to remove" + ", "
+ + "notification event ID: " + event.getEventId() + "]");
+ } else {
+ LOGGER.debug("HMS Path Update ["
+ + "OP : removePath, "
+ + "authzObj : " + authzObj + ", "
+ + "path : " + location + ", "
+ + "notification event ID: " + event.getEventId() + "]");
+ update.newPathChange(authzObj).addToDelPaths(splitPath(pathTree));
+ paths.add(pathTree);
+ }
+ }
+ sentryStore.deleteAuthzPathsMapping(authzObj, paths, update);
+ }
+
+ /**
+ * Removes a given authzObj and all paths belongs to it from the
+ * authzObj -> [Paths] mapping as well as persist the corresponding
+ * delta path change to Sentry DB.
+ *
+ * @param authzObj the given authzObj to be deleted
+ * @param event the NotificationEvent object from where authzObj and locations were obtained
+ */
+ private void removeAllPaths(String authzObj, NotificationEvent event)
+ throws Exception {
+ // AuthzObj is case insensitive
+ authzObj = authzObj.toLowerCase();
+
+ LOGGER.debug("HMS Path Update ["
+ + "OP : removeAllPaths, "
+ + "authzObj : " + authzObj + ", "
+ + "notification event ID: " + event.getEventId() + "]");
+ UniquePathsUpdate update = new UniquePathsUpdate(event, false);
+ update.newPathChange(authzObj).addToDelPaths(
+ Lists.newArrayList(PathsUpdate.ALL_PATHS));
+ sentryStore.deleteAllAuthzPathsMapping(authzObj, update);
+ }
+
+ /**
+ * Renames a given authzObj and alter the paths belongs to it from the
+ * authzObj -> [Paths] mapping as well as persist the corresponding
+ * delta path change to Sentry DB.
+ *
+ * @param oldAuthzObj the existing authzObj
+ * @param newAuthzObj the new name to be changed to
+ * @param oldLocation a existing path of the given authzObj
+ * @param newLocation a new path to be changed to
+ * @param event the NotificationEvent object from where authzObj and locations were obtained
+ */
+ private void renameAuthzPath(String oldAuthzObj, String newAuthzObj, String oldLocation,
+ String newLocation, NotificationEvent event) throws Exception {
+ // AuthzObj is case insensitive
+ oldAuthzObj = oldAuthzObj.toLowerCase();
+ newAuthzObj = newAuthzObj.toLowerCase();
+ String oldPathTree = getPath(oldLocation);
+ String newPathTree = getPath(newLocation);
+
+ LOGGER.debug("HMS Path Update ["
+ + "OP : renameAuthzObject, "
+ + "oldAuthzObj : " + oldAuthzObj + ", "
+ + "newAuthzObj : " + newAuthzObj + ", "
+ + "oldLocation : " + oldLocation + ", "
+ + "newLocation : " + newLocation + ", "
+ + "notification event ID: " + event.getEventId() + "]");
+
+ // In the case of HiveObj name has changed
+ if (!oldAuthzObj.equalsIgnoreCase(newAuthzObj)) {
+ // Skip update if encounter malformed path for both oldLocation and newLocation.
+ if ((oldPathTree != null) && (newPathTree != null)) {
+ UniquePathsUpdate update = new UniquePathsUpdate(event, false);
+ update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
+ update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
+ if (oldLocation.equals(newLocation)) {
+ //Only name has changed
+ // - Alter table rename for an external table
+ sentryStore.renameAuthzObj(oldAuthzObj, newAuthzObj, update);
+ } else {
+ // Both name and location has changed
+ // - Alter table rename for managed table
+ sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj, oldPathTree,
+ newPathTree, update);
+ }
+ } else {
+ updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, event);
+ }
+ } else if (!oldLocation.equals(newLocation)) {
+ // Only Location has changed, e.g. Alter table set location
+ if ((oldPathTree != null) && (newPathTree != null)) {
+ UniquePathsUpdate update = new UniquePathsUpdate(event, false);
+ update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
+ update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree));
+ sentryStore.updateAuthzPathsMapping(oldAuthzObj, oldPathTree,
+ newPathTree, update);
+ } else {
+ updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree,event);
+ }
+ } else {
+ // This code should not be hit as appropriate checks are performed by the callers of this method.
+ LOGGER.error("Update Notification for Auhorizable object {}, with no change, skipping",
+ oldAuthzObj);
+ throw new SentryInvalidHMSEventException("Update Notification for Authorizable object"
+ + "with no change");
+ }
+ }
+
+ private void updateAuthzPathsMapping(String oldAuthzObj, String oldPathTree,
+ String newAuthzObj, String newPathTree, NotificationEvent event) throws Exception {
+ if (oldPathTree != null) {
+ UniquePathsUpdate update = new UniquePathsUpdate(event, false);
+ update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
+ sentryStore.deleteAuthzPathsMapping(oldAuthzObj,
+ Collections.singleton(oldPathTree),
+ update);
+ } else if (newPathTree != null) {
+ UniquePathsUpdate update = new UniquePathsUpdate(event, false);
+ update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
+ sentryStore.addAuthzPathsMapping(newAuthzObj,
+ Collections.singleton(newPathTree),
+ update);
+ }
+
+ }
+
+ /**
+ * Get path tree from a given path. It return null if encounters
+ * SentryMalformedPathException which indicates a malformed path.
+ *
+ * @param path a path
+ * @return the path tree given a path.
+ */
+ private String getPath(String path) {
+ try {
+ return PathsUpdate.parsePath(path);
+ } catch (SentryMalformedPathException e) {
+ LOGGER.error("Unexpected path while parsing {}", path, e);
+ }
+ return null;
+ }
+
+ private void dropSentryDbPrivileges(String dbName, NotificationEvent event) {
+ try {
+ TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName);
+ authorizable.setDb(dbName);
+ sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable));
+ } catch (SentryNoSuchObjectException e) {
+ LOGGER.debug("Drop Sentry privilege ignored as there are no privileges on the database: {}",
+ dbName);
+ } catch (Exception e) {
+ LOGGER.error("Could not process Drop database event." + "Event: " + event.toString(), e);
+ }
+ }
+
+ private void dropSentryTablePrivileges(String dbName, String tableName,
+ NotificationEvent event) {
+ try {
+ TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName);
+ authorizable.setDb(dbName);
+ authorizable.setTable(tableName);
+ sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable));
+ } catch (SentryNoSuchObjectException e) {
+ LOGGER.debug("Drop Sentry privilege ignored as there are no privileges on the table: {}.{}",
+ dbName, tableName);
+ } catch (Exception e) {
+ LOGGER.error("Could not process Drop table event. Event: " + event.toString(), e);
+ }
+ }
+
+ private void renamePrivileges(String oldDbName, String oldTableName, String newDbName,
+ String newTableName) throws
+ Exception {
+ TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(authServerName);
+ oldAuthorizable.setDb(oldDbName);
+ oldAuthorizable.setTable(oldTableName);
+ TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(authServerName);
+ newAuthorizable.setDb(newDbName);
+ newAuthorizable.setTable(newTableName);
+ Update update =
+ getPermUpdatableOnRename(oldAuthorizable, newAuthorizable);
+ sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
new file mode 100644
index 0000000..9813a5a
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * A container for complete hive paths snapshot.
+ * <p>
+ * It is composed by a hiveObj to Paths mapping, a paths image ID and the sequence number/change ID
+ * of latest delta change that the snapshot maps to.
+ */
+public class PathsImage {
+
+ // A full image of hiveObj to Paths mapping.
+ private final Map<String, Collection<String>> pathImage;
+ private final long id;
+ private final long curImgNum;
+
+ public PathsImage(Map<String, Collection<String>> pathImage, long id, long curImgNum) {
+ this.pathImage = pathImage;
+ this.id = id;
+ this.curImgNum = curImgNum;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public long getCurImgNum() {
+ return curImgNum;
+ }
+
+ public Map<String, Collection<String>> getPathImage() {
+ return pathImage;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PermissionsImage.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PermissionsImage.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PermissionsImage.java
new file mode 100644
index 0000000..4a02db2
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PermissionsImage.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.provider.db.service.persistent;
+
+import org.apache.sentry.hdfs.service.thrift.TPrivilegeEntity;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A container for complete sentry permission snapshot.
+ * <p>
+ * It is composed by a role to groups mapping, and hiveObj to < role/user, privileges > mapping.
+ * It also has the sequence number/change ID of latest delta change that the snapshot maps to.
+ */
+public class PermissionsImage {
+
+ // A full snapshot of sentry role to groups mapping.
+ private final Map<String, List<String>> roleImage;
+
+ // A full snapshot of hiveObj to <role/user, privileges> mapping.
+ private final Map<String, Map<TPrivilegeEntity, String>> privilegeImage;
+ private final long curSeqNum;
+
+ public PermissionsImage(Map<String, List<String>> roleImage,
+ Map<String, Map<TPrivilegeEntity, String>> privilegeImage, long curSeqNum) {
+ this.roleImage = roleImage;
+ this.privilegeImage = privilegeImage;
+ this.curSeqNum = curSeqNum;
+ }
+
+ public long getCurSeqNum() {
+ return curSeqNum;
+ }
+
+ public Map<String, Map<TPrivilegeEntity, String>> getPrivilegeImage() {
+ return privilegeImage;
+ }
+
+ public Map<String, List<String>> getRoleImage() {
+ return roleImage;
+ }
+}