You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ka...@apache.org on 2017/07/18 14:31:29 UTC
[2/3] sentry git commit: SENTRY-1852: Refactor HMSFollower without
renaming files (Kalyan Kumar Kalvagadda reviewed by Vamsee Yarlagadda, Na Li,
Sergio Pena and Alexander Kolbasov)
http://git-wip-us.apache.org/repos/asf/sentry/blob/4cdb506c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
index 6762de7..3a5ba46 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
@@ -15,82 +15,275 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.sentry.service.thrift;
+import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
+import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE;
+
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hive.hcatalog.messaging.HCatEventMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage;
+import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageDeserializer;
+import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
+import org.apache.sentry.core.common.exception.SentryInvalidInputException;
+import org.apache.sentry.core.common.exception.SentryNoSuchObjectException;
import org.apache.sentry.hdfs.PathsUpdate;
+import org.apache.sentry.hdfs.PermissionsUpdate;
import org.apache.sentry.hdfs.SentryMalformedPathException;
-import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
+import org.apache.sentry.hdfs.Updateable.Update;
+import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
/**
* NotificationProcessor processes various notification events generated from
- * the Hive MetaStore state change, and applies these changes on the complete
+ * the Hive MetaStore state change, and applies these changes to the complete
* HMS Paths snapshot or delta update stored in Sentry using SentryStore.
- * <p>
- * NotificationProcessor should not skip processing notification events for any reason.
+ *
+ * <p>NotificationProcessor should not skip processing notification events for any reason.
* If some notification events are to be skipped, appropriate logic should be added in
* HMSFollower before invoking NotificationProcessor.
*/
-class NotificationProcessor {
+final class NotificationProcessor {
- private final Logger LOGGER;
+ private static final Logger LOGGER = LoggerFactory.getLogger(NotificationProcessor.class);
private final SentryStore sentryStore;
+ private final SentryJSONMessageDeserializer deserializer;
+ private final String authServerName;
+ // These variables can be updated even after object is instantiated, for testing purposes.
+ private boolean syncStoreOnCreate = false;
+ private boolean syncStoreOnDrop = false;
- NotificationProcessor(SentryStore sentryStore, Logger LOGGER) {
- this.LOGGER = LOGGER;
+ /**
+ * Configuring notification processor.
+ *
+ * @param sentryStore sentry backend store
+ * @param authServerName Server that sentry is authorizing
+ * @param conf sentry configuration
+ */
+ NotificationProcessor(SentryStore sentryStore, String authServerName,
+ Configuration conf) {
this.sentryStore = sentryStore;
+ deserializer = new SentryJSONMessageDeserializer();
+ this.authServerName = authServerName;
+ syncStoreOnCreate = Boolean
+ .parseBoolean(conf.get(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getVar(),
+ AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getDefault()));
+ syncStoreOnDrop = Boolean.parseBoolean(conf.get(AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getVar(),
+ AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getDefault()));
+ }
+
+ /**
+ * Split path into components on the "/" character.
+ * The path should not start with "/".
+ * This is consumed by Thrift interface, so the return result should be
+ * {@code List<String>}
+ *
+ * @param path input oath e.g. {@code foo/bar}
+ * @return list of components, e.g. [foo, bar]
+ */
+ private static List<String> splitPath(String path) {
+ return (Lists.newArrayList(path.split("/")));
+ }
+
+ /**
+ * Constructs permission update to be persisted for drop event that can be persisted
+ * from thrift object.
+ *
+ * @param authorizable thrift object that is dropped.
+ * @return update to be persisted
+ * @throws SentryInvalidInputException if the required fields are set in argument provided
+ */
+ @VisibleForTesting
+ static Update getPermUpdatableOnDrop(TSentryAuthorizable authorizable)
+ throws SentryInvalidInputException {
+ PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
+ String authzObj = SentryServiceUtil.getAuthzObj(authorizable);
+ update.addPrivilegeUpdate(authzObj)
+ .putToDelPrivileges(PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
+ return update;
+ }
+
+ /**
+ * Constructs permission update to be persisted for rename event that can be persisted from thrift
+ * object.
+ *
+ * @param oldAuthorizable old thrift object
+ * @param newAuthorizable new thrift object
+ * @return update to be persisted
+ * @throws SentryInvalidInputException if the required fields are set in arguments provided
+ */
+ @VisibleForTesting
+ static Update getPermUpdatableOnRename(TSentryAuthorizable oldAuthorizable,
+ TSentryAuthorizable newAuthorizable)
+ throws SentryInvalidInputException {
+ String oldAuthz = SentryServiceUtil.getAuthzObj(oldAuthorizable);
+ String newAuthz = SentryServiceUtil.getAuthzObj(newAuthorizable);
+ PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
+ TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
+ privUpdate.putToAddPrivileges(newAuthz, newAuthz);
+ privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
+ return update;
+ }
+
+ /**
+ * This function is only used for testing purposes.
+ *
+ * @param value to be set
+ */
+ @SuppressWarnings("SameParameterValue")
+ @VisibleForTesting
+ void setSyncStoreOnCreate(boolean value) {
+ syncStoreOnCreate = value;
+ }
+
+ /**
+ * This function is only used for testing purposes.
+ *
+ * @param value to be set
+ */
+ @SuppressWarnings("SameParameterValue")
+ @VisibleForTesting
+ void setSyncStoreOnDrop(boolean value) {
+ syncStoreOnDrop = value;
+ }
+
+ /**
+ * Processes the event and persist to sentry store.
+ *
+ * @param event to be processed
+ * @return true, if the event is persisted to sentry store. false, if the event is not persisted.
+ * @throws Exception if there is an error processing the event.
+ */
+ boolean processNotificationEvent(NotificationEvent event) throws Exception {
+ LOGGER
+ .debug("Processing event with id:{} and Type:{}", event.getEventId(), event.getEventType());
+ switch (HCatEventMessage.EventType.valueOf(event.getEventType())) {
+ case CREATE_DATABASE:
+ return processCreateDatabase(event);
+ case DROP_DATABASE:
+ return processDropDatabase(event);
+ case CREATE_TABLE:
+ return processCreateTable(event);
+ case DROP_TABLE:
+ return processDropTable(event);
+ case ALTER_TABLE:
+ return processAlterTable(event);
+ case ADD_PARTITION:
+ return processAddPartition(event);
+ case DROP_PARTITION:
+ return processDropPartition(event);
+ case ALTER_PARTITION:
+ return processAlterPartition(event);
+ case INSERT:
+ return false;
+ default:
+ LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(),
+ event.getEventType());
+ return false;
+ }
}
/**
* Processes "create database" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param dbName database name
- * @param location database location
- * @param seqNum notification event ID
+ * @param event notification event to be processed.
* @throws Exception if encounters errors while persisting the path change
*/
- void processCreateDatabase(String dbName, String location, long seqNum) throws Exception {
+ private boolean processCreateDatabase(NotificationEvent event) throws Exception {
+ SentryJSONCreateDatabaseMessage message =
+ deserializer.getCreateDatabaseMessage(event.getMessage());
+ String dbName = message.getDB();
+ String location = message.getLocation();
+ if ((dbName == null) || (location == null)) {
+ LOGGER.error("Create database event "
+ + "has incomplete information. dbName: {} location: {}",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(location, "null"));
+ return false;
+ }
List<String> locations = Collections.singletonList(location);
- addPaths(dbName, locations, seqNum);
+ addPaths(dbName, locations, event.getEventId());
+ if (syncStoreOnCreate) {
+ dropSentryDbPrivileges(dbName, event);
+ }
+ return true;
}
/**
* Processes "drop database" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param dbName database name
- * @param location database location
- * @param seqNum notification event ID
+ * @param event notification event to be processed.
* @throws Exception if encounters errors while persisting the path change
*/
- void processDropDatabase(String dbName, String location, long seqNum) throws Exception {
+ private boolean processDropDatabase(NotificationEvent event) throws Exception {
+ SentryJSONDropDatabaseMessage dropDatabaseMessage =
+ deserializer.getDropDatabaseMessage(event.getMessage());
+ String dbName = dropDatabaseMessage.getDB();
+ String location = dropDatabaseMessage.getLocation();
+ if (dbName == null) {
+ LOGGER.error("Drop database event has incomplete information: dbName = null");
+ return false;
+ }
+ if (syncStoreOnDrop) {
+ dropSentryDbPrivileges(dbName, event);
+ }
List<String> locations = Collections.singletonList(location);
- removePaths(dbName, locations, seqNum);
+ removePaths(dbName, locations, event.getEventId());
+ return true;
}
/**
* Processes "create table" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param dbName database name
- * @param tableName table name
- * @param location table location
- * @param seqNum notification event ID
+ * @param event notification event to be processed.
* @throws Exception if encounters errors while persisting the path change
*/
- void processCreateTable(String dbName, String tableName, String location, long seqNum)
- throws Exception {
- String authzObj = dbName + "." + tableName;
+ private boolean processCreateTable(NotificationEvent event)
+ throws Exception {
+ SentryJSONCreateTableMessage createTableMessage = deserializer
+ .getCreateTableMessage(event.getMessage());
+ String dbName = createTableMessage.getDB();
+ String tableName = createTableMessage.getTable();
+ String location = createTableMessage.getLocation();
+ if ((dbName == null) || (tableName == null) || (location == null)) {
+ LOGGER.error(String.format("Create table event " + "has incomplete information."
+ + " dbName = %s, tableName = %s, location = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"),
+ StringUtils.defaultIfBlank(location, "null")));
+ return false;
+ }
+ if (syncStoreOnCreate) {
+ dropSentryTablePrivileges(dbName, tableName, event);
+ }
+ String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
List<String> locations = Collections.singletonList(location);
- addPaths(authzObj, locations, seqNum);
+ addPaths(authzObj, locations, event.getEventId());
+ return true;
}
/**
@@ -98,86 +291,185 @@ class NotificationProcessor {
* the table as well. And applies its corresponding snapshot change as well
* as delta path update into Sentry DB.
*
- * @param dbName database name
- * @param tableName table name
- * @param seqNum notification event ID
+ * @param event notification event to be processed.
* @throws Exception if encounters errors while persisting the path change
*/
- void processDropTable(String dbName, String tableName, long seqNum) throws Exception {
- String authzObj = dbName + "." + tableName;
- removeAllPaths(authzObj, seqNum);
+ private boolean processDropTable(NotificationEvent event) throws Exception {
+ SentryJSONDropTableMessage dropTableMessage = deserializer
+ .getDropTableMessage(event.getMessage());
+ String dbName = dropTableMessage.getDB();
+ String tableName = dropTableMessage.getTable();
+ if ((dbName == null) || (tableName == null)) {
+ LOGGER.error("Drop table event "
+ + "has incomplete information. dbName: {}, tableName: {}",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"));
+ return false;
+ }
+ if (syncStoreOnDrop) {
+ dropSentryTablePrivileges(dbName, tableName, event);
+ }
+ String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+ removeAllPaths(authzObj, event.getEventId());
+ return true;
}
/**
* Processes "alter table" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param oldDbName old database name
- * @param newDbName new database name
- * @param oldTableName old table name
- * @param newTableName new table name
- * @param oldLocation old table location
- * @param newLocation new table location
- * @param seqNum notification event ID
+ * @param event notification event to be processed.
* @throws Exception if encounters errors while persisting the path change
*/
- void processAlterTable(String oldDbName, String newDbName, String oldTableName,
- String newTableName, String oldLocation, String newLocation, long seqNum)
- throws Exception {
+ private boolean processAlterTable(NotificationEvent event) throws Exception {
+ SentryJSONAlterTableMessage alterTableMessage =
+ deserializer.getAlterTableMessage(event.getMessage());
+ String oldDbName = alterTableMessage.getDB();
+ String oldTableName = alterTableMessage.getTable();
+ String newDbName = event.getDbName();
+ String newTableName = event.getTableName();
+ String oldLocation = alterTableMessage.getOldLocation();
+ String newLocation = alterTableMessage.getNewLocation();
+
+ if ((oldDbName == null)
+ || (oldTableName == null)
+ || (newDbName == null)
+ || (newTableName == null)
+ || (oldLocation == null)
+ || (newLocation == null)) {
+ LOGGER.error(String.format("Alter table event "
+ + "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, "
+ + "newDbName = %s, newTableName = %s, newLocation = %s",
+ StringUtils.defaultIfBlank(oldDbName, "null"),
+ StringUtils.defaultIfBlank(oldTableName, "null"),
+ StringUtils.defaultIfBlank(oldLocation, "null"),
+ StringUtils.defaultIfBlank(newDbName, "null"),
+ StringUtils.defaultIfBlank(newTableName, "null"),
+ StringUtils.defaultIfBlank(newLocation, "null")));
+ return false;
+ }
+
+ if ((oldDbName.equals(newDbName))
+ && (oldTableName.equals(newTableName))
+ && (oldLocation.equals(newLocation))) {
+ LOGGER.error(String.format("Alter table notification ignored as neither name nor "
+ + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, "
+ + "newLocation = %s", oldDbName + "." + oldTableName, oldLocation,
+ newDbName + "." + newTableName, newLocation));
+ return false;
+ }
+
+ if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) {
+ // Name has changed
+ try {
+ renamePrivileges(oldDbName, oldTableName, newDbName, newTableName);
+ } catch (SentryNoSuchObjectException e) {
+ LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table:"
+ + " {}.{}", oldDbName, oldTableName);
+ } catch (Exception e) {
+ LOGGER.info("Could not process Alter table event. Event: {}", event.toString(), e);
+ return false;
+ }
+ }
String oldAuthzObj = oldDbName + "." + oldTableName;
String newAuthzObj = newDbName + "." + newTableName;
- renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, seqNum);
+ renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, event.getEventId());
+ return true;
}
/**
* Processes "add partition" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param dbName database name
- * @param tableName table name
- * @param locations partition locations
- * @param seqNum notification event ID
+ * @param event notification event to be processed.
* @throws Exception if encounters errors while persisting the path change
*/
- void processAddPartition(String dbName, String tableName,
- Collection<String> locations, long seqNum)
- throws Exception {
- String authzObj = dbName + "." + tableName;
- addPaths(authzObj, locations, seqNum);
+ private boolean processAddPartition(NotificationEvent event)
+ throws Exception {
+ SentryJSONAddPartitionMessage addPartitionMessage =
+ deserializer.getAddPartitionMessage(event.getMessage());
+ String dbName = addPartitionMessage.getDB();
+ String tableName = addPartitionMessage.getTable();
+ List<String> locations = addPartitionMessage.getLocations();
+ if ((dbName == null) || (tableName == null) || (locations == null)) {
+ LOGGER.error(String.format("Create table event has incomplete information. "
+ + "dbName = %s, tableName = %s, locations = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"),
+ locations != null ? locations.toString() : "null"));
+ return false;
+ }
+ String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+ addPaths(authzObj, locations, event.getEventId());
+ return true;
}
/**
* Processes "drop partition" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param dbName database name
- * @param tableName table name
- * @param locations partition locations
- * @param seqNum notification event ID
+ * @param event notification event to be processed.
* @throws Exception if encounters errors while persisting the path change
*/
- void processDropPartition(String dbName, String tableName,
- Collection<String> locations, long seqNum)
- throws Exception {
- String authzObj = dbName + "." + tableName;
- removePaths(authzObj, locations, seqNum);
+ private boolean processDropPartition(NotificationEvent event)
+ throws Exception {
+ SentryJSONDropPartitionMessage dropPartitionMessage =
+ deserializer.getDropPartitionMessage(event.getMessage());
+ String dbName = dropPartitionMessage.getDB();
+ String tableName = dropPartitionMessage.getTable();
+ List<String> locations = dropPartitionMessage.getLocations();
+ if ((dbName == null) || (tableName == null) || (locations == null)) {
+ LOGGER.error(String.format("Drop partition event "
+ + "has incomplete information. dbName = %s, tableName = %s, location = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"),
+ locations != null ? locations.toString() : "null"));
+ return false;
+ }
+ String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+ removePaths(authzObj, locations, event.getEventId());
+ return true;
}
/**
* Processes "alter partition" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param dbName database name
- * @param tableName table name
- * @param oldLocation old partition location
- * @param newLocation new partition location
- * @param seqNum notification event ID
+ * @param event notification event to be processed.
* @throws Exception if encounters errors while persisting the path change
*/
- void processAlterPartition(String dbName, String tableName, String oldLocation,
- String newLocation, long seqNum) throws Exception {
+ private boolean processAlterPartition(NotificationEvent event) throws Exception {
+ SentryJSONAlterPartitionMessage alterPartitionMessage =
+ deserializer.getAlterPartitionMessage(event.getMessage());
+ String dbName = alterPartitionMessage.getDB();
+ String tableName = alterPartitionMessage.getTable();
+ String oldLocation = alterPartitionMessage.getOldLocation();
+ String newLocation = alterPartitionMessage.getNewLocation();
+
+ if ((dbName == null)
+ || (tableName == null)
+ || (oldLocation == null)
+ || (newLocation == null)) {
+ LOGGER.error(String.format("Alter partition event "
+ + "has incomplete information. dbName = %s, tableName = %s, "
+ + "oldLocation = %s, newLocation = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"),
+ StringUtils.defaultIfBlank(oldLocation, "null"),
+ StringUtils.defaultIfBlank(newLocation, "null")));
+ return false;
+ }
+
+ if (oldLocation.equals(newLocation)) {
+ LOGGER.info(String.format("Alter partition notification ignored as"
+ + "location has not changed: AuthzObj = %s, Location = %s", dbName + "."
+ + "." + tableName, oldLocation));
+ return false;
+ }
+
String oldAuthzObj = dbName + "." + tableName;
- renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, seqNum);
+ renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, event.getEventId());
+ return true;
}
/**
@@ -187,10 +479,9 @@ class NotificationProcessor {
* @param authzObj the given authzObj
* @param locations a set of paths need to be added
* @param seqNum notification event ID
- * @throws Exception
*/
private void addPaths(String authzObj, Collection<String> locations, long seqNum)
- throws Exception {
+ throws Exception {
// AuthzObj is case insensitive
authzObj = authzObj.toLowerCase();
@@ -201,13 +492,13 @@ class NotificationProcessor {
for (String location : locations) {
String pathTree = getPath(location);
if (pathTree == null) {
- LOGGER.debug("#### HMS Path Update ["
+ LOGGER.debug("HMS Path Update ["
+ "OP : addPath, "
+ "authzObj : " + authzObj + ", "
+ "path : " + location + "] - nothing to add" + ", "
+ "notification event ID: " + seqNum + "]");
} else {
- LOGGER.debug("#### HMS Path Update ["
+ LOGGER.debug("HMS Path Update ["
+ "OP : addPath, " + "authzObj : "
+ authzObj + ", "
+ "path : " + location + ", "
@@ -226,10 +517,9 @@ class NotificationProcessor {
* @param authzObj the given authzObj
* @param locations a set of paths need to be removed
* @param seqNum notification event ID
- * @throws Exception
*/
private void removePaths(String authzObj, Collection<String> locations, long seqNum)
- throws Exception {
+ throws Exception {
// AuthzObj is case insensitive
authzObj = authzObj.toLowerCase();
@@ -238,13 +528,13 @@ class NotificationProcessor {
for (String location : locations) {
String pathTree = getPath(location);
if (pathTree == null) {
- LOGGER.debug("#### HMS Path Update ["
+ LOGGER.debug("HMS Path Update ["
+ "OP : removePath, "
+ "authzObj : " + authzObj + ", "
+ "path : " + location + "] - nothing to remove" + ", "
+ "notification event ID: " + seqNum + "]");
} else {
- LOGGER.debug("#### HMS Path Update ["
+ LOGGER.debug("HMS Path Update ["
+ "OP : removePath, "
+ "authzObj : " + authzObj + ", "
+ "path : " + location + ", "
@@ -263,14 +553,13 @@ class NotificationProcessor {
*
* @param authzObj the given authzObj to be deleted
* @param seqNum notification event ID
- * @throws Exception
*/
private void removeAllPaths(String authzObj, long seqNum)
- throws Exception {
+ throws Exception {
// AuthzObj is case insensitive
authzObj = authzObj.toLowerCase();
- LOGGER.debug("#### HMS Path Update ["
+ LOGGER.debug("HMS Path Update ["
+ "OP : removeAllPaths, "
+ "authzObj : " + authzObj + ", "
+ "notification event ID: " + seqNum + "]");
@@ -289,21 +578,19 @@ class NotificationProcessor {
* @param newAuthzObj the new name to be changed to
* @param oldLocation a existing path of the given authzObj
* @param newLocation a new path to be changed to
- * @param seqNum
- * @throws Exception
*/
private void renameAuthzPath(String oldAuthzObj, String newAuthzObj, String oldLocation,
- String newLocation, long seqNum) throws Exception {
+ String newLocation, long seqNum) throws Exception {
// AuthzObj is case insensitive
oldAuthzObj = oldAuthzObj.toLowerCase();
newAuthzObj = newAuthzObj.toLowerCase();
String oldPathTree = getPath(oldLocation);
String newPathTree = getPath(newLocation);
- LOGGER.debug("#### HMS Path Update ["
+ LOGGER.debug("HMS Path Update ["
+ "OP : renameAuthzObject, "
+ "oldAuthzObj : " + oldAuthzObj + ", "
- + "newAuthzObj : " + newAuthzObj + ", "
+ + "newAuthzObj : " + newAuthzObj + ", "
+ "oldLocation : " + oldLocation + ", "
+ "newLocation : " + newLocation + ", "
+ "notification event ID: " + seqNum + "]");
@@ -323,20 +610,10 @@ class NotificationProcessor {
// Both name and location has changed
// - Alter table rename for managed table
sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj, oldPathTree,
- newPathTree, update);
+ newPathTree, update);
}
- } else if (oldPathTree != null) {
- PathsUpdate update = new PathsUpdate(seqNum, false);
- update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
- sentryStore.deleteAuthzPathsMapping(oldAuthzObj,
- Collections.singleton(oldPathTree),
- update);
- } else if (newPathTree != null) {
- PathsUpdate update = new PathsUpdate(seqNum, false);
- update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
- sentryStore.addAuthzPathsMapping(newAuthzObj,
- Collections.singleton(newPathTree),
- update);
+ } else {
+ updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, seqNum);
}
} else if (!oldLocation.equals(newLocation)) {
// Only Location has changed, e.g. Alter table set location
@@ -346,27 +623,35 @@ class NotificationProcessor {
update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree));
sentryStore.updateAuthzPathsMapping(oldAuthzObj, oldPathTree,
newPathTree, update);
- } else if (oldPathTree != null) {
- PathsUpdate update = new PathsUpdate(seqNum, false);
- update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
- sentryStore.deleteAuthzPathsMapping(oldAuthzObj,
- Collections.singleton(oldPathTree),
- update);
- } else if (newPathTree != null) {
- PathsUpdate update = new PathsUpdate(seqNum, false);
- update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree));
- sentryStore.addAuthzPathsMapping(oldAuthzObj,
- Collections.singleton(newPathTree),
- update);
+ } else {
+ updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, seqNum);
}
} else {
LOGGER.error("Update Notification for Auhorizable object {}, with no change, skipping",
- oldAuthzObj);
- throw new SentryInvalidHMSEventException("Update Notification for Authorizable object" +
- "with no change");
+ oldAuthzObj);
+ throw new SentryInvalidHMSEventException("Update Notification for Authorizable object"
+ + "with no change");
}
}
+ private void updateAuthzPathsMapping(String oldAuthzObj, String oldPathTree,
+ String newAuthzObj, String newPathTree, long seqNum) throws Exception {
+ if (oldPathTree != null) {
+ PathsUpdate update = new PathsUpdate(seqNum, false);
+ update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
+ sentryStore.deleteAuthzPathsMapping(oldAuthzObj,
+ Collections.singleton(oldPathTree),
+ update);
+ } else if (newPathTree != null) {
+ PathsUpdate update = new PathsUpdate(seqNum, false);
+ update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
+ sentryStore.addAuthzPathsMapping(newAuthzObj,
+ Collections.singleton(newPathTree),
+ update);
+ }
+
+ }
+
/**
* Get path tree from a given path. It return null if encounters
* SentryMalformedPathException which indicates a malformed path.
@@ -383,15 +668,45 @@ class NotificationProcessor {
return null;
}
- /**
- * Split path into components on the "/" character.
- * The path should not start with "/".
- * This is consumed by Thrift interface, so the return result should be
- * {@code List<String>}
- * @param path input oath e.g. {@code foo/bar}
- * @return list of commponents, e.g. [foo, bar]
- */
- private List<String> splitPath(String path) {
- return (Lists.newArrayList(path.split("/")));
+ private void dropSentryDbPrivileges(String dbName, NotificationEvent event) {
+ try {
+ TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName);
+ authorizable.setDb(dbName);
+ sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable));
+ } catch (SentryNoSuchObjectException e) {
+ LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: {}",
+ dbName);
+ } catch (Exception e) {
+ LOGGER.error("Could not process Drop database event." + "Event: " + event.toString(), e);
+ }
+ }
+
+ private void dropSentryTablePrivileges(String dbName, String tableName,
+ NotificationEvent event) {
+ try {
+ TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName);
+ authorizable.setDb(dbName);
+ authorizable.setTable(tableName);
+ sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable));
+ } catch (SentryNoSuchObjectException e) {
+ LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the table: {}.{}",
+ dbName, tableName);
+ } catch (Exception e) {
+ LOGGER.error("Could not process Drop table event. Event: " + event.toString(), e);
+ }
+ }
+
+ private void renamePrivileges(String oldDbName, String oldTableName, String newDbName,
+ String newTableName) throws
+ Exception {
+ TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(authServerName);
+ oldAuthorizable.setDb(oldDbName);
+ oldAuthorizable.setTable(oldTableName);
+ TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(authServerName);
+ newAuthorizable.setDb(newDbName);
+ newAuthorizable.setTable(newTableName);
+ Update update =
+ getPermUpdatableOnRename(oldAuthorizable, newAuthorizable);
+ sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update);
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/4cdb506c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
new file mode 100644
index 0000000..05518e8
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
@@ -0,0 +1,244 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ <p>
+ http://www.apache.org/licenses/LICENSE-2.0
+ <p>
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.codahale.metrics.Timer.Context;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.sentry.provider.db.service.persistent.PathsImage;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
+
+import org.apache.thrift.TException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper class for <Code>HiveMetaStoreClient</Code>
+ *
+ * <p>Abstracts communication with HMS and exposes APi's to connect/disconnect to HMS and to
+ * request HMS snapshots and also for new notifications.
+ */
+final class SentryHMSClient implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SentryHMSClient.class);
+ private final Configuration conf;
+ private HiveMetaStoreClient client = null;
+ private HiveConnectionFactory hiveConnectionFactory;
+
+ private static final String SNAPSHOT = "snapshot";
+ /** Measures time to get full snapshot. */
+ private final Timer updateTimer = SentryMetrics.getInstance()
+ .getTimer(name(FullUpdateInitializer.class, SNAPSHOT));
+ /** Number of times update failed. */
+ private final Counter failedSnapshotsCount = SentryMetrics.getInstance()
+ .getCounter(name(FullUpdateInitializer.class, "failed"));
+
+ SentryHMSClient(Configuration conf, HiveConnectionFactory hiveConnectionFactory) {
+ this.conf = conf;
+ this.hiveConnectionFactory = hiveConnectionFactory;
+ }
+
+ /**
+ * Used only for testing purposes.
+ *x
+ * @param client HiveMetaStoreClient to be initialized
+ */
+ @VisibleForTesting
+ void setClient(HiveMetaStoreClient client) {
+ this.client = client;
+ }
+
+ /**
+ * Used to know if the client is connected to HMS
+ *
+ * @return true if the client is connected to HMS false, if client is not connected.
+ */
+ boolean isConnected() {
+ return client != null;
+ }
+
+ /**
+ * Connects to HMS by creating HiveMetaStoreClient.
+ *
+ * @throws IOException if could not establish connection
+ * @throws InterruptedException if connection was interrupted
+ * @throws MetaException if other errors happened
+ */
+ void connect()
+ throws IOException, InterruptedException, MetaException {
+ if (client != null) {
+ return;
+ }
+ client = hiveConnectionFactory.connect().getClient();
+ }
+
+ /**
+ * Disconnects the HMS client.
+ */
+ public void disconnect() throws Exception {
+ try {
+ if (client != null) {
+ LOGGER.info("Closing the HMS client connection");
+ client.close();
+ }
+ } catch (Exception e) {
+ LOGGER.error("failed to close Hive Connection Factory", e);
+ } finally {
+ client = null;
+ }
+ }
+
+ /**
+ * Closes the HMS client.
+ *
+ * <p>This is similar to disconnect. As this class implements AutoClosable, close should be
+ * implemented.
+ */
+ public void close() throws Exception {
+ disconnect();
+ }
+
+ /**
+ * Creates HMS full snapshot.
+ *
+ * @return Full path snapshot and the last notification id on success
+ */
+ PathsImage getFullSnapshot() {
+ try {
+ if (client == null) {
+ LOGGER.error("Client is not connected to HMS");
+ return new PathsImage(Collections.<String, Set<String>>emptyMap(),
+ SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
+ }
+
+ CurrentNotificationEventId eventIdBefore = client.getCurrentNotificationEventId();
+ Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate();
+ if (pathsFullSnapshot.isEmpty()) {
+ return new PathsImage(pathsFullSnapshot, SentryStore.EMPTY_NOTIFICATION_ID,
+ SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
+ }
+
+ CurrentNotificationEventId eventIdAfter = client.getCurrentNotificationEventId();
+ LOGGER.info("NotificationID, Before Snapshot: {}, After Snapshot {}",
+ eventIdBefore.getEventId(), eventIdAfter.getEventId());
+ // To ensure point-in-time snapshot consistency, need to make sure
+ // there were no HMS updates while retrieving the snapshot. If there are updates, snapshot
+ // is discarded. New attempt will be made after 500 milliseconds when
+ // HMSFollower runs again.
+ if (!eventIdBefore.equals(eventIdAfter)) {
+ LOGGER.error("Snapshot discarded, updates to HMS data while shapshot is being taken."
+ + "ID Before: {}. ID After: {}", eventIdBefore.getEventId(), eventIdAfter.getEventId());
+ return new PathsImage(Collections.<String, Set<String>>emptyMap(),
+ SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
+ }
+
+ LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID: {}.",
+ eventIdAfter);
+ // As eventIDAfter is the last event that was processed, eventIDAfter is used to update
+ // lastProcessedNotificationID instead of getting it from persistent store.
+ return new PathsImage(pathsFullSnapshot, eventIdAfter.getEventId(),
+ SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
+ } catch (TException failure) {
+ LOGGER.error("Failed to communicate to HMS");
+ return new PathsImage(Collections.<String, Set<String>>emptyMap(),
+ SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
+ }
+ }
+
+ /**
+ * Retrieve a Hive full snapshot from HMS.
+ *
+ * @return HMS snapshot. Snapshot consists of a mapping from auth object name to the set of paths
+ * corresponding to that name.
+ */
+ private Map<String, Set<String>> fetchFullUpdate() {
+ LOGGER.info("Request full HMS snapshot");
+ try (FullUpdateInitializer updateInitializer =
+ new FullUpdateInitializer(hiveConnectionFactory, conf);
+ Context context = updateTimer.time()) {
+ Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
+ LOGGER.info("Obtained full HMS snapshot");
+ return pathsUpdate;
+ } catch (Exception ignored) {
+ failedSnapshotsCount.inc();
+ LOGGER.error("Snapshot created failed ", ignored);
+ return Collections.emptyMap();
+ }
+ }
+
+ /**
+ * Returns all HMS notifications with ID greater than the specified one
+ *
+ * @param notificationId ID of the last notification that was processed.
+ * @return Collection of new events to be synced
+ */
+ Collection<NotificationEvent> getNotifications(long notificationId) throws Exception {
+ if (client == null) {
+ LOGGER.error("Client is not connected to HMS");
+ return Collections.emptyList();
+ }
+ LOGGER.debug("Checking for notifications beyond {}", notificationId);
+ // HIVE-15761: Currently getNextNotification API may return an empty
+ // NotificationEventResponse causing TProtocolException.
+ // Workaround: Only processes the notification events newer than the last updated one.
+ CurrentNotificationEventId eventId = client.getCurrentNotificationEventId();
+ LOGGER.debug("ID of Last HMS notifications is: {}", eventId.getEventId());
+ if (eventId.getEventId() < notificationId) {
+ LOGGER.error("Last notification of HMS is smaller than what sentry processed, Something is"
+ + "wrong. Sentry will request a full Snapshot");
+ // TODO Path Mapping info should be cleared so that HMSFollower would request for full
+ // snapshot in the subsequent run.
+ return Collections.emptyList();
+ }
+
+ if (eventId.getEventId() == notificationId) {
+ return Collections.emptyList();
+ }
+
+ NotificationEventResponse response =
+ client.getNextNotification(notificationId, Integer.MAX_VALUE, null);
+ if (response.isSetEvents()) {
+ LOGGER.debug("Last Id processed:{}. Received collection of notifications, Size:{}",
+ notificationId, response.getEvents().size());
+ return response.getEvents();
+ }
+
+ return Collections.emptyList();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/4cdb506c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index 6014a79..10d55dc 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -296,7 +296,6 @@ public class SentryService implements Callable, SigUtils.SigListener {
hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf());
hiveConnectionFactory.init();
hmsFollower = new HMSFollower(conf, sentryStore, leaderMonitor, hiveConnectionFactory);
-
long initDelay = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS,
ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS_DEFAULT);
long period = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS,
@@ -350,7 +349,7 @@ public class SentryService implements Callable, SigUtils.SigListener {
try {
// close connections
hmsFollower.close();
- } catch (RuntimeException ex) {
+ } catch (Exception ex) {
LOGGER.error("HMSFollower.close() failed", ex);
} finally {
hmsFollower = null;
http://git-wip-us.apache.org/repos/asf/sentry/blob/4cdb506c/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
index 9c3e485..5826766 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
@@ -30,9 +30,12 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.sentry.core.common.exception.SentryInvalidInputException;
import org.apache.sentry.core.common.utils.SentryConstants;
import org.apache.sentry.core.common.utils.KeyValue;
import org.apache.sentry.core.common.utils.PolicyFileConstants;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
import org.apache.sentry.provider.db.service.thrift.TSentryGrantOption;
import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
@@ -219,8 +222,38 @@ public final class SentryServiceUtil {
return hiveConf.get(METASTOREURIS.varname);
}
+ /**
+ * Derives object name from database and table names by concatenating them
+ *
+ * @param authorizable for which is name is to be derived
+ * @return authorizable name
+ * @throws SentryInvalidInputException if argument provided does not have all the
+ * required fields set.
+ */
+ public static String getAuthzObj(TSentryAuthorizable authorizable)
+ throws SentryInvalidInputException {
+ return getAuthzObj(authorizable.getDb(), authorizable.getTable());
+ }
+
+ /**
+ * Derives object name from database and table names by concatenating them
+ *
+ * @param dbName
+ * @param tblName
+ * @return authorizable name
+ * @throws SentryInvalidInputException if argument provided does not have all the
+ * required fields set.
+ */
+ public static String getAuthzObj(String dbName, String tblName)
+ throws SentryInvalidInputException {
+ if (SentryStore.isNULL(dbName)) {
+ throw new SentryInvalidInputException("Invalif input, DB name is missing");
+ }
+ return SentryStore.isNULL(tblName) ? dbName.toLowerCase() :
+ (dbName + "." + tblName).toLowerCase();
+ }
+
private SentryServiceUtil() {
// Make constructor private to avoid instantiation
}
-
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/4cdb506c/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index 51f6c5d..a8ebf7c 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -2697,7 +2697,6 @@ public class TestSentryStore extends org.junit.Assert {
@Test
public void testRenameUpdateAfterReplacingANewPathsImage() throws Exception {
Map<String, Set<String>> authzPaths = new HashMap<>();
-
// First image to persist (this will be replaced later)
authzPaths.put("db1.table1", Sets.newHashSet("/user/hive/warehouse/db2.db/table1.1",
"/user/hive/warehouse/db2.db/table1.2"));
http://git-wip-us.apache.org/repos/asf/sentry/blob/4cdb506c/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
index 66ad2a1..d67c162 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
@@ -16,75 +16,115 @@
*/
package org.apache.sentry.service.thrift;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hive.hcatalog.messaging.HCatEventMessage;
import org.apache.hive.hcatalog.messaging.HCatEventMessage.EventType;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
import org.apache.sentry.hdfs.Updateable;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
-
-import java.util.Arrays;
-
-import org.junit.Test;
+import org.junit.BeforeClass;
import org.junit.Ignore;
+import org.junit.Test;
import org.mockito.Mockito;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.reset;
+import javax.security.auth.login.LoginException;
public class TestHMSFollower {
- SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory();
- SentryStore sentryStore = Mockito.mock(SentryStore.class);
- final static String hiveInstance = "server2";
+ private final static String hiveInstance = "server2";
+ private final static Configuration configuration = new Configuration();
+ private final SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory();
+ private final SentryStore sentryStore = Mockito.mock(SentryStore.class);
+ private static HiveSimpleConnectionFactory hiveConnectionFactory;
+
+ @BeforeClass
+ public static void setup() throws IOException, LoginException {
+ hiveConnectionFactory = new HiveSimpleConnectionFactory(configuration, new HiveConf());
+ hiveConnectionFactory.init();
+ configuration.set("sentry.hive.sync.create", "true");
+ }
+
+ /**
+ * Constructs create database event and makes sure that appropriate sentry store API's
+ * are invoke when the event is processed by hms follower.
+ *
+ * @throws Exception
+ */
@Test
public void testCreateDatabase() throws Exception {
String dbName = "db1";
// Create notification events
- NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_DATABASE.toString(),
- messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)).toString());
+ NotificationEvent notificationEvent = new NotificationEvent(1, 0,
+ HCatEventMessage.EventType.CREATE_DATABASE.toString(),
+ messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null))
+ .toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
-
- Configuration configuration = new Configuration();
- HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
- hmsFollower.processNotificationEvents(events);
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+ hiveConnectionFactory, hiveInstance);
+ hmsFollower.processNotifications(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb("db1");
- verify(sentryStore, times(0)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable));
+ verify(sentryStore, times(1))
+ .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable));
}
+ /**
+ * Constructs drop database event and makes sure that appropriate sentry store API's
+ * are invoke when the event is processed by hms follower.
+ *
+ * @throws Exception
+ */
@Test
public void testDropDatabase() throws Exception {
String dbName = "db1";
// Create notification events
- NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_DATABASE.toString(),
- messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)).toString());
+ NotificationEvent notificationEvent = new NotificationEvent(1, 0,
+ HCatEventMessage.EventType.DROP_DATABASE.toString(),
+ messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null))
+ .toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
- Configuration configuration = new Configuration();
- HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
- hmsFollower.processNotificationEvents(events);
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+ hiveConnectionFactory, hiveInstance);
+ hmsFollower.processNotifications(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb("db1");
- verify(sentryStore, times(1)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable));
+ verify(sentryStore, times(1))
+ .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable));
}
+ /**
+ * Constructs create table event and makes sure that appropriate sentry store API's
+ * are invoke when the event is processed by hms follower.
+ *
+ * @throws Exception
+ */
@Test
public void testCreateTable() throws Exception {
String dbName = "db1";
@@ -93,23 +133,33 @@ public class TestHMSFollower {
// Create notification events
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("hdfs:///db1.db/table1");
- NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString());
+ NotificationEvent notificationEvent = new NotificationEvent(1, 0,
+ HCatEventMessage.EventType.CREATE_TABLE.toString(),
+ messageFactory.buildCreateTableMessage(
+ new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null))
+ .toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
- Configuration configuration = new Configuration();
- HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
- hmsFollower.processNotificationEvents(events);
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+ hiveConnectionFactory, hiveInstance);
+ hmsFollower.processNotifications(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb("db1");
authorizable.setTable(tableName);
- verify(sentryStore, times(0)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable));
+ verify(sentryStore, times(1))
+ .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable));
}
+ /**
+ * Constructs drop table event and makes sure that appropriate sentry store API's
+ * are invoke when the event is processed by hms follower.
+ *
+ * @throws Exception
+ */
@Test
public void testDropTable() throws Exception {
String dbName = "db1";
@@ -118,23 +168,33 @@ public class TestHMSFollower {
// Create notification events
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("hdfs:///db1.db/table1");
- NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_TABLE.toString(),
- messageFactory.buildDropTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString());
+ NotificationEvent notificationEvent = new NotificationEvent(1, 0,
+ HCatEventMessage.EventType.DROP_TABLE.toString(),
+ messageFactory.buildDropTableMessage(
+ new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null))
+ .toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
- Configuration configuration = new Configuration();
- HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
- hmsFollower.processNotificationEvents(events);
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+ hiveConnectionFactory, hiveInstance);
+ hmsFollower.processNotifications(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb("db1");
authorizable.setTable(tableName);
- verify(sentryStore, times(1)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable));
+ verify(sentryStore, times(1))
+ .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable));
}
+ /**
+ * Constructs rename table event and makes sure that appropriate sentry store API's
+ * are invoke when the event is processed by hms follower.
+ *
+ * @throws Exception
+ */
@Test
public void testRenameTable() throws Exception {
String dbName = "db1";
@@ -146,18 +206,20 @@ public class TestHMSFollower {
// Create notification events
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("hdfs:///db1.db/table1");
- NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.ALTER_TABLE.toString(),
- messageFactory.buildAlterTableMessage(
- new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
- new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString());
+ NotificationEvent notificationEvent = new NotificationEvent(1, 0,
+ HCatEventMessage.EventType.ALTER_TABLE.toString(),
+ messageFactory.buildAlterTableMessage(
+ new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
+ new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null))
+ .toString());
notificationEvent.setDbName(newDbName);
notificationEvent.setTableName(newTableName);
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
- Configuration configuration = new Configuration();
- HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
- hmsFollower.processNotificationEvents(events);
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+ hiveConnectionFactory, hiveInstance);
+ hmsFollower.processNotifications(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
@@ -169,11 +231,20 @@ public class TestHMSFollower {
newAuthorizable.setDb(newDbName);
newAuthorizable.setTable(newTableName);
- verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable, HMSFollower.onRenameSentryPrivilege(authorizable, newAuthorizable));
+ verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable,
+ NotificationProcessor.getPermUpdatableOnRename(authorizable, newAuthorizable));
}
@Ignore
+ /**
+ * Constructs a bunch of events and passed to processor of hms follower. One of those is alter
+ * partition event with out actually changing anything(invalid event). Idea is to make sure that
+ * hms follower calls appropriate sentry store API's for the events processed by hms follower
+ * after processing the invalid alter partition event.
+ *
+ * @throws Exception
+ */
@Test
public void testAlterPartitionWithInvalidEvent() throws Exception {
String dbName = "db1";
@@ -181,35 +252,38 @@ public class TestHMSFollower {
String tableName2 = "table2";
long inputEventId = 1;
List<NotificationEvent> events = new ArrayList<>();
- NotificationEvent notificationEvent = null;
+ NotificationEvent notificationEvent;
List<FieldSchema> partCols;
- StorageDescriptor sd = null;
+ StorageDescriptor sd;
Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
+ //noinspection unchecked
Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
Configuration configuration = new Configuration();
- HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
-
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+ hiveConnectionFactory, hiveInstance);
// Create a table
sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table1");
- partCols = new ArrayList<FieldSchema>();
+ partCols = new ArrayList<>();
partCols.add(new FieldSchema("ds", "string", ""));
- Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null);
+ Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
+ null);
notificationEvent = new NotificationEvent(inputEventId, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(table).toString());
+ HCatEventMessage.EventType.CREATE_TABLE.toString(),
+ messageFactory.buildCreateTableMessage(table).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
- hmsFollower.processNotificationEvents(events);
+ hmsFollower.processNotifications(events);
// Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID was not invoked.
+ //noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
events.clear();
@@ -218,21 +292,22 @@ public class TestHMSFollower {
List<Partition> partitions = new ArrayList<>();
StorageDescriptor invalidSd = new StorageDescriptor();
invalidSd.setLocation(null);
- Partition partition = new Partition(Arrays.asList("today"), dbName, tableName1,
- 0, 0, sd, null);
+ Partition partition = new Partition(Collections.singletonList("today"), dbName, tableName1,
+ 0, 0, sd, null);
partitions.add(partition);
notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ADD_PARTITION.toString(),
- messageFactory.buildAddPartitionMessage(table, partitions).toString());
+ messageFactory.buildAddPartitionMessage(table, partitions).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
//Process the notification
- hmsFollower.processNotificationEvents(events);
+ hmsFollower.processNotifications(events);
// Make sure that addAuthzPathsMapping was invoked once to handle ADD_PARTITION notification
// and persistLastProcessedNotificationID was not invoked.
+ //noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
events.clear();
@@ -241,13 +316,13 @@ public class TestHMSFollower {
// This is an invalid event and should be processed by sentry store.
// Event Id should be explicitly persisted using persistLastProcessedNotificationID
notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(),
- messageFactory.buildAlterPartitionMessage(partition, partition).toString());
+ messageFactory.buildAlterPartitionMessage(partition, partition).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
- hmsFollower.processNotificationEvents(events);
+ hmsFollower.processNotifications(events);
// Make sure that persistLastProcessedNotificationID is invoked explicitly.
verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1);
reset(sentryStore);
@@ -255,21 +330,21 @@ public class TestHMSFollower {
// Create a alter notification with some actual change.
sd = new StorageDescriptor();
- sd.setLocation("hdfs://user/hive/wareshouse/db1.db/table1");
+ sd.setLocation("hdfs://user/hive/warehouse/db1.db/table1");
Partition updatedPartition = new Partition(partition);
updatedPartition.setSd(sd);
notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(),
- messageFactory.buildAlterPartitionMessage(partition, updatedPartition).toString());
+ messageFactory.buildAlterPartitionMessage(partition, updatedPartition).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
- hmsFollower.processNotificationEvents(events);
+ hmsFollower.processNotifications(events);
// Make sure that updateAuthzPathsMapping was invoked once to handle ALTER_PARTITION
// notification and persistLastProcessedNotificationID was not invoked.
verify(sentryStore, times(1)).updateAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyString(), Mockito.anyString(), Mockito.any(Updateable.Update.class));
+ Mockito.anyString(), Mockito.anyString(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(inputEventId - 1);
reset(sentryStore);
events.clear();
@@ -277,24 +352,34 @@ public class TestHMSFollower {
// Create a table
sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table2");
- partCols = new ArrayList<FieldSchema>();
+ partCols = new ArrayList<>();
partCols.add(new FieldSchema("ds", "string", ""));
- Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null);
+ Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
+ null);
notificationEvent = new NotificationEvent(inputEventId, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(table1).toString());
+ HCatEventMessage.EventType.CREATE_TABLE.toString(),
+ messageFactory.buildCreateTableMessage(table1).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName2);
events.add(notificationEvent);
// Process the notification
- hmsFollower.processNotificationEvents(events);
+ hmsFollower.processNotifications(events);
// Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID was not invoked.
+ //noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
}
+ /**
+ * Constructs a bunch of events and passed to processor of hms follower. One of those is alter
+ * table event with out actually changing anything(invalid event). Idea is to make sure that
+ * hms follower calls appropriate sentry store API's for the events processed by hms follower
+ * after processing the invalid alter table event.
+ *
+ * @throws Exception
+ */
@Test
public void testAlterTableWithInvalidEvent() throws Exception {
String dbName = "db1";
@@ -302,61 +387,67 @@ public class TestHMSFollower {
String tableName2 = "table2";
long inputEventId = 1;
List<NotificationEvent> events = new ArrayList<>();
- NotificationEvent notificationEvent = null;
+ NotificationEvent notificationEvent;
List<FieldSchema> partCols;
- StorageDescriptor sd = null;
+ StorageDescriptor sd;
Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
+ //noinspection unchecked
Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
Configuration configuration = new Configuration();
- HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+ hiveConnectionFactory, hiveInstance);
// Create a table
sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table1");
- partCols = new ArrayList<FieldSchema>();
+ partCols = new ArrayList<>();
partCols.add(new FieldSchema("ds", "string", ""));
- Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null);
+ Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
+ null);
notificationEvent = new NotificationEvent(inputEventId, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(table).toString());
+ HCatEventMessage.EventType.CREATE_TABLE.toString(),
+ messageFactory.buildCreateTableMessage(table).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
- hmsFollower.processNotificationEvents(events);
+ hmsFollower.processNotifications(events);
// Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID was not invoked.
+ //noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
events.clear();
-
- // Create alter table notification with actuall changing anything.
+ // Create alter table notification with out actually changing anything.
// This notification should not be processed by sentry server
// Notification should be persisted explicitly
- notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.ALTER_TABLE.toString(),
- messageFactory.buildAlterTableMessage(
- new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
- new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString());
+ notificationEvent = new NotificationEvent(1, 0,
+ HCatEventMessage.EventType.ALTER_TABLE.toString(),
+ messageFactory.buildAlterTableMessage(
+ new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
+ new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null))
+ .toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events = new ArrayList<>();
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
- hmsFollower.processNotificationEvents(events);
+ hmsFollower.processNotifications(events);
// Make sure that renameAuthzObj and deleteAuthzPathsMapping were not invoked
// to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID is explicitly invoked
verify(sentryStore, times(0)).renameAuthzObj(Mockito.anyString(), Mockito.anyString(),
- Mockito.any(Updateable.Update.class));
+ Mockito.any(Updateable.Update.class));
+ //noinspection unchecked
verify(sentryStore, times(0)).deleteAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
events.clear();
@@ -364,21 +455,78 @@ public class TestHMSFollower {
// Create a table
sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table2");
- partCols = new ArrayList<FieldSchema>();
+ partCols = new ArrayList<>();
partCols.add(new FieldSchema("ds", "string", ""));
- Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null);
+ Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
+ null);
notificationEvent = new NotificationEvent(inputEventId, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(table1).toString());
+ HCatEventMessage.EventType.CREATE_TABLE.toString(),
+ messageFactory.buildCreateTableMessage(table1).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName2);
events.add(notificationEvent);
// Process the notification
- hmsFollower.processNotificationEvents(events);
+ hmsFollower.processNotifications(events);
// Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID was not invoked.
+ //noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
}
+
+ /**
+ * Constructs a two events and passed to processor of hms follower. First one being create table
+ * event with location information(Invalid Event). Idea is to make sure that hms follower calls
+ * appropriate sentry store API's for the event processed by hms follower after processing the
+ * invalid create table event.
+ *
+ * @throws Exception
+ */
+ public void testCreateTableAfterInvalidEvent() throws Exception {
+ String dbName = "db1";
+ String tableName = "table1";
+ long inputEventId = 1;
+
+ Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
+ //noinspection unchecked
+ Mockito.doNothing().when(sentryStore)
+ .addAuthzPathsMapping(Mockito.anyString(), Mockito.anyCollection(),
+ Mockito.any(Updateable.Update.class));
+
+ // Create invalid notification event. The location of the storage descriptor is null, which is invalid for creating table
+ StorageDescriptor invalidSd = new StorageDescriptor();
+ invalidSd.setLocation(null);
+ NotificationEvent invalidNotificationEvent = new NotificationEvent(inputEventId, 0,
+ HCatEventMessage.EventType.CREATE_TABLE.toString(),
+ messageFactory.buildCreateTableMessage(
+ new Table(tableName, dbName, null, 0, 0, 0, invalidSd, null, null, null, null, null))
+ .toString());
+
+ // Create valid notification event
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setLocation("hdfs://db1.db/table1");
+ inputEventId += 1;
+ NotificationEvent notificationEvent = new NotificationEvent(inputEventId, 0,
+ HCatEventMessage.EventType.CREATE_TABLE.toString(),
+ messageFactory.buildCreateTableMessage(
+ new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null))
+ .toString());
+ List<NotificationEvent> events = new ArrayList<>();
+ events.add(invalidNotificationEvent);
+ events.add(notificationEvent);
+
+ Configuration configuration = new Configuration();
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+ hiveConnectionFactory, hiveInstance);
+ hmsFollower.processNotifications(events);
+
+ // invalid event updates notification ID directly
+ verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1);
+
+ // next valid event update path, which updates notification ID
+ //noinspection unchecked
+ verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), Mockito.anyCollection(),
+ Mockito.any(Updateable.Update.class));
+ }
}