You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ka...@apache.org on 2017/07/14 00:57:42 UTC
[6/6] sentry git commit: SENTRY-1769 Refactor HMSFollower Class
(Kalyan Kumar Kalvagadda reviewed by Vamsee Yarlagadda, Na Li,
Sergio Pena and Alexander Kolbasov)
SENTRY-1769 Refactor HMSFollower Class (Kalyan Kumar Kalvagadda reviewed by Vamsee Yarlagadda, Na Li, Sergio Pena and Alexander Kolbasov)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/024d99de
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/024d99de
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/024d99de
Branch: refs/heads/sentry-ha-redesign-kkalyan
Commit: 024d99de2663b6bdb741498f03862cd9337d9ad3
Parents: 3a7b54f
Author: Kalyan Kumar Kalvagadda <kk...@cloudera.com>
Authored: Thu Jul 13 19:57:03 2017 -0500
Committer: Kalyan Kumar Kalvagadda <kk...@cloudera.com>
Committed: Thu Jul 13 19:57:03 2017 -0500
----------------------------------------------------------------------
.../apache/sentry/hdfs/PathImageRetriever.java | 13 +-
.../org/apache/sentry/hdfs/SentryPlugin.java | 40 +-
.../provider/db/SentryPolicyStorePlugin.java | 4 +-
.../db/service/model/MAuthzPathsSnapshotId.java | 2 +-
.../service/model/MSentryHmsNotification.java | 8 +-
.../db/service/persistent/PathsImage.java | 10 +-
.../db/service/persistent/SentryStore.java | 13 +-
.../thrift/SentryPolicyStoreProcessor.java | 5 +-
.../sentry/service/thrift/HMSFollower.java | 641 -------------------
.../sentry/service/thrift/HmsFollower.java | 275 ++++++++
.../service/thrift/NotificationProcessor.java | 571 +++++++++++++----
.../sentry/service/thrift/SentryHmsClient.java | 244 +++++++
.../sentry/service/thrift/SentryService.java | 17 +-
.../service/thrift/SentryServiceUtil.java | 35 +-
.../sentry/service/thrift/ServiceConstants.java | 2 +-
.../db/service/persistent/TestSentryStore.java | 1 -
.../sentry/service/thrift/TestHMSFollower.java | 384 -----------
.../sentry/service/thrift/TestHmsFollower.java | 532 +++++++++++++++
.../thrift/TestNotificationProcessor.java | 465 ++++++++++++++
.../service/thrift/TestSentryHmsClient.java | 470 ++++++++++++++
.../TestDbPrivilegeCleanupOnDrop.java | 4 +-
21 files changed, 2531 insertions(+), 1205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
index 2426b40..ac5c5b2 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
@@ -25,9 +25,14 @@ import org.apache.sentry.provider.db.service.persistent.PathsImage;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import javax.annotation.concurrent.ThreadSafe;
+
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.sentry.hdfs.service.thrift.TPathChanges;
+import org.apache.sentry.provider.db.service.persistent.PathsImage;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
/**
* PathImageRetriever obtains a complete snapshot of Hive Paths from a persistent
@@ -37,10 +42,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* It is a thread safe class, as all the underlying database operation is thread safe.
*/
@ThreadSafe
-public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
+class PathImageRetriever implements ImageRetriever<PathsUpdate> {
- private final SentryStore sentryStore;
private static final String[] root = {"/"};
+ private final SentryStore sentryStore;
PathImageRetriever(SentryStore sentryStore) {
this.sentryStore = sentryStore;
@@ -55,8 +60,8 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
// persistent storage, along with the sequence number of latest
// delta change the snapshot corresponds to.
PathsImage pathsImage = sentryStore.retrieveFullPathsImage();
- long curSeqNum = pathsImage.getCurSeqNum();
long curImgNum = pathsImage.getCurImgNum();
+ long curSeqNum = pathsImage.getId();
Map<String, Set<String>> pathImage = pathsImage.getPathImage();
// Translates the complete Hive paths snapshot into a PathsUpdate.
@@ -73,7 +78,7 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
}
SentryHdfsMetricsUtil.getPathChangesHistogram.update(pathsUpdate
- .getPathChanges().size());
+ .getPathChanges().size());
// Translate PathsUpdate that contains a full image to TPathsDump for
// consumer (NN) to be able to quickly construct UpdateableAuthzPaths
http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
index d6100de..0c3ba5b 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
@@ -23,12 +23,14 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.exception.SentryInvalidInputException;
import org.apache.sentry.core.common.utils.SigUtils;
import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
import org.apache.sentry.hdfs.service.thrift.TRoleChanges;
import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.service.thrift.SentryServiceUtil;
import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest;
import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleDeleteGroupsRequest;
import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleGrantPrivilegeRequest;
@@ -38,7 +40,7 @@ import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest;
import org.apache.sentry.provider.db.service.thrift.TRenamePrivilegesRequest;
import org.apache.sentry.provider.db.service.thrift.TSentryGroup;
import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
-import org.apache.sentry.service.thrift.HMSFollower;
+import org.apache.sentry.service.thrift.HmsFollower;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +58,7 @@ import static org.apache.sentry.hdfs.service.thrift.sentry_hdfs_serviceConstants
* <ol>
* <li>
* Whenever updates happen on HMS, a corresponding notification log is generated,
- * and {@link HMSFollower} will process the notification event and persist it in database.
+ * and {@link HmsFollower} will process the notification event and persist it in database.
* <li>
* The NameNode periodically asks Sentry for updates. Sentry may return zero
* or more updates previously received via HMS notification log.
@@ -240,16 +242,22 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
@Override
public Update onRenameSentryPrivilege(TRenamePrivilegesRequest request)
- throws SentryPluginException {
- String oldAuthz = HMSFollower.getAuthzObj(request.getOldAuthorizable());
- String newAuthz = HMSFollower.getAuthzObj(request.getNewAuthorizable());
+ throws SentryPluginException, SentryInvalidInputException{
+ String oldAuthz = null;
+ String newAuthz = null;
+ try {
+ oldAuthz = SentryServiceUtil.getAuthzObj(request.getOldAuthorizable());
+ newAuthz = SentryServiceUtil.getAuthzObj(request.getNewAuthorizable());
+ } catch (SentryInvalidInputException failure) {
+ LOGGER.error("onRenameSentryPrivilege, Could not rename sentry privilege ", failure);
+ throw failure;
+ }
PermissionsUpdate update = new PermissionsUpdate();
TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
privUpdate.putToAddPrivileges(newAuthz, newAuthz);
privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
- LOGGER.debug(String.format("onRenameSentryPrivilege, Authz Perm preUpdate [ %s ]",
- oldAuthz));
+ LOGGER.debug("onRenameSentryPrivilege, Authz Perm preUpdate [ {} ]", oldAuthz);
return update;
}
@@ -283,8 +291,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
roleName, privilege.getAction().toUpperCase());
- LOGGER.debug(String.format("onAlterSentryRoleRevokePrivilegeCore, Authz Perm preUpdate [ %s ]",
- authzObj));
+ LOGGER.debug("onAlterSentryRoleRevokePrivilegeCore, Authz Perm preUpdate [ {} ]", authzObj);
return update;
}
@@ -296,8 +303,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ);
update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS);
- LOGGER.debug(String.format("onDropSentryRole, Authz Perm preUpdate [ %s ]",
- request.getRoleName()));
+ LOGGER.debug("onDropSentryRole, Authz Perm preUpdate [ {} ]", request.getRoleName());
return update;
}
@@ -305,12 +311,18 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
public Update onDropSentryPrivilege(TDropPrivilegesRequest request)
throws SentryPluginException {
PermissionsUpdate update = new PermissionsUpdate();
- String authzObj = HMSFollower.getAuthzObj(request.getAuthorizable());
+ String authzObj = null;
+ try {
+ authzObj = SentryServiceUtil.getAuthzObj(request.getAuthorizable());
+ } catch (SentryInvalidInputException failure) {
+ LOGGER.error("onDropSentryPrivilege, Could not drop sentry privilege "
+ + failure.toString(), failure);
+ throw new SentryPluginException(failure.getMessage(), failure);
+ }
update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
- LOGGER.debug(String.format("onDropSentryPrivilege, Authz Perm preUpdate [ %s ]",
- authzObj));
+ LOGGER.debug("onDropSentryPrivilege, Authz Perm preUpdate [ {} ]", authzObj);
return update;
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
index 5b8a572..a22b422 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
@@ -19,6 +19,7 @@
package org.apache.sentry.provider.db;
import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.core.common.exception.SentryInvalidInputException;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest;
@@ -68,7 +69,8 @@ public interface SentryPolicyStorePlugin {
Update onDropSentryRole(TDropSentryRoleRequest tRequest) throws SentryPluginException;
- Update onRenameSentryPrivilege(TRenamePrivilegesRequest request) throws SentryPluginException;
+ Update onRenameSentryPrivilege(TRenamePrivilegesRequest request)
+ throws SentryPluginException, SentryInvalidInputException;
Update onDropSentryPrivilege(TDropPrivilegesRequest request) throws SentryPluginException;
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
index d683c2c..d8d54f3 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
@@ -22,7 +22,7 @@ import javax.jdo.annotations.PrimaryKey;
/**
* This class is used to persist new authz paths snapshots IDs. An authz path snapshot ID is required by
- * the MAuthzPathsMapping to detect new HMS snapshots created by the HMSFollower.
+ * the MAuthzPathsMapping to detect new HMS snapshots created by the HmsFollower.
*/
@PersistenceCapable
public class MAuthzPathsSnapshotId {
http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
index 0d54548..166bec7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
@@ -20,15 +20,13 @@ package org.apache.sentry.provider.db.service.model;
/**
* Database backend store for HMS Notification ID's. All the notifications that are processed
* by sentry are stored.
- */
-
-/*
+ * <p>
* <p> HMS notification ID's are stored in separate table for three reasons</p>
* <ol>
* <li>SENTRY_PATH_CHANGE is not updated for every notification that is received from HMS. There
- * are cases where HMSFollower doesn't process notifications and skip's them. Depending on
+ * are cases where HmsFollower doesn't process notifications and skip's them. Depending on
* SENTRY_PATH_CHANGE information may not provide the last notification processed.</li>
- * <li> There could be cases where HMSFollower thread in multiple sentry servers acting as a
+ * <li> There could be cases where HmsFollower thread in multiple sentry servers acting as a
* leader and process HMS notifications. we need to avoid processing the notifications
* multiple times. This can be made sure by always having some number of notification
* information always regardless of purging interval.</li>
http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
index 4d852e6..409a557 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
@@ -31,17 +31,17 @@ public class PathsImage {
// A full image of hiveObj to Paths mapping.
private final Map<String, Set<String>> pathImage;
- private final long curSeqNum;
+ private final long id;
private final long curImgNum;
- public PathsImage(Map<String, Set<String>> pathImage, long curSeqNum, long curImgNum) {
+ public PathsImage(Map<String, Set<String>> pathImage, long id, long curImgNum) {
this.pathImage = pathImage;
- this.curSeqNum = curSeqNum;
+ this.id = id;
this.curImgNum = curImgNum;
}
- public long getCurSeqNum() {
- return curSeqNum;
+ public long getId() {
+ return id;
}
public long getCurImgNum() {
http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 979e45b..7b02e2c 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -107,7 +107,7 @@ import static org.apache.sentry.provider.db.service.persistent.QueryParamBuilder
* single node and rely on DB for multi-node synchronization.
* <p>
* This isn't much of a problem for path updates since they are
- * driven by HMSFollower which usually runs on a single leader
+ * driven by HmsFollower which usually runs on a single leader
* node, but permission updates originate from clients
* directly and may be highly concurrent.
* <p>
@@ -151,7 +151,7 @@ public class SentryStore {
private static final long COUNT_VALUE_UNKNOWN = -1L;
// Representation for unknown HMS notification ID
- private static final long NOTIFICATION_UNKNOWN = -1L;
+ public static final long NOTIFICATION_UNKNOWN = -1L;
private static final Set<String> ALL_ACTIONS = Sets.newHashSet(AccessConstants.ALL,
AccessConstants.SELECT, AccessConstants.INSERT, AccessConstants.ALTER,
@@ -169,8 +169,8 @@ public class SentryStore {
private final TransactionManager tm;
/**
- * counterWait is used to synchronize notifications between Thrift and HMSFollower.
- * Technically it doesn't belong here, but the only thing that connects HMSFollower
+ * counterWait is used to synchronize notifications between Thrift and HmsFollower.
+ * Technically it doesn't belong here, but the only thing that connects HmsFollower
* and Thrift API is SentryStore. An alternative could be a singleton CounterWait or
* some factory that returns CounterWait instances keyed by name, but this complicates
* things unnecessary.
@@ -2674,7 +2674,7 @@ public class SentryStore {
/**
* Persist an up-to-date HMS snapshot into Sentry DB in a single transaction.
*
- * @param authzPaths Mapping of hiveObj to < Paths <
+ * @param authzPaths paths to be be persisted
* @throws Exception
*/
public void persistFullPathsImage(final Map<String, Set<String>> authzPaths) throws Exception {
@@ -2685,7 +2685,6 @@ public class SentryStore {
long snapshotID = getCurrentAuthzPathsSnapshotID(pm);
long nextSnapshotID = snapshotID + 1;
-
pm.makePersistent(new MAuthzPathsSnapshotId(nextSnapshotID));
for (Map.Entry<String, Set<String>> authzPath : authzPaths.entrySet()) {
pm.makePersistent(new MAuthzPathsMapping(nextSnapshotID, authzPath.getKey(), authzPath.getValue()));
@@ -3704,7 +3703,7 @@ public class SentryStore {
*
* @param pm the PersistenceManager
* @return EMPTY_NOTIFICATION_ID(0) when there are no notifications processed.
- * else last NotificationID processed by HMSFollower
+ * else last NotificationID processed by HmsFollower
*/
static Long getLastProcessedNotificationIDCore(
PersistenceManager pm) {
http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index ad23334..cfd0e30 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -945,7 +945,10 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
} catch (SentryThriftAPIMismatchException e) {
LOGGER.error(e.getMessage(), e);
response.setStatus(Status.THRIFT_VERSION_MISMATCH(e.getMessage(), e));
- } catch (Exception e) {
+ } catch (SentryInvalidInputException e) {
+ response.setStatus(Status.InvalidInput(e.getMessage(), e));
+ }
+ catch (Exception e) {
String msg = "Unknown error for request: " + request + ", message: "
+ e.getMessage();
LOGGER.error(msg, e);
http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
deleted file mode 100644
index 1b6ae18..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ /dev/null
@@ -1,641 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.service.thrift;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Timer;
-import com.codahale.metrics.Timer.Context;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.hive.hcatalog.messaging.HCatEventMessage;
-import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
-import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
-import org.apache.sentry.core.common.exception.SentryInvalidInputException;
-import org.apache.sentry.core.common.exception.SentryNoSuchObjectException;
-import org.apache.sentry.hdfs.PermissionsUpdate;
-import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
-import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
-import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
-import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.sentry.binding.metastore.messaging.json.*;
-
-import javax.jdo.JDODataStoreException;
-import javax.security.auth.login.LoginException;
-
-import java.io.IOException;
-import java.net.SocketException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-import static com.codahale.metrics.MetricRegistry.name;
-import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
-import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE;
-import static org.apache.sentry.hdfs.Updateable.Update;
-
-/**
- * HMSFollower is the thread which follows the Hive MetaStore state changes from Sentry.
- * It gets the full update and notification logs from HMS and applies it to
- * update permissions stored in Sentry using SentryStore and also update the < obj,path > state
- * stored for HDFS-Sentry sync.
- */
-@SuppressWarnings("PMD")
-public class HMSFollower implements Runnable, AutoCloseable {
- private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
- private HiveSimpleConnectionFactory hiveConnectionFactory;
- // Track the latest eventId of the event that has been logged. So we don't log the same message
- private long lastLoggedEventId = SentryStore.EMPTY_CHANGE_ID;
- private static boolean connectedToHMS = false;
- private HMSClient client;
- private final Configuration authzConf;
- private final SentryStore sentryStore;
- private String hiveInstance;
-
- private boolean needLogHMSSupportReady = true;
- private final LeaderStatusMonitor leaderMonitor;
-
- private static final String SNAPSHOT = "snapshot";
- /** Measures time to get full snapshot */
- private final Timer updateTimer = SentryMetrics.getInstance()
- .getTimer(name(FullUpdateInitializer.class, SNAPSHOT));
- /** Number of times update failed */
- private final Counter failedSnapshotsCount = SentryMetrics.getInstance()
- .getCounter(name(FullUpdateInitializer.class, "failed"));
-
- HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
- HiveSimpleConnectionFactory hiveConnectionFactory) {
- authzConf = conf;
- this.leaderMonitor = leaderMonitor;
- sentryStore = store;
- this.hiveConnectionFactory = hiveConnectionFactory;
- }
-
- @VisibleForTesting
- HMSFollower(Configuration conf, SentryStore sentryStore, String hiveInstance)
- throws IOException, LoginException {
- this(conf, sentryStore, null, null);
- this.hiveInstance = hiveInstance;
- hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf());
- hiveConnectionFactory.init();
- }
-
- @VisibleForTesting
- public static boolean isConnectedToHMS() {
- return connectedToHMS;
- }
-
- @Override
- public void close() {
- // Close any outstanding connections to HMS
- closeHMSConnection();
- try {
- hiveConnectionFactory.close();
- } catch (Exception e) {
- LOGGER.error("failed to close Hive Connection Factory", e);
- }
- }
-
- /**
- * Returns HMS Client if successful, returns null if HMS is not ready yet to take connections
- * Throws @LoginException if Kerberos context creation failed using Sentry's kerberos credentials
- * Throws @MetaException if there was a problem on creating an HMSClient
- */
- private HiveMetaStoreClient getMetaStoreClient()
- throws IOException, InterruptedException, MetaException {
- if (client == null) {
- client = hiveConnectionFactory.connect();
- connectedToHMS = true;
- }
- return client.getClient();
- }
-
- @Override
- public void run() {
- Long lastProcessedNotificationID;
- try {
- // Initializing lastProcessedNotificationID based on the latest persisted notification ID.
- lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID();
- } catch (Exception e) {
- LOGGER.error("Failed to get the last processed notification id from sentry store, " +
- "Skipping the processing", e);
- return;
- }
- // Wake any clients connected to this service waiting for HMS already processed notifications.
- wakeUpWaitingClientsForSync(lastProcessedNotificationID);
- // Only the leader should listen to HMS updates
- if ((leaderMonitor != null) && !leaderMonitor.isLeader()) {
- // Close any outstanding connections to HMS
- closeHMSConnection();
- return;
- }
- processHiveMetastoreUpdates();
- }
-
- /**
- * Wakes up HMS waiters waiting for a specific event notification.
- *
- * @param eventID
- */
- private void wakeUpWaitingClientsForSync(long eventID) {
- CounterWait counterWait = sentryStore.getCounterWait();
-
- // Wake up any HMS waiters that are waiting for this ID.
- // counterWait should never be null, but tests mock SentryStore and a mocked one
- // doesn't have it.
- if (counterWait != null) {
- counterWait.update(eventID);
- }
- }
-
- /**
- * Processes new Hive Metastore notifications.
- *
- * If no notifications are processed yet, then it does a full initial snapshot of the Hive Metastore
- * followed by new notifications updates that could have happened after it.
- *
- * Clients connections waiting for an event notification will be woken up afterwards.
- */
- private void processHiveMetastoreUpdates() {
- try {
- // Decision of taking full snapshot is based on AuthzPathsMapping information persisted
- // in the sentry persistent store. If AuthzPathsMapping is empty, shapshot is needed.
- Long lastProcessedNotificationID;
- if (sentryStore.isAuthzPathsMappingEmpty()) {
- // TODO: expose time used for full update in the metrics
-
- // To ensure point-in-time snapshot consistency, need to make sure
- // there were no HMS updates while retrieving the snapshot.
- // In detail the logic is:
- //
- // 1. Read current HMS notification ID_initial
- // 2. Read HMS metadata state
- // 3. Read current notification ID_new
- // 4. If ID_initial != ID_new then the attempts for retrieving full HMS snapshot
- // will be dropped. A new attempts will be made after 500 milliseconds when
- // HMSFollower run again.
-
- CurrentNotificationEventId eventIDBefore = getMetaStoreClient().getCurrentNotificationEventId();
- LOGGER.info("Before fetching hive full snapshot, Current NotificationID = {}", eventIDBefore);
-
- Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate();
- if(pathsFullSnapshot.isEmpty()) {
- LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have any data");
- return;
- }
-
- CurrentNotificationEventId eventIDAfter = getMetaStoreClient().getCurrentNotificationEventId();
- LOGGER.info("After fetching hive full snapshot, Current NotificationID = {}", eventIDAfter);
-
- if (!eventIDBefore.equals(eventIDAfter)) {
- LOGGER.error("Fail to get a point-in-time hive full snapshot. Current ID = {}",
- eventIDAfter);
- return;
- }
-
- LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID = {}",
- eventIDAfter);
- // As eventIDAfter is the last event that was processed, eventIDAfter is used to update
- // lastProcessedNotificationID instead of getting it from persistent store.
- lastProcessedNotificationID = eventIDAfter.getEventId();
- sentryStore.persistFullPathsImage(pathsFullSnapshot);
- sentryStore.persistLastProcessedNotificationID(eventIDAfter.getEventId());
- // Wake up any HMS waiters that could have been put on hold before getting the eventIDBefore value.
- wakeUpWaitingClientsForSync(lastProcessedNotificationID);
- } else {
- // Every time HMSFollower is scheduled to run, value should be updates based
- // on the value stored in database.
- lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID();
- }
-
- // HMSFollower connected to HMS and it finished full snapshot if that was required
- // Log this message only once
- if (needLogHMSSupportReady && connectedToHMS) {
- LOGGER.info("Sentry HMS support is ready");
- needLogHMSSupportReady = false;
- }
-
- // HIVE-15761: Currently getNextNotification API may return an empty
- // NotificationEventResponse causing TProtocolException.
- // Workaround: Only processes the notification events newer than the last updated one.
- CurrentNotificationEventId eventId = getMetaStoreClient().getCurrentNotificationEventId();
- LOGGER.debug("Last Notification in HMS {} lastProcessedNotificationID is {}",
- eventId.getEventId(), lastProcessedNotificationID);
- if (eventId.getEventId() > lastProcessedNotificationID) {
- NotificationEventResponse response =
- getMetaStoreClient().getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null);
- if (response.isSetEvents()) {
- if (!response.getEvents().isEmpty()) {
- if (lastProcessedNotificationID != lastLoggedEventId) {
- // Only log when there are updates and the notification ID has changed.
- LOGGER.debug("lastProcessedNotificationID = {}. Processing {} events",
- lastProcessedNotificationID, response.getEvents().size());
- lastLoggedEventId = lastProcessedNotificationID;
- }
-
- processNotificationEvents(response.getEvents());
- }
- }
- }
- } catch (TException e) {
- // If the underlying exception is around socket exception, it is better to retry connection to HMS
- if (e.getCause() instanceof SocketException) {
- LOGGER.error("Encountered Socket Exception during fetching Notification entries, will reconnect to HMS", e);
- client.invalidate();
- closeHMSConnection();
- } else {
- LOGGER.error("ThriftException occured fetching Notification entries, will try", e);
- }
- } catch (SentryInvalidInputException | SentryInvalidHMSEventException e) {
- LOGGER.error("Encounter SentryInvalidInputException|SentryInvalidHMSEventException " +
- "while processing notification log", e);
- } catch (Throwable t) {
- // catching errors to prevent the executor to halt.
- LOGGER.error("Caught unexpected exception in HMSFollower! Caused by: " + t.getMessage(),
- t.getCause());
- t.printStackTrace();
- }
- }
-
- /**
- * Function to close HMS connection and any associated kerberos context (if applicable)
- */
- private void closeHMSConnection() {
- try {
- if (client != null) {
- LOGGER.info("Closing the HMS client connection");
- client.close();
- connectedToHMS = false;
- }
- } finally {
- client = null;
- }
- }
-
- /**
- * Retrieve a Hive full snapshot from HMS.
- *
- * @return HMS snapshot. Snapshot consists of a mapping from auth object name
- * to the set of paths corresponding to that name.
- * @throws InterruptedException
- * @throws TException
- * @throws ExecutionException
- */
- private Map<String, Set<String>> fetchFullUpdate()
- throws TException, ExecutionException {
- LOGGER.info("Request full HMS snapshot");
- try (FullUpdateInitializer updateInitializer =
- new FullUpdateInitializer(hiveConnectionFactory, authzConf);
- Context context = updateTimer.time()) {
- Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
- LOGGER.info("Obtained full HMS snapshot");
- return pathsUpdate;
- } catch (Exception ignored) {
- failedSnapshotsCount.inc();
- // Caller will retry later
- return Collections.emptyMap();
- }
- }
-
- private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) {
- return "true"
- .equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), syncConfVar.getDefault())));
- }
-
- /**
- * Throws SentryInvalidHMSEventException if Notification event contains insufficient information
- */
- void processNotificationEvents(List<NotificationEvent> events) throws Exception {
- SentryJSONMessageDeserializer deserializer = new SentryJSONMessageDeserializer();
-
- boolean isNotificationProcessingSkipped = false;
- for (NotificationEvent event : events) {
- String dbName;
- String tableName;
- String oldLocation;
- String newLocation;
- String location;
- List<String> locations;
- NotificationProcessor notificationProcessor = new NotificationProcessor(sentryStore, LOGGER);
- try {
- LOGGER.debug("Processing notification with id {} and type {}", event.getEventId(),
- event.getEventType());
- switch (HCatEventMessage.EventType.valueOf(event.getEventType())) {
- case CREATE_DATABASE:
- SentryJSONCreateDatabaseMessage message =
- deserializer.getCreateDatabaseMessage(event.getMessage());
- dbName = message.getDB();
- location = message.getLocation();
- if ((dbName == null) || (location == null)) {
- isNotificationProcessingSkipped = true;
- LOGGER.error(String.format("Create database event " +
- "has incomplete information. dbName = %s location = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(location, "null")));
- break;
- }
- if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) {
- dropSentryDbPrivileges(dbName, event);
- }
- notificationProcessor.processCreateDatabase(dbName, location, event.getEventId());
- break;
- case DROP_DATABASE:
- SentryJSONDropDatabaseMessage dropDatabaseMessage =
- deserializer.getDropDatabaseMessage(event.getMessage());
- dbName = dropDatabaseMessage.getDB();
- location = dropDatabaseMessage.getLocation();
- if (dbName == null) {
- isNotificationProcessingSkipped = true;
- LOGGER.error("Drop database event has incomplete information: dbName = null");
- break;
- }
- if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) {
- dropSentryDbPrivileges(dbName, event);
- }
- notificationProcessor.processDropDatabase(dbName, location, event.getEventId());
- break;
- case CREATE_TABLE:
- SentryJSONCreateTableMessage createTableMessage = deserializer.getCreateTableMessage(event.getMessage());
- dbName = createTableMessage.getDB();
- tableName = createTableMessage.getTable();
- location = createTableMessage.getLocation();
- if ((dbName == null) || (tableName == null) || (location == null)) {
- isNotificationProcessingSkipped = true;
- LOGGER.error(String.format("Create table event " + "has incomplete information."
- + " dbName = %s, tableName = %s, location = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"),
- StringUtils.defaultIfBlank(location, "null")));
- break;
- }
- if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) {
- dropSentryTablePrivileges(dbName, tableName, event);
- }
- notificationProcessor.processCreateTable(dbName, tableName, location, event.getEventId());
- break;
- case DROP_TABLE:
- SentryJSONDropTableMessage dropTableMessage = deserializer.getDropTableMessage(event.getMessage());
- dbName = dropTableMessage.getDB();
- tableName = dropTableMessage.getTable();
- if ((dbName == null) || (tableName == null)) {
- isNotificationProcessingSkipped = true;
- LOGGER.error(String.format("Drop table event " +
- "has incomplete information. dbName = %s, tableName = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null")));
- break;
- }
- if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) {
- dropSentryTablePrivileges(dbName, tableName, event);
- }
- notificationProcessor.processDropTable(dbName, tableName, event.getEventId());
- break;
- case ALTER_TABLE:
- SentryJSONAlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(event.getMessage());
-
- String oldDbName = alterTableMessage.getDB();
- String oldTableName = alterTableMessage.getTable();
- String newDbName = event.getDbName();
- String newTableName = event.getTableName();
- oldLocation = alterTableMessage.getOldLocation();
- newLocation = alterTableMessage.getNewLocation();
-
- if ((oldDbName == null) ||
- (oldTableName == null) ||
- (newDbName == null) ||
- (newTableName == null) ||
- (oldLocation == null) ||
- (newLocation == null)) {
- isNotificationProcessingSkipped = true;
- LOGGER.error(String.format("Alter table event " +
- "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, " +
- "newDbName = %s, newTableName = %s, newLocation = %s",
- StringUtils.defaultIfBlank(oldDbName, "null"),
- StringUtils.defaultIfBlank(oldTableName, "null"),
- StringUtils.defaultIfBlank(oldLocation, "null"),
- StringUtils.defaultIfBlank(newDbName, "null"),
- StringUtils.defaultIfBlank(newTableName, "null"),
- StringUtils.defaultIfBlank(newLocation, "null")));
- break;
- } else if ((oldDbName.equalsIgnoreCase(newDbName)) &&
- (oldTableName.equalsIgnoreCase(newTableName)) &&
- (oldLocation.equalsIgnoreCase(newLocation))) {
- isNotificationProcessingSkipped = true;
- LOGGER.info(String.format("Alter table notification ignored as neither name nor " +
- "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " +
- "newLocation = %s", oldDbName + "." + oldTableName , oldLocation,
- newDbName + "." + newTableName, newLocation));
- break;
- }
-
- if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) {
- // Name has changed
- try {
- renamePrivileges(oldDbName, oldTableName, newDbName, newTableName);
- } catch (SentryNoSuchObjectException e) {
- LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table: %s.%s",
- oldDbName, oldTableName);
- } catch (Exception e) {
- isNotificationProcessingSkipped = true;
- LOGGER.info("Could not process Alter table event. Event: " + event.toString(), e);
- break;
- }
- }
- notificationProcessor.processAlterTable(oldDbName, newDbName, oldTableName,
- newTableName, oldLocation, newLocation, event.getEventId());
- break;
- case ADD_PARTITION:
- SentryJSONAddPartitionMessage addPartitionMessage =
- deserializer.getAddPartitionMessage(event.getMessage());
- dbName = addPartitionMessage.getDB();
- tableName = addPartitionMessage.getTable();
- locations = addPartitionMessage.getLocations();
- if ((dbName == null) || (tableName == null) || (locations == null)) {
- isNotificationProcessingSkipped = true;
- LOGGER.error(String.format("Create table event has incomplete information. " +
- "dbName = %s, tableName = %s, locations = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"),
- locations != null ? locations.toString() : "null"));
- break;
- }
- notificationProcessor.processAddPartition(dbName, tableName, locations, event.getEventId());
- break;
- case DROP_PARTITION:
- SentryJSONDropPartitionMessage dropPartitionMessage =
- deserializer.getDropPartitionMessage(event.getMessage());
- dbName = dropPartitionMessage.getDB();
- tableName = dropPartitionMessage.getTable();
- locations = dropPartitionMessage.getLocations();
- if ((dbName == null) || (tableName == null) || (locations == null)) {
- isNotificationProcessingSkipped = true;
- LOGGER.error(String.format("Drop partition event " +
- "has incomplete information. dbName = %s, tableName = %s, location = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"),
- locations != null ? locations.toString() : "null"));
- break;
- }
- notificationProcessor.processDropPartition(dbName, tableName, locations, event.getEventId());
-
- break;
- case ALTER_PARTITION:
- SentryJSONAlterPartitionMessage alterPartitionMessage =
- deserializer.getAlterPartitionMessage(event.getMessage());
- dbName = alterPartitionMessage.getDB();
- tableName = alterPartitionMessage.getTable();
- oldLocation = alterPartitionMessage.getOldLocation();
- newLocation = alterPartitionMessage.getNewLocation();
-
- if ((dbName == null) ||
- (tableName == null) ||
- (oldLocation == null) ||
- (newLocation == null)) {
- isNotificationProcessingSkipped = true;
- LOGGER.error(String.format("Alter partition event " +
- "has incomplete information. dbName = %s, tableName = %s, " +
- "oldLocation = %s, newLocation = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"),
- StringUtils.defaultIfBlank(oldLocation, "null"),
- StringUtils.defaultIfBlank(newLocation, "null")));
- break;
- } else if (oldLocation.equalsIgnoreCase(newLocation)) {
- isNotificationProcessingSkipped = true;
- LOGGER.info(String.format("Alter partition notification ignored as" +
- "location has not changed: AuthzObj = %s, Location = %s", dbName + "." +
- "." + tableName, oldLocation));
- break;
- }
-
- notificationProcessor.processAlterPartition(dbName, tableName, oldLocation,
- newLocation, event.getEventId());
- break;
- case INSERT:
- // TODO DO we need to do anything here?
- break;
- }
- } catch (Exception e) {
- if (e.getCause() instanceof JDODataStoreException) {
- LOGGER.info("Received JDO Storage Exception, Could be because of processing " +
- "duplicate notification");
- if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) {
- // Rest of the notifications need not be processed.
- throw e;
- }
- }
- sentryStore.persistLastProcessedNotificationID(event.getEventId());
- }
- if (isNotificationProcessingSkipped) {
- // Update the notification ID in the persistent store even when the notification is
- // not processed as the content in in the notification is not valid.
- // Continue processing the next notification.
- sentryStore.persistLastProcessedNotificationID(event.getEventId());
- isNotificationProcessingSkipped = false;
- }
- // Wake up any HMS waiters that are waiting for this ID.
- wakeUpWaitingClientsForSync(event.getEventId());
- }
- }
-
- private void dropSentryDbPrivileges(String dbName, NotificationEvent event) throws Exception {
- try {
- TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setDb(dbName);
- sentryStore.dropPrivilege(authorizable, onDropSentryPrivilege(authorizable));
- } catch (SentryNoSuchObjectException e) {
- LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: %s", dbName);
- } catch (Exception e) {
- throw new SentryInvalidInputException("Could not process Drop database event." +
- "Event: " + event.toString(), e);
- }
- }
-
- private void dropSentryTablePrivileges(String dbName, String tableName, NotificationEvent event) throws Exception {
- try {
- TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setDb(dbName);
- authorizable.setTable(tableName);
- sentryStore.dropPrivilege(authorizable, onDropSentryPrivilege(authorizable));
- } catch (SentryNoSuchObjectException e) {
- LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the table: %s.%s", dbName, tableName);
- } catch (Exception e) {
- throw new SentryInvalidInputException("Could not process Drop table event. Event: " + event.toString(), e);
- }
- }
-
- private void renamePrivileges(String oldDbName, String oldTableName, String newDbName, String newTableName) throws
- Exception {
- TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(hiveInstance);
- oldAuthorizable.setDb(oldDbName);
- oldAuthorizable.setTable(oldTableName);
- TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance);
- newAuthorizable.setDb(newDbName);
- newAuthorizable.setTable(newTableName);
- Update update =
- onRenameSentryPrivilege(oldAuthorizable, newAuthorizable);
- sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update);
- }
-
- @VisibleForTesting
- static Update onDropSentryPrivilege(TSentryAuthorizable authorizable) {
- PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
- String authzObj = getAuthzObj(authorizable);
- update.addPrivilegeUpdate(authzObj).putToDelPrivileges(PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
- return update;
- }
-
- @VisibleForTesting
- static Update onRenameSentryPrivilege(TSentryAuthorizable oldAuthorizable,
- TSentryAuthorizable newAuthorizable)
- throws SentryPolicyStorePlugin.SentryPluginException {
- String oldAuthz = getAuthzObj(oldAuthorizable);
- String newAuthz = getAuthzObj(newAuthorizable);
- PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
- TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
- privUpdate.putToAddPrivileges(newAuthz, newAuthz);
- privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
- return update;
- }
-
- public static String getAuthzObj(TSentryAuthorizable authzble) {
- String authzObj = null;
- if (!SentryStore.isNULL(authzble.getDb())) {
- String dbName = authzble.getDb();
- String tblName = authzble.getTable();
- if (SentryStore.isNULL(tblName)) {
- authzObj = dbName;
- } else {
- authzObj = dbName + "." + tblName;
- }
- }
- return authzObj == null ? null : authzObj.toLowerCase();
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/024d99de/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HmsFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HmsFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HmsFollower.java
new file mode 100644
index 0000000..a9d05b1
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HmsFollower.java
@@ -0,0 +1,275 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ <p>
+ http://www.apache.org/licenses/LICENSE-2.0
+ <p>
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.net.SocketException;
+
+import java.util.Collection;
+import javax.jdo.JDODataStoreException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
+import org.apache.sentry.provider.db.service.persistent.PathsImage;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HmsFollower is the thread which follows the Hive MetaStore state changes from Sentry.
+ * It gets the full update and notification logs from HMS and applies it to
+ * update permissions stored in Sentry using SentryStore and also update the < obj,path > state
+ * stored for HDFS-Sentry sync.
+ */
+public class HmsFollower implements Runnable, AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HmsFollower.class);
+ private static boolean connectedToHms = false;
+ private final SentryHmsClient client;
+ private final Configuration authzConf;
+ private final SentryStore sentryStore;
+ private final NotificationProcessor notificationProcessor;
+
+ private final LeaderStatusMonitor leaderMonitor;
+
+ /**
+ * Configuring Hms Follower thread.
+ *
+ * @param conf sentry configuration
+ * @param store sentry store
+ * @param leaderMonitor singleton instance of LeaderStatusMonitor
+ */
+ HmsFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
+ HiveSimpleConnectionFactory hiveConnectionFactory) {
+ this(conf, store, leaderMonitor, hiveConnectionFactory, null);
+ }
+
+ @VisibleForTesting
+ /**
+ * Constructor should be used only for testing purposes.
+ *
+ * @param conf sentry configuration
+ * @param store sentry store
+ * @param leaderMonitor
+ * @param authServerName Server that sentry is Authorizing
+ */
+ HmsFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
+ HiveSimpleConnectionFactory hiveConnectionFactory, String authServerName) {
+ LOGGER.info("HmsFollower is being initialized");
+ authzConf = conf;
+ this.leaderMonitor = leaderMonitor;
+ sentryStore = store;
+ if (authServerName == null) {
+ HiveConf hiveConf = new HiveConf();
+ authServerName = hiveConf.get(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME.getVar());
+ }
+ notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf);
+ client = new SentryHmsClient(authzConf, hiveConnectionFactory);
+ }
+
+ @VisibleForTesting
+ public static boolean isConnectedToHms() {
+ return connectedToHms;
+ }
+
+ @Override
+ public void close() {
+ if (client != null) {
+ // Close any outstanding connections to HMS
+ try {
+ client.disconnect();
+ } catch (Exception failure) {
+ LOGGER.error("Failed to close the Sentry Hms Client", failure);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ long lastProcessedNotificationId;
+ try {
+ // Initializing lastProcessedNotificationId based on the latest persisted notification ID.
+ lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID();
+ } catch (Exception e) {
+ LOGGER.error("Failed to get the last processed notification id from sentry store, "
+ + "Skipping the processing", e);
+ return;
+ }
+ // Wake any clients connected to this service waiting for HMS already processed notifications.
+ wakeUpWaitingClientsForSync(lastProcessedNotificationId);
+ // Only the leader should listen to HMS updates
+ if ((leaderMonitor != null) && !leaderMonitor.isLeader()) {
+ // Close any outstanding connections to HMS
+ close();
+ return;
+ }
+ syncupWithHms(lastProcessedNotificationId);
+ }
+
+ /**
+ * Processes new Hive Metastore notifications.
+ *
+ * <p>If no notifications are processed yet, then it
+ * does a full initial snapshot of the Hive Metastore followed by new notifications updates that
+ * could have happened after it.
+ *
+ * <p>Clients connections waiting for an event notification will be
+ * woken up afterwards.
+ */
+ private void syncupWithHms(long notificationId) {
+ try {
+ client.connect();
+ connectedToHms = true;
+ } catch (Throwable e) {
+ LOGGER.error("HmsFollower cannot connect to HMS!!", e);
+ return;
+ }
+
+ try {
+ long lastProcessedNotificationId = notificationId;
+ // Create a full HMS snapshot if there is none
+ // Decision of taking full snapshot is based on AuthzPathsMapping information persisted
+ // in the sentry persistent store. If AuthzPathsMapping is empty, snapshot is needed.
+ if (sentryStore.isAuthzPathsMappingEmpty()) {
+ lastProcessedNotificationId = createFullSnapshot();
+ if (lastProcessedNotificationId == SentryStore.EMPTY_NOTIFICATION_ID) {
+ return;
+ }
+ }
+ // Get the new notification from HMS and process them.
+ processNotifications(client.getNotifications(lastProcessedNotificationId));
+ } catch (TException e) {
+ // If the underlying exception is around socket exception,
+ // it is better to retry connection to HMS
+ if (e.getCause() instanceof SocketException) {
+ LOGGER.error("Encountered Socket Exception during fetching Notification entries,"
+ + " will attempt to reconnect to HMS after configured interval", e);
+ close();
+ } else {
+ LOGGER.error("ThriftException occurred communicating with HMS", e);
+ }
+ } catch (Throwable t) {
+ // catching errors to prevent the executor to halt.
+ LOGGER.error("Exception in HmsFollower! Caused by: " + t.getMessage(),
+ t);
+ }
+ }
+
+ /**
+ * Request for full snapshot and persists it if there is no snapshot available in the
+ * sentry store. Also, wakes-up any waiting clients.
+ *
+ * @return ID of last notification processed.
+ * @throws Exception if there are failures
+ */
+ private long createFullSnapshot() throws Exception {
+ LOGGER.debug("Attempting to take full HMS snapshot");
+ PathsImage snapshotInfo = client.getFullSnapshot();
+ if (snapshotInfo.getPathImage().isEmpty()) {
+ return snapshotInfo.getId();
+ }
+ try {
+ LOGGER.debug("Persisting HMS path full snapshot");
+ sentryStore.persistFullPathsImage(snapshotInfo.getPathImage());
+ sentryStore.persistLastProcessedNotificationID(snapshotInfo.getId());
+ } catch (Exception failure) {
+ LOGGER.error("Received exception while persisting HMS path full snapshot ");
+ throw failure;
+ }
+ // Wake up any HMS waiters that could have been put on hold before getting the
+ // eventIDBefore value.
+ wakeUpWaitingClientsForSync(snapshotInfo.getId());
+ // HmsFollower connected to HMS and it finished full snapshot if that was required
+ // Log this message only once
+ LOGGER.info("Sentry HMS support is ready");
+ return snapshotInfo.getId();
+ }
+
+ /**
+ * Process the collection of notifications and wake up any waiting clients.
+ * Also, persists the notification ID regardless of processing result.
+ *
+ * @param events list of event to be processed
+ * @throws Exception if the complete notification list is not processed because of JDO Exception
+ */
+ void processNotifications(Collection<NotificationEvent> events) throws Exception {
+ boolean isNotificationProcessed;
+ if (events.isEmpty()) {
+ return;
+ }
+ for (NotificationEvent event : events) {
+ isNotificationProcessed = false;
+ try {
+ // Only the leader should process the notifications
+ if ((leaderMonitor != null) && !leaderMonitor.isLeader()) {
+ return;
+ }
+ isNotificationProcessed = notificationProcessor.processNotificationEvent(event);
+ } catch (Exception e) {
+ if (e.getCause() instanceof JDODataStoreException) {
+ LOGGER.info("Received JDO Storage Exception, Could be because of processing "
+ + "duplicate notification");
+ if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) {
+ // Rest of the notifications need not be processed.
+ LOGGER.error("Received event with Id: {} which is smaller then the ID "
+ + "persisted in store", event.getEventId());
+ break;
+ }
+ } else {
+ LOGGER.error("Processing the notification with ID:{} failed with exception {}",
+ event.getEventId(), e);
+ }
+ }
+ if (!isNotificationProcessed) {
+ try {
+ // Update the notification ID in the persistent store even when the notification is
+ // not processed as the content in in the notification is not valid.
+ // Continue processing the next notification.
+ LOGGER.debug("Explicitly Persisting Notification ID:{}", event.getEventId());
+ sentryStore.persistLastProcessedNotificationID(event.getEventId());
+ } catch (Exception failure) {
+ LOGGER.error("Received exception while persisting the notification ID "
+ + event.getEventId());
+ throw failure;
+ }
+ }
+ // Wake up any HMS waiters that are waiting for this ID.
+ wakeUpWaitingClientsForSync(event.getEventId());
+ }
+ }
+
+ /**
+ * Wakes up HMS waiters waiting for a specific event notification.
+ *
+ * @param eventId Id of a notification
+ */
+ private void wakeUpWaitingClientsForSync(long eventId) {
+ CounterWait counterWait = sentryStore.getCounterWait();
+
+ // Wake up any HMS waiters that are waiting for this ID.
+ // counterWait should never be null, but tests mock SentryStore and a mocked one
+ // doesn't have it.
+ if (counterWait != null) {
+ counterWait.update(eventId);
+ }
+ }
+}