You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ka...@apache.org on 2017/07/13 23:56:04 UTC
[1/3] sentry git commit: Revert "SENTRY-1769 Refactor HMSFollower
Class (Kalyan Kumar Kalvagadda reviewed by Vamsee Yarlagadda, Na Li,
Sergio Pena and Alexander Kolbasov)"
Repository: sentry
Updated Branches:
refs/heads/sentry-ha-redesign e5bb466ef -> c56f48cc6
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestNotificationProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestNotificationProcessor.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestNotificationProcessor.java
deleted file mode 100644
index c6c9448..0000000
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestNotificationProcessor.java
+++ /dev/null
@@ -1,465 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.service.thrift;
-
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hive.hcatalog.messaging.HCatEventMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
-import org.apache.sentry.hdfs.Updateable;
-import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-// TODO 1. More tests should be added here.
-// TODO 2. Tests using actual sentry store where.
-@SuppressWarnings("unused")
-public class TestNotificationProcessor {
-
- private static final SentryStore sentryStore = Mockito.mock(SentryStore.class);
- private final static String hiveInstance = "server2";
- private final static Configuration conf = new Configuration();
- private final SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory();
- private NotificationProcessor notificationProcessor;
-
- @BeforeClass
- public static void setup() {
- conf.set("sentry.hive.sync.create", "true");
- conf.set("sentry.hive.sync.drop", "true");
- }
-
- @After
- public void resetConf() {
- conf.set("sentry.hive.sync.create", "true");
- conf.set("sentry.hive.sync.drop", "true");
- reset(sentryStore);
- }
-
- @Test
- /*
- Makes sure that appropriate sentry store methods are invoked when create database event is
- processed.
-
- Also, checks the hive sync configuration.
- */
- public void testCreateDatabase() throws Exception {
- long seqNum = 1;
- String dbName = "db1";
- String uriPrefix = "hdfs:///";
- String location = "user/hive/warehouse";
- NotificationEvent notificationEvent;
- TSentryAuthorizable authorizable;
- notificationProcessor = new NotificationProcessor(sentryStore,
- hiveInstance, conf);
-
- // Create notification event
- notificationEvent = new NotificationEvent(seqNum, 0,
- HCatEventMessage.EventType.CREATE_DATABASE.toString(),
- messageFactory.buildCreateDatabaseMessage(new Database(dbName,
- null, uriPrefix + location, null)).toString());
-
- notificationProcessor.processNotificationEvent(notificationEvent);
-
- authorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setServer(hiveInstance);
- authorizable.setDb("db1");
- //noinspection unchecked
- verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
-
- verify(sentryStore, times(1)).dropPrivilege(authorizable,
- NotificationProcessor.getPermUpdatableOnDrop(authorizable));
- reset(sentryStore);
-
- //Change the configuration and make sure that exiting privileges are not dropped
- notificationProcessor.setSyncStoreOnCreate(false);
- dbName = "db2";
- notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.CREATE_DATABASE.toString(),
- messageFactory.buildCreateDatabaseMessage(new Database(dbName,
- null, "hdfs:///db2", null)).toString());
-
- notificationProcessor.processNotificationEvent(notificationEvent);
-
- authorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setServer(hiveInstance);
- authorizable.setDb(dbName);
-
- //noinspection unchecked
- verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
- //making sure that privileges are not dropped
- verify(sentryStore, times(0)).dropPrivilege(authorizable,
- NotificationProcessor.getPermUpdatableOnDrop(authorizable));
-
- }
-
- @Test
- /*
- Makes sure that appropriate sentry store methods are invoked when drop database event is
- processed.
-
- Also, checks the hive sync configuration.
- */
- public void testDropDatabase() throws Exception {
- String dbName = "db1";
-
- notificationProcessor = new NotificationProcessor(sentryStore,
- hiveInstance, conf);
-
- // Create notification event
- NotificationEvent notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.DROP_DATABASE.toString(),
- messageFactory.buildDropDatabaseMessage(new Database(dbName, null,
- "hdfs:///db1", null)).toString());
-
- notificationProcessor.processNotificationEvent(notificationEvent);
-
- TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setServer(hiveInstance);
- authorizable.setDb("db1");
-
- //noinspection unchecked
- verify(sentryStore, times(1)).deleteAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
- verify(sentryStore, times(1)).dropPrivilege(authorizable,
- NotificationProcessor.getPermUpdatableOnDrop(authorizable));
- reset(sentryStore);
-
- // Change the configuration and make sure that exiting privileges are not dropped
- notificationProcessor.setSyncStoreOnDrop(false);
- dbName = "db2";
- // Create notification event
- notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.DROP_DATABASE.toString(),
- messageFactory.buildDropDatabaseMessage(new Database(dbName, null,
- "hdfs:///db2", null)).toString());
-
- notificationProcessor.processNotificationEvent(notificationEvent);
-
- authorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setServer(hiveInstance);
- authorizable.setDb(dbName);
-
- //noinspection unchecked
- verify(sentryStore, times(1)).deleteAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
- verify(sentryStore, times(0)).dropPrivilege(authorizable,
- NotificationProcessor.getPermUpdatableOnDrop(authorizable));
- }
-
- @Test
- /*
- Makes sure that appropriate sentry store methods are invoked when create table event is
- processed.
-
- Also, checks the hive sync configuration.
- */
- public void testCreateTable() throws Exception {
- String dbName = "db1";
- String tableName = "table1";
-
- notificationProcessor = new NotificationProcessor(sentryStore,
- hiveInstance, conf);
-
- // Create notification event
- StorageDescriptor sd = new StorageDescriptor();
- sd.setLocation("hdfs:///db1.db/table1");
- NotificationEvent notificationEvent =
- new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(new Table(tableName,
- dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString());
-
- notificationProcessor.processNotificationEvent(notificationEvent);
-
- TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setServer(hiveInstance);
- authorizable.setDb("db1");
- authorizable.setTable(tableName);
-
- //noinspection unchecked
- verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
-
- verify(sentryStore, times(1)).dropPrivilege(authorizable,
- NotificationProcessor.getPermUpdatableOnDrop(authorizable));
- reset(sentryStore);
-
- // Change the configuration and make sure that existing privileges are not dropped
- notificationProcessor.setSyncStoreOnCreate(false);
-
- // Create notification event
- dbName = "db2";
- tableName = "table2";
- sd = new StorageDescriptor();
- sd.setLocation("hdfs:///db1.db/table2");
- notificationEvent =
- new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(new Table(tableName,
- dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString());
-
- notificationProcessor.processNotificationEvent(notificationEvent);
-
- authorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setServer(hiveInstance);
- authorizable.setDb(dbName);
- authorizable.setTable(tableName);
-
- //noinspection unchecked
- verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
- // Making sure that privileges are not dropped
- verify(sentryStore, times(0)).dropPrivilege(authorizable,
- NotificationProcessor.getPermUpdatableOnDrop(authorizable));
- }
-
- @Test
- /*
- Makes sure that appropriate sentry store methods are invoked when drop table event is
- processed.
-
- Also, checks the hive sync configuration.
- */
- public void testDropTable() throws Exception {
- String dbName = "db1";
- String tableName = "table1";
-
- Configuration authConf = new Configuration();
- notificationProcessor = new NotificationProcessor(sentryStore,
- hiveInstance, authConf);
-
- // Create notification event
- StorageDescriptor sd = new StorageDescriptor();
- sd.setLocation("hdfs:///db1.db/table1");
- NotificationEvent notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.DROP_TABLE.toString(),
- messageFactory.buildDropTableMessage(new Table(tableName,
- dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString());
-
- notificationProcessor.processNotificationEvent(notificationEvent);
-
- TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setServer(hiveInstance);
- authorizable.setDb("db1");
- authorizable.setTable(tableName);
-
- verify(sentryStore, times(1)).deleteAllAuthzPathsMapping(Mockito.anyString(),
- Mockito.any(Updateable.Update.class));
-
- verify(sentryStore, times(1)).dropPrivilege(authorizable,
- NotificationProcessor.getPermUpdatableOnDrop(authorizable));
- }
-
- @Test
- /*
- Makes sure that appropriate sentry store methods are invoked when alter tables event is
- processed.
- */
- public void testAlterTable() throws Exception {
- String dbName = "db1";
- String tableName = "table1";
-
- String newDbName = "db1";
- String newTableName = "table2";
-
- Configuration authConf = new Configuration();
- notificationProcessor = new NotificationProcessor(sentryStore,
- hiveInstance, authConf);
-
- // Create notification event
- StorageDescriptor sd = new StorageDescriptor();
- sd.setLocation("hdfs:///db1.db/table1");
- NotificationEvent notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.ALTER_TABLE.toString(),
- messageFactory.buildAlterTableMessage(
- new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
- new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null))
- .toString());
- notificationEvent.setDbName(newDbName);
- notificationEvent.setTableName(newTableName);
-
- notificationProcessor.processNotificationEvent(notificationEvent);
-
- TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setServer(hiveInstance);
- authorizable.setDb(dbName);
- authorizable.setTable(tableName);
-
- TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setServer(hiveInstance);
- newAuthorizable.setDb(newDbName);
- newAuthorizable.setTable(newTableName);
-
- verify(sentryStore, times(1)).renameAuthzObj(Mockito.anyString(), Mockito.anyString(),
- Mockito.any(Updateable.Update.class));
-
- verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable,
- NotificationProcessor.getPermUpdatableOnRename(authorizable, newAuthorizable));
- }
-
- @Test
- /*
- Makes sure that appropriate sentry store methods are invoked when alter tables event is
- processed.
- */
- public void testRenameTableWithLocationUpdate() throws Exception {
- String dbName = "db1";
- String tableName = "table1";
-
- String newDbName = "db1";
- String newTableName = "table2";
-
- Configuration authConf = new Configuration();
- notificationProcessor = new NotificationProcessor(sentryStore,
- hiveInstance, authConf);
-
- // Create notification event
- StorageDescriptor sd = new StorageDescriptor();
- sd.setLocation("hdfs:///db1.db/table1");
- StorageDescriptor new_sd = new StorageDescriptor();
- new_sd.setLocation("hdfs:///db1.db/table2");
- NotificationEvent notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.ALTER_TABLE.toString(),
- messageFactory.buildAlterTableMessage(
- new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
- new Table(newTableName, newDbName, null, 0, 0, 0, new_sd, null, null, null, null, null))
- .toString());
- notificationEvent.setDbName(newDbName);
- notificationEvent.setTableName(newTableName);
-
- notificationProcessor.processNotificationEvent(notificationEvent);
-
- TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setServer(hiveInstance);
- authorizable.setDb(dbName);
- authorizable.setTable(tableName);
-
- TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance);
- authorizable.setServer(hiveInstance);
- newAuthorizable.setDb(newDbName);
- newAuthorizable.setTable(newTableName);
-
- verify(sentryStore, times(1)).renameAuthzPathsMapping(Mockito.anyString(), Mockito.anyString(),
- Mockito.anyString(), Mockito.anyString(), Mockito.any(Updateable.Update.class));
-
- verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable,
- NotificationProcessor.getPermUpdatableOnRename(authorizable, newAuthorizable));
- }
-
- @Test
- /*
- Test to made sure that sentry store is not invoked when invalid alter table event is
- processed.
- */
- public void testAlterTableWithInvalidEvent() throws Exception {
- String dbName = "db1";
- String tableName1 = "table1";
- String tableName2 = "table2";
- long inputEventId = 1;
- NotificationEvent notificationEvent;
- List<FieldSchema> partCols;
- StorageDescriptor sd;
- Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
- //noinspection unchecked
- Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
-
- Configuration authConf = new Configuration();
- notificationProcessor = new NotificationProcessor(sentryStore,
- hiveInstance, authConf);
-
- // Create a table
- sd = new StorageDescriptor();
- sd.setLocation("hdfs://db1.db/table1");
- partCols = new ArrayList<>();
- partCols.add(new FieldSchema("ds", "string", ""));
- Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols,
- null, null, null, null);
- notificationEvent = new NotificationEvent(inputEventId, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(table).toString());
- notificationEvent.setDbName(dbName);
- notificationEvent.setTableName(tableName1);
- inputEventId += 1;
- // Process the notification
- notificationProcessor.processNotificationEvent(notificationEvent);
- // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
- // and persistLastProcessedNotificationID was not invoked.
- //noinspection unchecked
- verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
- reset(sentryStore);
-
- // Create alter table notification with out actually changing anything.
- // This notification should not be processed by sentry server
- // Notification should be persisted explicitly
- notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.ALTER_TABLE.toString(),
- messageFactory.buildAlterTableMessage(
- new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
- new Table(tableName1, dbName, null, 0, 0, 0, sd, null,
- null, null, null, null)).toString());
- notificationEvent.setDbName(dbName);
- notificationEvent.setTableName(tableName1);
- inputEventId += 1;
- // Process the notification
- notificationProcessor.processNotificationEvent(notificationEvent);
- // Make sure that renameAuthzObj and deleteAuthzPathsMapping were not invoked
- // to handle CREATE_TABLE notification
- // and persistLastProcessedNotificationID is explicitly invoked
- verify(sentryStore, times(0)).renameAuthzObj(Mockito.anyString(), Mockito.anyString(),
- Mockito.any(Updateable.Update.class));
- //noinspection unchecked
- verify(sentryStore, times(0)).deleteAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
- reset(sentryStore);
-
- // Create a table
- sd = new StorageDescriptor();
- sd.setLocation("hdfs://db1.db/table2");
- partCols = new ArrayList<>();
- partCols.add(new FieldSchema("ds", "string", ""));
- Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd,
- partCols, null, null, null, null);
- notificationEvent = new NotificationEvent(inputEventId, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(table1).toString());
- notificationEvent.setDbName(dbName);
- notificationEvent.setTableName(tableName2);
- // Process the notification
- notificationProcessor.processNotificationEvent(notificationEvent);
- // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
- // and persistLastProcessedNotificationID was not invoked.
- //noinspection unchecked
- verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHmsClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHmsClient.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHmsClient.java
deleted file mode 100644
index 3cc6541..0000000
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestSentryHmsClient.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hive.hcatalog.messaging.HCatEventMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
-import org.apache.sentry.provider.db.service.persistent.PathsImage;
-import org.apache.thrift.TException;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import javax.security.auth.login.LoginException;
-
-/**
- * Test mocks HiveMetaStoreClient class and tests SentryHmsClient.
- */
-public class TestSentryHmsClient {
-
- private static final Configuration conf = new Configuration();
- private static final SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory();
- private static SentryHmsClient client;
- private static MockHMSClientFactory hiveConnectionFactory;
-
- /**
- * Create mock database with the given name
- *
- * @param name Database name
- * @return Mock database object
- */
- private static Database makeDb(String name) {
- Database db = Mockito.mock(Database.class);
- Mockito.when(db.getName()).thenReturn(name);
- Mockito.when(db.getLocationUri()).thenReturn("hdfs:///" + name);
- return db;
- }
-
- /**
- * Create mock table
- *
- * @param dbName db for this table
- * @param tableName name of the table
- * @return mock table object
- */
- private static Table makeTable(String dbName, String tableName) {
- Table table = Mockito.mock(Table.class);
- Mockito.when(table.getDbName()).thenReturn(dbName);
- Mockito.when(table.getTableName()).thenReturn(tableName);
- StorageDescriptor sd = Mockito.mock(StorageDescriptor.class);
- Mockito.when(sd.getLocation()).thenReturn(
- String.format("hdfs:///%s/%s", dbName, tableName));
- Mockito.when(table.getSd()).thenReturn(sd);
- return table;
- }
-
- /**
- * Create mock partition
- *
- * @param dbName database for this partition
- * @param tableName table for this partition
- * @param partName partition name
- * @return mock partition object
- */
- private static Partition makePartition(String dbName, String tableName, String partName) {
- Partition partition = Mockito.mock(Partition.class);
- StorageDescriptor sd = Mockito.mock(StorageDescriptor.class);
- Mockito.when(sd.getLocation()).thenReturn(
- String.format("hdfs:///%s/%s/%s", dbName, tableName, partName));
- Mockito.when(partition.getSd()).thenReturn(sd);
- return partition;
- }
-
- /**
- * Creates create database notification
- *
- * @return NotificationEvent
- */
- private static NotificationEvent getCreateDatabaseNotification(long id) {
- Random rand = new Random();
- int n = rand.nextInt(100) + 1;
- String dbName = "db" + n;
- return new NotificationEvent(id, 0, HCatEventMessage.EventType.CREATE_DATABASE.toString(),
- messageFactory
- .buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///" + dbName, null))
- .toString());
- }
-
- /**
- * Creates drop database notification
- *
- * @return NotificationEvent
- */
- private static NotificationEvent getDropDatabaseNotification(long id) {
- Random rand = new Random();
- int n = rand.nextInt(100) + 1;
- String dbName = "db" + n;
- return new NotificationEvent(id, 0, HCatEventMessage.EventType.DROP_DATABASE.toString(),
- messageFactory
- .buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///" + dbName, null))
- .toString());
- }
-
- @BeforeClass
- static public void initialize() throws IOException, LoginException {
- hiveConnectionFactory = new MockHMSClientFactory();
- client = new SentryHmsClient(conf, (HiveConnectionFactory)hiveConnectionFactory);
- }
-
- /**
- * Creating snapshot when SentryHmsClient is not connected to HMS
- */
- @Test
- public void testSnapshotCreationWithOutClientConnected() throws Exception {
- // Make sure that client is not connected
- Assert.assertFalse(client.isConnected());
- PathsImage snapshotInfo = client.getFullSnapshot();
- Assert.assertTrue(snapshotInfo.getPathImage().isEmpty());
- }
-
- /**
- * Creating snapshot when HMS doesn't have any data
- */
- @Test
- public void testSnapshotCreationWithNoHmsData() throws Exception {
- MockClient mockClient = new MockClient(new HiveSnapshot(), 1);
- client.setClient(mockClient.client);
- // Make sure that client is connected
- Assert.assertTrue(client.isConnected());
- PathsImage snapshotInfo = client.getFullSnapshot();
- Assert.assertTrue(snapshotInfo.getPathImage().isEmpty());
- }
-
- /**
- * Creating a snapshot when there is data but there are updates to HMS data mean while
- */
- @Test
- public void testSnapshotCreationWhenDataIsActivelyUpdated() throws Exception {
- HiveTable tab21 = new HiveTable("tab21");
- HiveTable tab31 = new HiveTable("tab31").add("part311").add("part312");
- HiveDb db3 = new HiveDb("db3", Lists.newArrayList(tab31));
- HiveDb db2 = new HiveDb("db2", Lists.newArrayList(tab21));
- HiveDb db1 = new HiveDb("db1");
- HiveSnapshot snap = new HiveSnapshot().add(db1).add(db2).add(db3);
- final MockClient mockClient = new MockClient(snap, 1);
-
- client.setClient(mockClient.client);
- hiveConnectionFactory.setClient(mockClient);
- // Make sure that client is connected
- Assert.assertTrue(client.isConnected());
- PathsImage snapshotInfo = client.getFullSnapshot();
- // Make sure that snapshot is not empty
- Assert.assertTrue(!snapshotInfo.getPathImage().isEmpty());
-
- Mockito.when(mockClient.client.getCurrentNotificationEventId()).
- thenAnswer(new Answer<CurrentNotificationEventId>() {
- @Override
- public CurrentNotificationEventId answer(InvocationOnMock invocation)
- throws Throwable {
- return new CurrentNotificationEventId(mockClient.incrementNotificationEventId());
- }
-
- });
-
- snapshotInfo = client.getFullSnapshot();
- Assert.assertTrue(snapshotInfo.getPathImage().isEmpty());
- }
-
- /**
- * Creating a snapshot when there is data in HMS.
- */
- @Test
- public void testSnapshotCreationSuccess() throws Exception {
- HiveTable tab21 = new HiveTable("tab21");
- HiveTable tab31 = new HiveTable("tab31");
- HiveDb db3 = new HiveDb("db3", Lists.newArrayList(tab31));
- HiveDb db2 = new HiveDb("db2", Lists.newArrayList(tab21));
- HiveDb db1 = new HiveDb("db1");
- HiveSnapshot snap = new HiveSnapshot().add(db1).add(db2).add(db3);
- MockClient mockClient = new MockClient(snap, 1);
- Mockito.when(mockClient.client.getCurrentNotificationEventId()).
- thenReturn(new CurrentNotificationEventId(mockClient.eventId));
- client.setClient(mockClient.client);
- hiveConnectionFactory.setClient(mockClient);
- // Make sure that client is connected
- Assert.assertTrue(client.isConnected());
-
- PathsImage snapshotInfo = client.getFullSnapshot();
- Assert.assertEquals(5, snapshotInfo.getPathImage().size());
- Assert.assertEquals(Sets.newHashSet("db1"), snapshotInfo.getPathImage().get("db1"));
- Assert.assertEquals(Sets.newHashSet("db2"), snapshotInfo.getPathImage().get("db2"));
- Assert.assertEquals(Sets.newHashSet("db3"), snapshotInfo.getPathImage().get("db3"));
- Assert.assertEquals(Sets.newHashSet("db2/tab21"),
- snapshotInfo.getPathImage().get("db2.tab21"));
- Assert.assertEquals(Sets.newHashSet("db3/tab31"), snapshotInfo.getPathImage().get("db3.tab31"));
- Assert.assertEquals(snapshotInfo.getId(), mockClient.eventId);
-
- }
-
- /**
- * Test scenario when there is no HMS connection
- * Getting new notifications
- */
- @Test
- public void testGetNewNotificationsWithOutClientConnected() throws Exception {
- HiveTable tab21 = new HiveTable("tab21");
- HiveTable tab31 = new HiveTable("tab31");
- HiveDb db3 = new HiveDb("db3", Lists.newArrayList(tab31));
- HiveDb db2 = new HiveDb("db2", Lists.newArrayList(tab21));
- HiveDb db1 = new HiveDb("db1");
- client.setClient(null);
- HiveSnapshot snap = new HiveSnapshot().add(db1).add(db2).add(db3);
- MockClient mockClient = new MockClient(snap, 100);
- Mockito.when(mockClient.client.getCurrentNotificationEventId()).
- thenReturn(new CurrentNotificationEventId(mockClient.eventId));
- // Make sure that client is not connected
- Assert.assertTrue(!client.isConnected());
- Collection<NotificationEvent> events = client.getNotifications(100);
- Assert.assertTrue(events.isEmpty());
-
- }
-
- /**
- * Test scenario where there are no notifications
- * Getting new notifications
- */
- @Test
- public void testGetNewNotificationsWithNoHmsUpdates() throws Exception {
- HiveTable tab21 = new HiveTable("tab21");
- HiveTable tab31 = new HiveTable("tab31");
- HiveDb db3 = new HiveDb("db3", Lists.newArrayList(tab31));
- HiveDb db2 = new HiveDb("db2", Lists.newArrayList(tab21));
- HiveDb db1 = new HiveDb("db1");
- HiveSnapshot snap = new HiveSnapshot().add(db1).add(db2).add(db3);
- MockClient mockClient = new MockClient(snap, 100);
- Mockito.when(mockClient.client.getCurrentNotificationEventId()).
- thenReturn(new CurrentNotificationEventId(mockClient.eventId));
- client.setClient(mockClient.client);
- hiveConnectionFactory.setClient(mockClient);
- // Make sure that client is connected
- Assert.assertTrue(client.isConnected());
- Collection<NotificationEvent> events = client.getNotifications(100);
- Assert.assertTrue(events.isEmpty());
- }
-
- /**
- * Test scenario where there are notifications
- * Getting new notifications
- */
- @Test
- public void testGetNewNotificationsSuccess() throws Exception {
- final MockClient mockClient = new MockClient(new HiveSnapshot(), 100);
- client.setClient(mockClient.client);
- hiveConnectionFactory.setClient(mockClient);
- // Make sure that client is connected
- Assert.assertTrue(client.isConnected());
-
- Mockito.when(mockClient.client.getCurrentNotificationEventId()).
- thenAnswer(new Answer<CurrentNotificationEventId>() {
- @Override
- public CurrentNotificationEventId answer(InvocationOnMock invocation)
- throws Throwable {
- return new CurrentNotificationEventId(mockClient.incrementNotificationEventId());
- }
- });
- Mockito.when(mockClient.client.getNextNotification(Mockito.anyLong(), Mockito.anyInt(),
- Mockito.any(NotificationFilter.class))).
- thenAnswer(new Answer<NotificationEventResponse>() {
- @Override
- public NotificationEventResponse answer(InvocationOnMock invocation)
- throws Throwable {
- long id = 1;
- List<NotificationEvent> events = new ArrayList<>();
- events.add(getCreateDatabaseNotification(id++));
- events.add(getDropDatabaseNotification(id++));
- return new NotificationEventResponse(events);
- }
- });
-
- Collection<NotificationEvent> events = client.getNotifications(100);
- long id = 1;
- for (NotificationEvent event : events) {
- Assert.assertEquals(event.getEventId(), id++);
- }
- Assert.assertTrue(events.size() == 2);
- }
-
- /**
- * Representation of a Hive table. A table has a name and a list of partitions.
- */
- private static class HiveTable {
-
- private final String name;
- private final List<String> partitions;
-
- HiveTable(String name) {
- this.name = name;
- this.partitions = new ArrayList<>();
- }
-
- HiveTable add(String partition) {
- partitions.add(partition);
- return this;
- }
- }
-
- /**
- * Representation of a Hive database. A database has a name and a list of tables
- */
- private static class HiveDb {
-
- final String name;
- Collection<HiveTable> tables;
-
- @SuppressWarnings("SameParameterValue")
- HiveDb(String name) {
- this.name = name;
- tables = new ArrayList<>();
- }
-
- HiveDb(String name, Collection<HiveTable> tables) {
- this.name = name;
- this.tables = tables;
- if (this.tables == null) {
- this.tables = new ArrayList<>();
- }
- }
-
- void add(HiveTable table) {
- this.tables.add(table);
- }
- }
-
- /**
- * Representation of a full Hive snapshot. A snapshot is collection of databases
- */
- private static class HiveSnapshot {
-
- final List<HiveDb> databases = new ArrayList<>();
-
- HiveSnapshot() {
- }
-
- HiveSnapshot(Collection<HiveDb> dblist) {
- if (dblist != null) {
- databases.addAll(dblist);
- }
- }
-
- HiveSnapshot add(HiveDb db) {
- this.databases.add(db);
- return this;
- }
- }
-
- /**
- * Mock for HMSClientFactory
- */
- private static class MockHMSClientFactory implements HiveConnectionFactory {
-
- private HiveMetaStoreClient mClient;
-
- public MockHMSClientFactory() {
- mClient = null;
- }
-
- void setClient(MockClient mockClient) {
- this.mClient = mockClient.client;
- }
- @Override
- public HMSClient connect() throws IOException, InterruptedException, MetaException {
- return new HMSClient(mClient);
- }
-
- @Override
- public void close() throws Exception {
- }
- }
-
- /**
- * Convert Hive snapshot to mock client that will return proper values
- * for the snapshot.
- */
- private static class MockClient {
-
- public HiveMetaStoreClient client;
- public long eventId;
-
- MockClient(HiveSnapshot snapshot, long eventId) throws TException {
- this.eventId = eventId;
- client = Mockito.mock(HiveMetaStoreClient.class);
- List<String> dbNames = new ArrayList<>(snapshot.databases.size());
- // Walk over all databases and mock appropriate objects
- for (HiveDb mdb : snapshot.databases) {
- String dbName = mdb.name;
- dbNames.add(dbName);
- Database db = makeDb(dbName);
- Mockito.when(client.getDatabase(dbName)).thenReturn(db);
- List<String> tableNames = new ArrayList<>(mdb.tables.size());
- // Walk over all tables for the database and mock appropriate objects
- for (HiveTable table : mdb.tables) {
- String tableName = table.name;
- tableNames.add(tableName);
- Table mockTable = makeTable(dbName, tableName);
- Mockito.when(client.getTableObjectsByName(dbName,
- Lists.newArrayList(tableName)))
- .thenReturn(Lists.newArrayList(mockTable));
- Mockito.when(client.listPartitionNames(dbName, tableName, (short) -1))
- .thenReturn(table.partitions);
- // Walk across all partitions and mock appropriate objects
- for (String partName : table.partitions) {
- Partition p = makePartition(dbName, tableName, partName);
- Mockito.when(client.getPartitionsByNames(dbName, tableName,
- Lists.<String>newArrayList(partName)))
- .thenReturn(Lists.<Partition>newArrayList(p));
- }
- }
- Mockito.when(client.getAllTables(dbName)).thenReturn(tableNames);
- }
- // Return all database names
- Mockito.when(client.getAllDatabases()).thenReturn(dbNames);
- Mockito.when(client.getCurrentNotificationEventId()).
- thenReturn(new CurrentNotificationEventId(eventId));
-
- }
-
- public Long incrementNotificationEventId() {
- eventId = eventId + 1;
- return eventId;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestDbPrivilegeCleanupOnDrop.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestDbPrivilegeCleanupOnDrop.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestDbPrivilegeCleanupOnDrop.java
index d619623..b9330cc 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestDbPrivilegeCleanupOnDrop.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestDbPrivilegeCleanupOnDrop.java
@@ -31,7 +31,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
-import org.apache.sentry.service.thrift.HmsFollower;
+import org.apache.sentry.service.thrift.HMSFollower;
import org.apache.sentry.tests.e2e.hive.AbstractTestWithStaticConfiguration;
import org.junit.BeforeClass;
import org.junit.Before;
@@ -78,7 +78,7 @@ public class TestDbPrivilegeCleanupOnDrop extends
to.close();
// Check the HMS connection only when notification log is enabled.
if (enableNotificationLog) {
- while (!HmsFollower.isConnectedToHms()) {
+ while (!HMSFollower.isConnectedToHMS()) {
Thread.sleep(1000);
}
}
[2/3] sentry git commit: Revert "SENTRY-1769 Refactor HMSFollower
Class (Kalyan Kumar Kalvagadda reviewed by Vamsee Yarlagadda, Na Li,
Sergio Pena and Alexander Kolbasov)"
Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
index a49d8c6..6762de7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/NotificationProcessor.java
@@ -15,275 +15,82 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.sentry.service.thrift;
-import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
-import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE;
-
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hive.hcatalog.messaging.HCatEventMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage;
-import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageDeserializer;
-import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
-import org.apache.sentry.core.common.exception.SentryInvalidInputException;
-import org.apache.sentry.core.common.exception.SentryNoSuchObjectException;
import org.apache.sentry.hdfs.PathsUpdate;
-import org.apache.sentry.hdfs.PermissionsUpdate;
import org.apache.sentry.hdfs.SentryMalformedPathException;
-import org.apache.sentry.hdfs.Updateable.Update;
-import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
+import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
/**
* NotificationProcessor processes various notification events generated from
- * the Hive MetaStore state change, and applies these changes to the complete
+ * the Hive MetaStore state change, and applies these changes on the complete
* HMS Paths snapshot or delta update stored in Sentry using SentryStore.
- *
- * <p>NotificationProcessor should not skip processing notification events for any reason.
+ * <p>
+ * NotificationProcessor should not skip processing notification events for any reason.
* If some notification events are to be skipped, appropriate logic should be added in
- * HmsFollower before invoking NotificationProcessor.
+ * HMSFollower before invoking NotificationProcessor.
*/
-final class NotificationProcessor {
+class NotificationProcessor {
- private static final Logger LOGGER = LoggerFactory.getLogger(NotificationProcessor.class);
+ private final Logger LOGGER;
private final SentryStore sentryStore;
- private final SentryJSONMessageDeserializer deserializer;
- private final String authServerName;
- // These variables can be updated even after object is instantiated, for testing purposes.
- private boolean syncStoreOnCreate = false;
- private boolean syncStoreOnDrop = false;
- /**
- * Configuring notification processor.
- *
- * @param sentryStore sentry backend store
- * @param authServerName Server that sentry is authorizing
- * @param conf sentry configuration
- */
- NotificationProcessor(SentryStore sentryStore, String authServerName,
- Configuration conf) {
+ NotificationProcessor(SentryStore sentryStore, Logger LOGGER) {
+ this.LOGGER = LOGGER;
this.sentryStore = sentryStore;
- deserializer = new SentryJSONMessageDeserializer();
- this.authServerName = authServerName;
- syncStoreOnCreate = Boolean
- .parseBoolean(conf.get(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getVar(),
- AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getDefault()));
- syncStoreOnDrop = Boolean.parseBoolean(conf.get(AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getVar(),
- AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getDefault()));
- }
-
- /**
- * Split path into components on the "/" character.
- * The path should not start with "/".
- * This is consumed by Thrift interface, so the return result should be
- * {@code List<String>}
- *
- * @param path input oath e.g. {@code foo/bar}
- * @return list of components, e.g. [foo, bar]
- */
- private static List<String> splitPath(String path) {
- return (Lists.newArrayList(path.split("/")));
- }
-
- /**
- * Constructs permission update to be persisted for drop event that can be persisted
- * from thrift object.
- *
- * @param authorizable thrift object that is dropped.
- * @return update to be persisted
- * @throws SentryInvalidInputException if the required fields are set in argument provided
- */
- @VisibleForTesting
- static Update getPermUpdatableOnDrop(TSentryAuthorizable authorizable)
- throws SentryInvalidInputException {
- PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
- String authzObj = SentryServiceUtil.getAuthzObj(authorizable);
- update.addPrivilegeUpdate(authzObj)
- .putToDelPrivileges(PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
- return update;
- }
-
- /**
- * Constructs permission update to be persisted for rename event that can be persisted from thrift
- * object.
- *
- * @param oldAuthorizable old thrift object
- * @param newAuthorizable new thrift object
- * @return update to be persisted
- * @throws SentryInvalidInputException if the required fields are set in arguments provided
- */
- @VisibleForTesting
- static Update getPermUpdatableOnRename(TSentryAuthorizable oldAuthorizable,
- TSentryAuthorizable newAuthorizable)
- throws SentryInvalidInputException {
- String oldAuthz = SentryServiceUtil.getAuthzObj(oldAuthorizable);
- String newAuthz = SentryServiceUtil.getAuthzObj(newAuthorizable);
- PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
- TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
- privUpdate.putToAddPrivileges(newAuthz, newAuthz);
- privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
- return update;
- }
-
- /**
- * This function is only used for testing purposes.
- *
- * @param value to be set
- */
- @SuppressWarnings("SameParameterValue")
- @VisibleForTesting
- void setSyncStoreOnCreate(boolean value) {
- syncStoreOnCreate = value;
- }
-
- /**
- * This function is only used for testing purposes.
- *
- * @param value to be set
- */
- @SuppressWarnings("SameParameterValue")
- @VisibleForTesting
- void setSyncStoreOnDrop(boolean value) {
- syncStoreOnDrop = value;
- }
-
- /**
- * Processes the event and persist to sentry store.
- *
- * @param event to be processed
- * @return true, if the event is persisted to sentry store. false, if the event is not persisted.
- * @throws Exception if there is an error processing the event.
- */
- boolean processNotificationEvent(NotificationEvent event) throws Exception {
- LOGGER
- .debug("Processing event with id:{} and Type:{}", event.getEventId(), event.getEventType());
- switch (HCatEventMessage.EventType.valueOf(event.getEventType())) {
- case CREATE_DATABASE:
- return processCreateDatabase(event);
- case DROP_DATABASE:
- return processDropDatabase(event);
- case CREATE_TABLE:
- return processCreateTable(event);
- case DROP_TABLE:
- return processDropTable(event);
- case ALTER_TABLE:
- return processAlterTable(event);
- case ADD_PARTITION:
- return processAddPartition(event);
- case DROP_PARTITION:
- return processDropPartition(event);
- case ALTER_PARTITION:
- return processAlterPartition(event);
- case INSERT:
- return false;
- default:
- LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(),
- event.getEventType());
- return false;
- }
}
/**
* Processes "create database" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param event notification event to be processed.
+ * @param dbName database name
+ * @param location database location
+ * @param seqNum notification event ID
* @throws Exception if encounters errors while persisting the path change
*/
- private boolean processCreateDatabase(NotificationEvent event) throws Exception {
- SentryJSONCreateDatabaseMessage message =
- deserializer.getCreateDatabaseMessage(event.getMessage());
- String dbName = message.getDB();
- String location = message.getLocation();
- if ((dbName == null) || (location == null)) {
- LOGGER.error("Create database event "
- + "has incomplete information. dbName: {} location: {}",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(location, "null"));
- return false;
- }
+ void processCreateDatabase(String dbName, String location, long seqNum) throws Exception {
List<String> locations = Collections.singletonList(location);
- addPaths(dbName, locations, event.getEventId());
- if (syncStoreOnCreate) {
- dropSentryDbPrivileges(dbName, event);
- }
- return true;
+ addPaths(dbName, locations, seqNum);
}
/**
* Processes "drop database" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param event notification event to be processed.
+ * @param dbName database name
+ * @param location database location
+ * @param seqNum notification event ID
* @throws Exception if encounters errors while persisting the path change
*/
- private boolean processDropDatabase(NotificationEvent event) throws Exception {
- SentryJSONDropDatabaseMessage dropDatabaseMessage =
- deserializer.getDropDatabaseMessage(event.getMessage());
- String dbName = dropDatabaseMessage.getDB();
- String location = dropDatabaseMessage.getLocation();
- if (dbName == null) {
- LOGGER.error("Drop database event has incomplete information: dbName = null");
- return false;
- }
- if (syncStoreOnDrop) {
- dropSentryDbPrivileges(dbName, event);
- }
+ void processDropDatabase(String dbName, String location, long seqNum) throws Exception {
List<String> locations = Collections.singletonList(location);
- removePaths(dbName, locations, event.getEventId());
- return true;
+ removePaths(dbName, locations, seqNum);
}
/**
* Processes "create table" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param event notification event to be processed.
+ * @param dbName database name
+ * @param tableName table name
+ * @param location table location
+ * @param seqNum notification event ID
* @throws Exception if encounters errors while persisting the path change
*/
- private boolean processCreateTable(NotificationEvent event)
- throws Exception {
- SentryJSONCreateTableMessage createTableMessage = deserializer
- .getCreateTableMessage(event.getMessage());
- String dbName = createTableMessage.getDB();
- String tableName = createTableMessage.getTable();
- String location = createTableMessage.getLocation();
- if ((dbName == null) || (tableName == null) || (location == null)) {
- LOGGER.error(String.format("Create table event " + "has incomplete information."
- + " dbName = %s, tableName = %s, location = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"),
- StringUtils.defaultIfBlank(location, "null")));
- return false;
- }
- if (syncStoreOnCreate) {
- dropSentryTablePrivileges(dbName, tableName, event);
- }
- String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
+ void processCreateTable(String dbName, String tableName, String location, long seqNum)
+ throws Exception {
+ String authzObj = dbName + "." + tableName;
List<String> locations = Collections.singletonList(location);
- addPaths(authzObj, locations, event.getEventId());
- return true;
+ addPaths(authzObj, locations, seqNum);
}
/**
@@ -291,185 +98,86 @@ final class NotificationProcessor {
* the table as well. And applies its corresponding snapshot change as well
* as delta path update into Sentry DB.
*
- * @param event notification event to be processed.
+ * @param dbName database name
+ * @param tableName table name
+ * @param seqNum notification event ID
* @throws Exception if encounters errors while persisting the path change
*/
- private boolean processDropTable(NotificationEvent event) throws Exception {
- SentryJSONDropTableMessage dropTableMessage = deserializer
- .getDropTableMessage(event.getMessage());
- String dbName = dropTableMessage.getDB();
- String tableName = dropTableMessage.getTable();
- if ((dbName == null) || (tableName == null)) {
- LOGGER.error("Drop table event "
- + "has incomplete information. dbName: {}, tableName: {}",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"));
- return false;
- }
- if (syncStoreOnDrop) {
- dropSentryTablePrivileges(dbName, tableName, event);
- }
- String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
- removeAllPaths(authzObj, event.getEventId());
- return true;
+ void processDropTable(String dbName, String tableName, long seqNum) throws Exception {
+ String authzObj = dbName + "." + tableName;
+ removeAllPaths(authzObj, seqNum);
}
/**
* Processes "alter table" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param event notification event to be processed.
+ * @param oldDbName old database name
+ * @param newDbName new database name
+ * @param oldTableName old table name
+ * @param newTableName new table name
+ * @param oldLocation old table location
+ * @param newLocation new table location
+ * @param seqNum notification event ID
* @throws Exception if encounters errors while persisting the path change
*/
- private boolean processAlterTable(NotificationEvent event) throws Exception {
- SentryJSONAlterTableMessage alterTableMessage =
- deserializer.getAlterTableMessage(event.getMessage());
- String oldDbName = alterTableMessage.getDB();
- String oldTableName = alterTableMessage.getTable();
- String newDbName = event.getDbName();
- String newTableName = event.getTableName();
- String oldLocation = alterTableMessage.getOldLocation();
- String newLocation = alterTableMessage.getNewLocation();
-
- if ((oldDbName == null)
- || (oldTableName == null)
- || (newDbName == null)
- || (newTableName == null)
- || (oldLocation == null)
- || (newLocation == null)) {
- LOGGER.error(String.format("Alter table event "
- + "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, "
- + "newDbName = %s, newTableName = %s, newLocation = %s",
- StringUtils.defaultIfBlank(oldDbName, "null"),
- StringUtils.defaultIfBlank(oldTableName, "null"),
- StringUtils.defaultIfBlank(oldLocation, "null"),
- StringUtils.defaultIfBlank(newDbName, "null"),
- StringUtils.defaultIfBlank(newTableName, "null"),
- StringUtils.defaultIfBlank(newLocation, "null")));
- return false;
- }
-
- if ((oldDbName.equals(newDbName))
- && (oldTableName.equals(newTableName))
- && (oldLocation.equals(newLocation))) {
- LOGGER.error(String.format("Alter table notification ignored as neither name nor "
- + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, "
- + "newLocation = %s", oldDbName + "." + oldTableName, oldLocation,
- newDbName + "." + newTableName, newLocation));
- return false;
- }
-
- if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) {
- // Name has changed
- try {
- renamePrivileges(oldDbName, oldTableName, newDbName, newTableName);
- } catch (SentryNoSuchObjectException e) {
- LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table:"
- + " {}.{}", oldDbName, oldTableName);
- } catch (Exception e) {
- LOGGER.info("Could not process Alter table event. Event: {}", event.toString(), e);
- return false;
- }
- }
+ void processAlterTable(String oldDbName, String newDbName, String oldTableName,
+ String newTableName, String oldLocation, String newLocation, long seqNum)
+ throws Exception {
String oldAuthzObj = oldDbName + "." + oldTableName;
String newAuthzObj = newDbName + "." + newTableName;
- renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, event.getEventId());
- return true;
+ renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, seqNum);
}
/**
* Processes "add partition" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param event notification event to be processed.
+ * @param dbName database name
+ * @param tableName table name
+ * @param locations partition locations
+ * @param seqNum notification event ID
* @throws Exception if encounters errors while persisting the path change
*/
- private boolean processAddPartition(NotificationEvent event)
- throws Exception {
- SentryJSONAddPartitionMessage addPartitionMessage =
- deserializer.getAddPartitionMessage(event.getMessage());
- String dbName = addPartitionMessage.getDB();
- String tableName = addPartitionMessage.getTable();
- List<String> locations = addPartitionMessage.getLocations();
- if ((dbName == null) || (tableName == null) || (locations == null)) {
- LOGGER.error(String.format("Create table event has incomplete information. "
- + "dbName = %s, tableName = %s, locations = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"),
- locations != null ? locations.toString() : "null"));
- return false;
- }
- String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
- addPaths(authzObj, locations, event.getEventId());
- return true;
+ void processAddPartition(String dbName, String tableName,
+ Collection<String> locations, long seqNum)
+ throws Exception {
+ String authzObj = dbName + "." + tableName;
+ addPaths(authzObj, locations, seqNum);
}
/**
* Processes "drop partition" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param event notification event to be processed.
+ * @param dbName database name
+ * @param tableName table name
+ * @param locations partition locations
+ * @param seqNum notification event ID
* @throws Exception if encounters errors while persisting the path change
*/
- private boolean processDropPartition(NotificationEvent event)
- throws Exception {
- SentryJSONDropPartitionMessage dropPartitionMessage =
- deserializer.getDropPartitionMessage(event.getMessage());
- String dbName = dropPartitionMessage.getDB();
- String tableName = dropPartitionMessage.getTable();
- List<String> locations = dropPartitionMessage.getLocations();
- if ((dbName == null) || (tableName == null) || (locations == null)) {
- LOGGER.error(String.format("Drop partition event "
- + "has incomplete information. dbName = %s, tableName = %s, location = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"),
- locations != null ? locations.toString() : "null"));
- return false;
- }
- String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName);
- removePaths(authzObj, locations, event.getEventId());
- return true;
+ void processDropPartition(String dbName, String tableName,
+ Collection<String> locations, long seqNum)
+ throws Exception {
+ String authzObj = dbName + "." + tableName;
+ removePaths(authzObj, locations, seqNum);
}
/**
* Processes "alter partition" notification event, and applies its corresponding
* snapshot change as well as delta path update into Sentry DB.
*
- * @param event notification event to be processed.
+ * @param dbName database name
+ * @param tableName table name
+ * @param oldLocation old partition location
+ * @param newLocation new partition location
+ * @param seqNum notification event ID
* @throws Exception if encounters errors while persisting the path change
*/
- private boolean processAlterPartition(NotificationEvent event) throws Exception {
- SentryJSONAlterPartitionMessage alterPartitionMessage =
- deserializer.getAlterPartitionMessage(event.getMessage());
- String dbName = alterPartitionMessage.getDB();
- String tableName = alterPartitionMessage.getTable();
- String oldLocation = alterPartitionMessage.getOldLocation();
- String newLocation = alterPartitionMessage.getNewLocation();
-
- if ((dbName == null)
- || (tableName == null)
- || (oldLocation == null)
- || (newLocation == null)) {
- LOGGER.error(String.format("Alter partition event "
- + "has incomplete information. dbName = %s, tableName = %s, "
- + "oldLocation = %s, newLocation = %s",
- StringUtils.defaultIfBlank(dbName, "null"),
- StringUtils.defaultIfBlank(tableName, "null"),
- StringUtils.defaultIfBlank(oldLocation, "null"),
- StringUtils.defaultIfBlank(newLocation, "null")));
- return false;
- }
-
- if (oldLocation.equals(newLocation)) {
- LOGGER.info(String.format("Alter partition notification ignored as"
- + "location has not changed: AuthzObj = %s, Location = %s", dbName + "."
- + "." + tableName, oldLocation));
- return false;
- }
-
+ void processAlterPartition(String dbName, String tableName, String oldLocation,
+ String newLocation, long seqNum) throws Exception {
String oldAuthzObj = dbName + "." + tableName;
- renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, event.getEventId());
- return true;
+ renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, seqNum);
}
/**
@@ -479,9 +187,10 @@ final class NotificationProcessor {
* @param authzObj the given authzObj
* @param locations a set of paths need to be added
* @param seqNum notification event ID
+ * @throws Exception
*/
private void addPaths(String authzObj, Collection<String> locations, long seqNum)
- throws Exception {
+ throws Exception {
// AuthzObj is case insensitive
authzObj = authzObj.toLowerCase();
@@ -492,13 +201,13 @@ final class NotificationProcessor {
for (String location : locations) {
String pathTree = getPath(location);
if (pathTree == null) {
- LOGGER.debug("HMS Path Update ["
+ LOGGER.debug("#### HMS Path Update ["
+ "OP : addPath, "
+ "authzObj : " + authzObj + ", "
+ "path : " + location + "] - nothing to add" + ", "
+ "notification event ID: " + seqNum + "]");
} else {
- LOGGER.debug("HMS Path Update ["
+ LOGGER.debug("#### HMS Path Update ["
+ "OP : addPath, " + "authzObj : "
+ authzObj + ", "
+ "path : " + location + ", "
@@ -517,9 +226,10 @@ final class NotificationProcessor {
* @param authzObj the given authzObj
* @param locations a set of paths need to be removed
* @param seqNum notification event ID
+ * @throws Exception
*/
private void removePaths(String authzObj, Collection<String> locations, long seqNum)
- throws Exception {
+ throws Exception {
// AuthzObj is case insensitive
authzObj = authzObj.toLowerCase();
@@ -528,13 +238,13 @@ final class NotificationProcessor {
for (String location : locations) {
String pathTree = getPath(location);
if (pathTree == null) {
- LOGGER.debug("HMS Path Update ["
+ LOGGER.debug("#### HMS Path Update ["
+ "OP : removePath, "
+ "authzObj : " + authzObj + ", "
+ "path : " + location + "] - nothing to remove" + ", "
+ "notification event ID: " + seqNum + "]");
} else {
- LOGGER.debug("HMS Path Update ["
+ LOGGER.debug("#### HMS Path Update ["
+ "OP : removePath, "
+ "authzObj : " + authzObj + ", "
+ "path : " + location + ", "
@@ -553,13 +263,14 @@ final class NotificationProcessor {
*
* @param authzObj the given authzObj to be deleted
* @param seqNum notification event ID
+ * @throws Exception
*/
private void removeAllPaths(String authzObj, long seqNum)
- throws Exception {
+ throws Exception {
// AuthzObj is case insensitive
authzObj = authzObj.toLowerCase();
- LOGGER.debug("HMS Path Update ["
+ LOGGER.debug("#### HMS Path Update ["
+ "OP : removeAllPaths, "
+ "authzObj : " + authzObj + ", "
+ "notification event ID: " + seqNum + "]");
@@ -578,19 +289,21 @@ final class NotificationProcessor {
* @param newAuthzObj the new name to be changed to
* @param oldLocation a existing path of the given authzObj
* @param newLocation a new path to be changed to
+ * @param seqNum
+ * @throws Exception
*/
private void renameAuthzPath(String oldAuthzObj, String newAuthzObj, String oldLocation,
- String newLocation, long seqNum) throws Exception {
+ String newLocation, long seqNum) throws Exception {
// AuthzObj is case insensitive
oldAuthzObj = oldAuthzObj.toLowerCase();
newAuthzObj = newAuthzObj.toLowerCase();
String oldPathTree = getPath(oldLocation);
String newPathTree = getPath(newLocation);
- LOGGER.debug("HMS Path Update ["
+ LOGGER.debug("#### HMS Path Update ["
+ "OP : renameAuthzObject, "
+ "oldAuthzObj : " + oldAuthzObj + ", "
- + "newAuthzObj : " + newAuthzObj + ", "
+ + "newAuthzObj : " + newAuthzObj + ", "
+ "oldLocation : " + oldLocation + ", "
+ "newLocation : " + newLocation + ", "
+ "notification event ID: " + seqNum + "]");
@@ -610,10 +323,20 @@ final class NotificationProcessor {
// Both name and location has changed
// - Alter table rename for managed table
sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj, oldPathTree,
- newPathTree, update);
+ newPathTree, update);
}
- } else {
- updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, seqNum);
+ } else if (oldPathTree != null) {
+ PathsUpdate update = new PathsUpdate(seqNum, false);
+ update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
+ sentryStore.deleteAuthzPathsMapping(oldAuthzObj,
+ Collections.singleton(oldPathTree),
+ update);
+ } else if (newPathTree != null) {
+ PathsUpdate update = new PathsUpdate(seqNum, false);
+ update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
+ sentryStore.addAuthzPathsMapping(newAuthzObj,
+ Collections.singleton(newPathTree),
+ update);
}
} else if (!oldLocation.equals(newLocation)) {
// Only Location has changed, e.g. Alter table set location
@@ -623,35 +346,27 @@ final class NotificationProcessor {
update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree));
sentryStore.updateAuthzPathsMapping(oldAuthzObj, oldPathTree,
newPathTree, update);
- } else {
- updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, seqNum);
+ } else if (oldPathTree != null) {
+ PathsUpdate update = new PathsUpdate(seqNum, false);
+ update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
+ sentryStore.deleteAuthzPathsMapping(oldAuthzObj,
+ Collections.singleton(oldPathTree),
+ update);
+ } else if (newPathTree != null) {
+ PathsUpdate update = new PathsUpdate(seqNum, false);
+ update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree));
+ sentryStore.addAuthzPathsMapping(oldAuthzObj,
+ Collections.singleton(newPathTree),
+ update);
}
} else {
LOGGER.error("Update Notification for Auhorizable object {}, with no change, skipping",
- oldAuthzObj);
- throw new SentryInvalidHMSEventException("Update Notification for Authorizable object"
- + "with no change");
+ oldAuthzObj);
+ throw new SentryInvalidHMSEventException("Update Notification for Authorizable object" +
+ "with no change");
}
}
- private void updateAuthzPathsMapping(String oldAuthzObj, String oldPathTree,
- String newAuthzObj, String newPathTree, long seqNum) throws Exception {
- if (oldPathTree != null) {
- PathsUpdate update = new PathsUpdate(seqNum, false);
- update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree));
- sentryStore.deleteAuthzPathsMapping(oldAuthzObj,
- Collections.singleton(oldPathTree),
- update);
- } else if (newPathTree != null) {
- PathsUpdate update = new PathsUpdate(seqNum, false);
- update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree));
- sentryStore.addAuthzPathsMapping(newAuthzObj,
- Collections.singleton(newPathTree),
- update);
- }
-
- }
-
/**
* Get path tree from a given path. It return null if encounters
* SentryMalformedPathException which indicates a malformed path.
@@ -668,45 +383,15 @@ final class NotificationProcessor {
return null;
}
- private void dropSentryDbPrivileges(String dbName, NotificationEvent event) {
- try {
- TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName);
- authorizable.setDb(dbName);
- sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable));
- } catch (SentryNoSuchObjectException e) {
- LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: {}",
- dbName);
- } catch (Exception e) {
- LOGGER.error("Could not process Drop database event." + "Event: " + event.toString(), e);
- }
- }
-
- private void dropSentryTablePrivileges(String dbName, String tableName,
- NotificationEvent event) {
- try {
- TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName);
- authorizable.setDb(dbName);
- authorizable.setTable(tableName);
- sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable));
- } catch (SentryNoSuchObjectException e) {
- LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the table: {}.{}",
- dbName, tableName);
- } catch (Exception e) {
- LOGGER.error("Could not process Drop table event. Event: " + event.toString(), e);
- }
- }
-
- private void renamePrivileges(String oldDbName, String oldTableName, String newDbName,
- String newTableName) throws
- Exception {
- TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(authServerName);
- oldAuthorizable.setDb(oldDbName);
- oldAuthorizable.setTable(oldTableName);
- TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(authServerName);
- newAuthorizable.setDb(newDbName);
- newAuthorizable.setTable(newTableName);
- Update update =
- getPermUpdatableOnRename(oldAuthorizable, newAuthorizable);
- sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update);
+ /**
+ * Split path into components on the "/" character.
+ * The path should not start with "/".
+ * This is consumed by Thrift interface, so the return result should be
+ * {@code List<String>}
+ * @param path input oath e.g. {@code foo/bar}
+ * @return list of commponents, e.g. [foo, bar]
+ */
+ private List<String> splitPath(String path) {
+ return (Lists.newArrayList(path.split("/")));
}
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHmsClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHmsClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHmsClient.java
deleted file mode 100644
index 29a85d7..0000000
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHmsClient.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- <p>
- http://www.apache.org/licenses/LICENSE-2.0
- <p>
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-
-package org.apache.sentry.service.thrift;
-
-import static com.codahale.metrics.MetricRegistry.name;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Timer;
-import com.codahale.metrics.Timer.Context;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import java.io.IOException;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.sentry.provider.db.service.persistent.PathsImage;
-import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
-
-import org.apache.thrift.TException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wrapper class for <Code>HiveMetaStoreClient</Code>
- *
- * <p>Abstracts communication with HMS and exposes APi's to connect/disconnect to HMS and to
- * request HMS snapshots and also for new notifications.
- */
-final class SentryHmsClient implements AutoCloseable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SentryHmsClient.class);
- private final Configuration conf;
- private HiveMetaStoreClient client = null;
- private HiveConnectionFactory hiveConnectionFactory;
-
- private static final String SNAPSHOT = "snapshot";
- /** Measures time to get full snapshot. */
- private final Timer updateTimer = SentryMetrics.getInstance()
- .getTimer(name(FullUpdateInitializer.class, SNAPSHOT));
- /** Number of times update failed. */
- private final Counter failedSnapshotsCount = SentryMetrics.getInstance()
- .getCounter(name(FullUpdateInitializer.class, "failed"));
-
- SentryHmsClient(Configuration conf, HiveConnectionFactory hiveConnectionFactory) {
- this.conf = conf;
- this.hiveConnectionFactory = hiveConnectionFactory;
- }
-
- /**
- * Used only for testing purposes.
- *x
- * @param client HiveMetaStoreClient to be initialized
- */
- @VisibleForTesting
- void setClient(HiveMetaStoreClient client) {
- this.client = client;
- }
-
- /**
- * Used to know if the client is connected to HMS
- *
- * @return true if the client is connected to HMS false, if client is not connected.
- */
- boolean isConnected() {
- return client != null;
- }
-
- /**
- * Connects to HMS by creating HiveMetaStoreClient.
- *
- * @throws IOException if could not establish connection
- * @throws InterruptedException if connection was interrupted
- * @throws MetaException if other errors happened
- */
- void connect()
- throws IOException, InterruptedException, MetaException {
- if (client != null) {
- return;
- }
- client = hiveConnectionFactory.connect().getClient();
- }
-
- /**
- * Disconnects the HMS client.
- */
- public void disconnect() throws Exception {
- try {
- if (client != null) {
- LOGGER.info("Closing the HMS client connection");
- client.close();
- }
- } catch (Exception e) {
- LOGGER.error("failed to close Hive Connection Factory", e);
- } finally {
- client = null;
- }
- }
-
- /**
- * Closes the HMS client.
- *
- * <p>This is similar to disconnect. As this class implements AutoClosable, close should be
- * implemented.
- */
- public void close() throws Exception {
- disconnect();
- }
-
- /**
- * Creates HMS full snapshot.
- *
- * @return Full path snapshot and the last notification id on success
- */
- PathsImage getFullSnapshot() {
- try {
- if (client == null) {
- LOGGER.error("Client is not connected to HMS");
- return new PathsImage(Collections.<String, Set<String>>emptyMap(),
- SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
- }
-
- CurrentNotificationEventId eventIdBefore = client.getCurrentNotificationEventId();
- Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate();
- if (pathsFullSnapshot.isEmpty()) {
- return new PathsImage(pathsFullSnapshot, SentryStore.EMPTY_NOTIFICATION_ID,
- SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
- }
-
- CurrentNotificationEventId eventIdAfter = client.getCurrentNotificationEventId();
- LOGGER.info("NotificationID, Before Snapshot: {}, After Snapshot {}",
- eventIdBefore.getEventId(), eventIdAfter.getEventId());
- // To ensure point-in-time snapshot consistency, need to make sure
- // there were no HMS updates while retrieving the snapshot. If there are updates, snapshot
- // is discarded. New attempt will be made after 500 milliseconds when
- // HmsFollower runs again.
- if (!eventIdBefore.equals(eventIdAfter)) {
- LOGGER.error("Snapshot discarded, updates to HMS data while shapshot is being taken."
- + "ID Before: {}. ID After: {}", eventIdBefore.getEventId(), eventIdAfter.getEventId());
- return new PathsImage(Collections.<String, Set<String>>emptyMap(),
- SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
- }
-
- LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID: {}.",
- eventIdAfter);
- // As eventIDAfter is the last event that was processed, eventIDAfter is used to update
- // lastProcessedNotificationID instead of getting it from persistent store.
- return new PathsImage(pathsFullSnapshot, eventIdAfter.getEventId(),
- SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
- } catch (TException failure) {
- LOGGER.error("Failed to communicate to HMS");
- return new PathsImage(Collections.<String, Set<String>>emptyMap(),
- SentryStore.EMPTY_NOTIFICATION_ID, SentryStore.EMPTY_PATHS_SNAPSHOT_ID);
- }
- }
-
- /**
- * Retrieve a Hive full snapshot from HMS.
- *
- * @return HMS snapshot. Snapshot consists of a mapping from auth object name to the set of paths
- * corresponding to that name.
- */
- private Map<String, Set<String>> fetchFullUpdate() {
- LOGGER.info("Request full HMS snapshot");
- try (FullUpdateInitializer updateInitializer =
- new FullUpdateInitializer(hiveConnectionFactory, conf);
- Context context = updateTimer.time()) {
- Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
- LOGGER.info("Obtained full HMS snapshot");
- return pathsUpdate;
- } catch (Exception ignored) {
- failedSnapshotsCount.inc();
- LOGGER.error("Snapshot created failed ", ignored);
- return Collections.emptyMap();
- }
- }
-
- /**
- * Returns all HMS notifications with ID greater than the specified one
- *
- * @param notificationId ID of the last notification that was processed.
- * @return Collection of new events to be synced
- */
- Collection<NotificationEvent> getNotifications(long notificationId) throws Exception {
- if (client == null) {
- LOGGER.error("Client is not connected to HMS");
- return Collections.emptyList();
- }
- LOGGER.debug("Checking for notifications beyond {}", notificationId);
- // HIVE-15761: Currently getNextNotification API may return an empty
- // NotificationEventResponse causing TProtocolException.
- // Workaround: Only processes the notification events newer than the last updated one.
- CurrentNotificationEventId eventId = client.getCurrentNotificationEventId();
- LOGGER.debug("ID of Last HMS notifications is: {}", eventId.getEventId());
- if (eventId.getEventId() < notificationId) {
- LOGGER.error("Last notification of HMS is smaller than what sentry processed, Something is"
- + "wrong. Sentry will request a full Snapshot");
- // TODO Path Mapping info should be cleared so that HmsFollower would request for full
- // snapshot in the subsequent run.
- return Collections.emptyList();
- }
-
- if (eventId.getEventId() == notificationId) {
- return Collections.emptyList();
- }
-
- NotificationEventResponse response =
- client.getNextNotification(notificationId, Integer.MAX_VALUE, null);
- if (response.isSetEvents()) {
- LOGGER.debug("Last Id processed:{}. Received collection of notifications, Size:{}",
- notificationId, response.getEvents().size());
- return response.getEvents();
- }
-
- return Collections.emptyList();
- }
-}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index adb2030..6014a79 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -98,7 +98,7 @@ public class SentryService implements Callable, SigUtils.SigListener {
private final String keytab;
private final ExecutorService serviceExecutor;
private ScheduledExecutorService hmsFollowerExecutor = null;
- private HmsFollower hmsFollower = null;
+ private HMSFollower hmsFollower = null;
private Future serviceStatus;
private TServer thriftServer;
private Status status;
@@ -108,7 +108,7 @@ public class SentryService implements Callable, SigUtils.SigListener {
/*
sentryStore provides the data access for sentry data. It is the singleton instance shared
between various {@link SentryPolicyService}, i.e., {@link SentryPolicyStoreProcessor} and
- {@link HmsFollower}.
+ {@link HMSFollower}.
*/
private final SentryStore sentryStore;
private ScheduledExecutorService sentryStoreCleanService;
@@ -283,11 +283,11 @@ public class SentryService implements Callable, SigUtils.SigListener {
String metastoreURI = SentryServiceUtil.getHiveMetastoreURI();
if (metastoreURI == null) {
- LOGGER.info("Metastore uri is not configured. Do not start HmsFollower");
+ LOGGER.info("Metastore uri is not configured. Do not start HMSFollower");
return;
}
- LOGGER.info("Starting HmsFollower to HMS {}", metastoreURI);
+ LOGGER.info("Starting HMSFollower to HMS {}", metastoreURI);
Preconditions.checkState(hmsFollower == null);
Preconditions.checkState(hmsFollowerExecutor == null);
@@ -295,7 +295,8 @@ public class SentryService implements Callable, SigUtils.SigListener {
hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf());
hiveConnectionFactory.init();
- hmsFollower = new HmsFollower(conf, sentryStore, leaderMonitor, hiveConnectionFactory);
+ hmsFollower = new HMSFollower(conf, sentryStore, leaderMonitor, hiveConnectionFactory);
+
long initDelay = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS,
ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS_DEFAULT);
long period = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INTERVAL_MILLS,
@@ -308,7 +309,7 @@ public class SentryService implements Callable, SigUtils.SigListener {
hmsFollowerExecutor.scheduleAtFixedRate(hmsFollower,
initDelay, period, TimeUnit.MILLISECONDS);
} catch (IllegalArgumentException e) {
- LOGGER.error(String.format("Could not start HmsFollower due to illegal argument. period is %s ms",
+ LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period is %s ms",
period), e);
throw e;
}
@@ -349,8 +350,8 @@ public class SentryService implements Callable, SigUtils.SigListener {
try {
// close connections
hmsFollower.close();
- } catch (Exception ex) {
- LOGGER.error("HmsFollower.close() failed", ex);
+ } catch (RuntimeException ex) {
+ LOGGER.error("HMSFollower.close() failed", ex);
} finally {
hmsFollower = null;
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
index 5826766..9c3e485 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceUtil.java
@@ -30,12 +30,9 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.sentry.core.common.exception.SentryInvalidInputException;
import org.apache.sentry.core.common.utils.SentryConstants;
import org.apache.sentry.core.common.utils.KeyValue;
import org.apache.sentry.core.common.utils.PolicyFileConstants;
-import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
import org.apache.sentry.provider.db.service.thrift.TSentryGrantOption;
import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
import org.apache.sentry.service.thrift.ServiceConstants.PrivilegeScope;
@@ -222,38 +219,8 @@ public final class SentryServiceUtil {
return hiveConf.get(METASTOREURIS.varname);
}
- /**
- * Derives object name from database and table names by concatenating them
- *
- * @param authorizable for which is name is to be derived
- * @return authorizable name
- * @throws SentryInvalidInputException if argument provided does not have all the
- * required fields set.
- */
- public static String getAuthzObj(TSentryAuthorizable authorizable)
- throws SentryInvalidInputException {
- return getAuthzObj(authorizable.getDb(), authorizable.getTable());
- }
-
- /**
- * Derives object name from database and table names by concatenating them
- *
- * @param dbName
- * @param tblName
- * @return authorizable name
- * @throws SentryInvalidInputException if argument provided does not have all the
- * required fields set.
- */
- public static String getAuthzObj(String dbName, String tblName)
- throws SentryInvalidInputException {
- if (SentryStore.isNULL(dbName)) {
- throw new SentryInvalidInputException("Invalif input, DB name is missing");
- }
- return SentryStore.isNULL(tblName) ? dbName.toLowerCase() :
- (dbName + "." + tblName).toLowerCase();
- }
-
private SentryServiceUtil() {
// Make constructor private to avoid instantiation
}
+
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index cbbd3ad..6e22875 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -165,7 +165,7 @@ public class ServiceConstants {
.put("javax.jdo.option.Multithreaded", "true")
.build();
- // InitialDelay and period time for HmsFollower thread.
+ // InitialDelay and period time for HMSFollower thread.
public static final String SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS = "sentry.hmsfollower.init.delay.mills";
public static final long SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS_DEFAULT = 0;
public static final String SENTRY_HMSFOLLOWER_INTERVAL_MILLS = "sentry.hmsfollower.interval.mills";
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
index a8ebf7c..51f6c5d 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java
@@ -2697,6 +2697,7 @@ public class TestSentryStore extends org.junit.Assert {
@Test
public void testRenameUpdateAfterReplacingANewPathsImage() throws Exception {
Map<String, Set<String>> authzPaths = new HashMap<>();
+
// First image to persist (this will be replaced later)
authzPaths.put("db1.table1", Sets.newHashSet("/user/hive/warehouse/db2.db/table1.1",
"/user/hive/warehouse/db2.db/table1.2"));
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
index 2095469..66ad2a1 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
@@ -16,115 +16,75 @@
*/
package org.apache.sentry.service.thrift;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hive.hcatalog.messaging.HCatEventMessage;
import org.apache.hive.hcatalog.messaging.HCatEventMessage.EventType;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
import org.apache.sentry.hdfs.Updateable;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+
+import java.util.Arrays;
+
import org.junit.Test;
+import org.junit.Ignore;
import org.mockito.Mockito;
-import javax.security.auth.login.LoginException;
-
-public class TestHmsFollower {
+import java.util.ArrayList;
+import java.util.List;
- private final static String hiveInstance = "server2";
- private final static Configuration configuration = new Configuration();
- private final SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory();
- private final SentryStore sentryStore = Mockito.mock(SentryStore.class);
- private static HiveSimpleConnectionFactory hiveConnectionFactory;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.reset;
- @BeforeClass
- public static void setup() throws IOException, LoginException {
- hiveConnectionFactory = new HiveSimpleConnectionFactory(configuration, new HiveConf());
- hiveConnectionFactory.init();
- configuration.set("sentry.hive.sync.create", "true");
- }
+public class TestHMSFollower {
+ SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory();
+ SentryStore sentryStore = Mockito.mock(SentryStore.class);
+ final static String hiveInstance = "server2";
- /**
- * Constructs create database event and makes sure that appropriate sentry store API's
- * are invoke when the event is processed by hms follower.
- *
- * @throws Exception
- */
@Test
public void testCreateDatabase() throws Exception {
String dbName = "db1";
// Create notification events
- NotificationEvent notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.CREATE_DATABASE.toString(),
- messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null))
- .toString());
+ NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_DATABASE.toString(),
+ messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)).toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
- HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null,
- hiveConnectionFactory, hiveInstance);
- hmsFollower.processNotifications(events);
+
+ Configuration configuration = new Configuration();
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
+ hmsFollower.processNotificationEvents(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb("db1");
- verify(sentryStore, times(1))
- .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable));
+ verify(sentryStore, times(0)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable));
}
- /**
- * Constructs drop database event and makes sure that appropriate sentry store API's
- * are invoke when the event is processed by hms follower.
- *
- * @throws Exception
- */
@Test
public void testDropDatabase() throws Exception {
String dbName = "db1";
// Create notification events
- NotificationEvent notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.DROP_DATABASE.toString(),
- messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null))
- .toString());
+ NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_DATABASE.toString(),
+ messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs:///db1", null)).toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
- HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null,
- hiveConnectionFactory, hiveInstance);
- hmsFollower.processNotifications(events);
+ Configuration configuration = new Configuration();
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
+ hmsFollower.processNotificationEvents(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb("db1");
- verify(sentryStore, times(1))
- .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable));
+ verify(sentryStore, times(1)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable));
}
- /**
- * Constructs create table event and makes sure that appropriate sentry store API's
- * are invoke when the event is processed by hms follower.
- *
- * @throws Exception
- */
@Test
public void testCreateTable() throws Exception {
String dbName = "db1";
@@ -133,33 +93,23 @@ public class TestHmsFollower {
// Create notification events
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("hdfs:///db1.db/table1");
- NotificationEvent notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(
- new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null))
- .toString());
+ NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_TABLE.toString(),
+ messageFactory.buildCreateTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
- HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null,
- hiveConnectionFactory, hiveInstance);
- hmsFollower.processNotifications(events);
+ Configuration configuration = new Configuration();
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
+ hmsFollower.processNotificationEvents(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb("db1");
authorizable.setTable(tableName);
- verify(sentryStore, times(1))
- .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable));
+ verify(sentryStore, times(0)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable));
}
- /**
- * Constructs drop table event and makes sure that appropriate sentry store API's
- * are invoke when the event is processed by hms follower.
- *
- * @throws Exception
- */
@Test
public void testDropTable() throws Exception {
String dbName = "db1";
@@ -168,33 +118,23 @@ public class TestHmsFollower {
// Create notification events
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("hdfs:///db1.db/table1");
- NotificationEvent notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.DROP_TABLE.toString(),
- messageFactory.buildDropTableMessage(
- new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null))
- .toString());
+ NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_TABLE.toString(),
+ messageFactory.buildDropTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString());
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
- HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null,
- hiveConnectionFactory, hiveInstance);
- hmsFollower.processNotifications(events);
+ Configuration configuration = new Configuration();
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
+ hmsFollower.processNotificationEvents(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
authorizable.setDb("db1");
authorizable.setTable(tableName);
- verify(sentryStore, times(1))
- .dropPrivilege(authorizable, NotificationProcessor.getPermUpdatableOnDrop(authorizable));
+ verify(sentryStore, times(1)).dropPrivilege(authorizable, HMSFollower.onDropSentryPrivilege(authorizable));
}
- /**
- * Constructs rename table event and makes sure that appropriate sentry store API's
- * are invoke when the event is processed by hms follower.
- *
- * @throws Exception
- */
@Test
public void testRenameTable() throws Exception {
String dbName = "db1";
@@ -206,20 +146,18 @@ public class TestHmsFollower {
// Create notification events
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation("hdfs:///db1.db/table1");
- NotificationEvent notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.ALTER_TABLE.toString(),
- messageFactory.buildAlterTableMessage(
- new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
- new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null))
- .toString());
+ NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.ALTER_TABLE.toString(),
+ messageFactory.buildAlterTableMessage(
+ new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
+ new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString());
notificationEvent.setDbName(newDbName);
notificationEvent.setTableName(newTableName);
List<NotificationEvent> events = new ArrayList<>();
events.add(notificationEvent);
- HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null,
- hiveConnectionFactory, hiveInstance);
- hmsFollower.processNotifications(events);
+ Configuration configuration = new Configuration();
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
+ hmsFollower.processNotificationEvents(events);
TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
authorizable.setServer(hiveInstance);
@@ -231,20 +169,11 @@ public class TestHmsFollower {
newAuthorizable.setDb(newDbName);
newAuthorizable.setTable(newTableName);
- verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable,
- NotificationProcessor.getPermUpdatableOnRename(authorizable, newAuthorizable));
+ verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable, HMSFollower.onRenameSentryPrivilege(authorizable, newAuthorizable));
}
@Ignore
- /**
- * Constructs a bunch of events and passed to processor of hms follower. One of those is alter
- * partition event with out actually changing anything(invalid event). Idea is to make sure that
- * hms follower calls appropriate sentry store API's for the events processed by hms follower
- * after processing the invalid alter partition event.
- *
- * @throws Exception
- */
@Test
public void testAlterPartitionWithInvalidEvent() throws Exception {
String dbName = "db1";
@@ -252,38 +181,35 @@ public class TestHmsFollower {
String tableName2 = "table2";
long inputEventId = 1;
List<NotificationEvent> events = new ArrayList<>();
- NotificationEvent notificationEvent;
+ NotificationEvent notificationEvent = null;
List<FieldSchema> partCols;
- StorageDescriptor sd;
+ StorageDescriptor sd = null;
Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
- //noinspection unchecked
Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
Configuration configuration = new Configuration();
- HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null,
- hiveConnectionFactory, hiveInstance);
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
+
// Create a table
sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table1");
- partCols = new ArrayList<>();
+ partCols = new ArrayList<FieldSchema>();
partCols.add(new FieldSchema("ds", "string", ""));
- Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
- null);
+ Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null);
notificationEvent = new NotificationEvent(inputEventId, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(table).toString());
+ HCatEventMessage.EventType.CREATE_TABLE.toString(),
+ messageFactory.buildCreateTableMessage(table).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
- hmsFollower.processNotifications(events);
+ hmsFollower.processNotificationEvents(events);
// Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID was not invoked.
- //noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
events.clear();
@@ -292,22 +218,21 @@ public class TestHmsFollower {
List<Partition> partitions = new ArrayList<>();
StorageDescriptor invalidSd = new StorageDescriptor();
invalidSd.setLocation(null);
- Partition partition = new Partition(Collections.singletonList("today"), dbName, tableName1,
- 0, 0, sd, null);
+ Partition partition = new Partition(Arrays.asList("today"), dbName, tableName1,
+ 0, 0, sd, null);
partitions.add(partition);
notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ADD_PARTITION.toString(),
- messageFactory.buildAddPartitionMessage(table, partitions).toString());
+ messageFactory.buildAddPartitionMessage(table, partitions).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
//Process the notification
- hmsFollower.processNotifications(events);
+ hmsFollower.processNotificationEvents(events);
// Make sure that addAuthzPathsMapping was invoked once to handle ADD_PARTITION notification
// and persistLastProcessedNotificationID was not invoked.
- //noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
events.clear();
@@ -316,13 +241,13 @@ public class TestHmsFollower {
// This is an invalid event and should be processed by sentry store.
// Event Id should be explicitly persisted using persistLastProcessedNotificationID
notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(),
- messageFactory.buildAlterPartitionMessage(partition, partition).toString());
+ messageFactory.buildAlterPartitionMessage(partition, partition).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
- hmsFollower.processNotifications(events);
+ hmsFollower.processNotificationEvents(events);
// Make sure that persistLastProcessedNotificationID is invoked explicitly.
verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1);
reset(sentryStore);
@@ -330,21 +255,21 @@ public class TestHmsFollower {
// Create a alter notification with some actual change.
sd = new StorageDescriptor();
- sd.setLocation("hdfs://user/hive/warehouse/db1.db/table1");
+ sd.setLocation("hdfs://user/hive/wareshouse/db1.db/table1");
Partition updatedPartition = new Partition(partition);
updatedPartition.setSd(sd);
notificationEvent = new NotificationEvent(inputEventId, 0, EventType.ALTER_PARTITION.toString(),
- messageFactory.buildAlterPartitionMessage(partition, updatedPartition).toString());
+ messageFactory.buildAlterPartitionMessage(partition, updatedPartition).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
- hmsFollower.processNotifications(events);
+ hmsFollower.processNotificationEvents(events);
// Make sure that updateAuthzPathsMapping was invoked once to handle ALTER_PARTITION
// notification and persistLastProcessedNotificationID was not invoked.
verify(sentryStore, times(1)).updateAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyString(), Mockito.anyString(), Mockito.any(Updateable.Update.class));
+ Mockito.anyString(), Mockito.anyString(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(inputEventId - 1);
reset(sentryStore);
events.clear();
@@ -352,34 +277,24 @@ public class TestHmsFollower {
// Create a table
sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table2");
- partCols = new ArrayList<>();
+ partCols = new ArrayList<FieldSchema>();
partCols.add(new FieldSchema("ds", "string", ""));
- Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
- null);
+ Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null);
notificationEvent = new NotificationEvent(inputEventId, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(table1).toString());
+ HCatEventMessage.EventType.CREATE_TABLE.toString(),
+ messageFactory.buildCreateTableMessage(table1).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName2);
events.add(notificationEvent);
// Process the notification
- hmsFollower.processNotifications(events);
+ hmsFollower.processNotificationEvents(events);
// Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID was not invoked.
- //noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
}
- /**
- * Constructs a bunch of events and passed to processor of hms follower. One of those is alter
- * table event with out actually changing anything(invalid event). Idea is to make sure that
- * hms follower calls appropriate sentry store API's for the events processed by hms follower
- * after processing the invalid alter table event.
- *
- * @throws Exception
- */
@Test
public void testAlterTableWithInvalidEvent() throws Exception {
String dbName = "db1";
@@ -387,67 +302,61 @@ public class TestHmsFollower {
String tableName2 = "table2";
long inputEventId = 1;
List<NotificationEvent> events = new ArrayList<>();
- NotificationEvent notificationEvent;
+ NotificationEvent notificationEvent = null;
List<FieldSchema> partCols;
- StorageDescriptor sd;
+ StorageDescriptor sd = null;
Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
- //noinspection unchecked
Mockito.doNothing().when(sentryStore).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
Configuration configuration = new Configuration();
- HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null,
- hiveConnectionFactory, hiveInstance);
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance);
// Create a table
sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table1");
- partCols = new ArrayList<>();
+ partCols = new ArrayList<FieldSchema>();
partCols.add(new FieldSchema("ds", "string", ""));
- Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
- null);
+ Table table = new Table(tableName1, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null);
notificationEvent = new NotificationEvent(inputEventId, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(table).toString());
+ HCatEventMessage.EventType.CREATE_TABLE.toString(),
+ messageFactory.buildCreateTableMessage(table).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
- hmsFollower.processNotifications(events);
+ hmsFollower.processNotificationEvents(events);
// Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID was not invoked.
- //noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
events.clear();
- // Create alter table notification with out actually changing anything.
+
+ // Create alter table notification with actuall changing anything.
// This notification should not be processed by sentry server
// Notification should be persisted explicitly
- notificationEvent = new NotificationEvent(1, 0,
- HCatEventMessage.EventType.ALTER_TABLE.toString(),
- messageFactory.buildAlterTableMessage(
- new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
- new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null))
- .toString());
+ notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.ALTER_TABLE.toString(),
+ messageFactory.buildAlterTableMessage(
+ new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null),
+ new Table(tableName1, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName1);
events = new ArrayList<>();
events.add(notificationEvent);
inputEventId += 1;
// Process the notification
- hmsFollower.processNotifications(events);
+ hmsFollower.processNotificationEvents(events);
// Make sure that renameAuthzObj and deleteAuthzPathsMapping were not invoked
// to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID is explicitly invoked
verify(sentryStore, times(0)).renameAuthzObj(Mockito.anyString(), Mockito.anyString(),
- Mockito.any(Updateable.Update.class));
- //noinspection unchecked
+ Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).deleteAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong());
reset(sentryStore);
events.clear();
@@ -455,78 +364,21 @@ public class TestHmsFollower {
// Create a table
sd = new StorageDescriptor();
sd.setLocation("hdfs://db1.db/table2");
- partCols = new ArrayList<>();
+ partCols = new ArrayList<FieldSchema>();
partCols.add(new FieldSchema("ds", "string", ""));
- Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null,
- null);
+ Table table1 = new Table(tableName2, dbName, null, 0, 0, 0, sd, partCols, null, null, null, null);
notificationEvent = new NotificationEvent(inputEventId, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(table1).toString());
+ HCatEventMessage.EventType.CREATE_TABLE.toString(),
+ messageFactory.buildCreateTableMessage(table1).toString());
notificationEvent.setDbName(dbName);
notificationEvent.setTableName(tableName2);
events.add(notificationEvent);
// Process the notification
- hmsFollower.processNotifications(events);
+ hmsFollower.processNotificationEvents(events);
// Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification
// and persistLastProcessedNotificationID was not invoked.
- //noinspection unchecked
verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(),
- Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
+ Mockito.anyCollection(), Mockito.any(Updateable.Update.class));
verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
}
-
- /**
- * Constructs a two events and passed to processor of hms follower. First one being create table
- * event with location information(Invalid Event). Idea is to make sure that hms follower calls
- * appropriate sentry store API's for the event processed by hms follower after processing the
- * invalid create table event.
- *
- * @throws Exception
- */
- public void testCreateTableAfterInvalidEvent() throws Exception {
- String dbName = "db1";
- String tableName = "table1";
- long inputEventId = 1;
-
- Mockito.doNothing().when(sentryStore).persistLastProcessedNotificationID(Mockito.anyLong());
- //noinspection unchecked
- Mockito.doNothing().when(sentryStore)
- .addAuthzPathsMapping(Mockito.anyString(), Mockito.anyCollection(),
- Mockito.any(Updateable.Update.class));
-
- // Create invalid notification event. The location of the storage descriptor is null, which is invalid for creating table
- StorageDescriptor invalidSd = new StorageDescriptor();
- invalidSd.setLocation(null);
- NotificationEvent invalidNotificationEvent = new NotificationEvent(inputEventId, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(
- new Table(tableName, dbName, null, 0, 0, 0, invalidSd, null, null, null, null, null))
- .toString());
-
- // Create valid notification event
- StorageDescriptor sd = new StorageDescriptor();
- sd.setLocation("hdfs://db1.db/table1");
- inputEventId += 1;
- NotificationEvent notificationEvent = new NotificationEvent(inputEventId, 0,
- HCatEventMessage.EventType.CREATE_TABLE.toString(),
- messageFactory.buildCreateTableMessage(
- new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null))
- .toString());
- List<NotificationEvent> events = new ArrayList<>();
- events.add(invalidNotificationEvent);
- events.add(notificationEvent);
-
- Configuration configuration = new Configuration();
- HmsFollower hmsFollower = new HmsFollower(configuration, sentryStore, null,
- hiveConnectionFactory, hiveInstance);
- hmsFollower.processNotifications(events);
-
- // invalid event updates notification ID directly
- verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1);
-
- // next valid event update path, which updates notification ID
- //noinspection unchecked
- verify(sentryStore, times(1)).addAuthzPathsMapping(Mockito.anyString(), Mockito.anyCollection(),
- Mockito.any(Updateable.Update.class));
- }
}
[3/3] sentry git commit: Revert "SENTRY-1769 Refactor HMSFollower
Class (Kalyan Kumar Kalvagadda reviewed by Vamsee Yarlagadda, Na Li,
Sergio Pena and Alexander Kolbasov)"
Posted by ka...@apache.org.
Revert "SENTRY-1769 Refactor HMSFollower Class (Kalyan Kumar Kalvagadda reviewed by Vamsee Yarlagadda, Na Li, Sergio Pena and Alexander Kolbasov)"
This reverts commit e5bb466efc621318c69f4a929dea3e39a77962af.
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/c56f48cc
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/c56f48cc
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/c56f48cc
Branch: refs/heads/sentry-ha-redesign
Commit: c56f48cc62e5b346d6c8432ccfa07d7f9fa4f110
Parents: e5bb466
Author: Kalyan Kumar Kalvagadda <kk...@cloudera.com>
Authored: Thu Jul 13 18:57:20 2017 -0500
Committer: Kalyan Kumar Kalvagadda <kk...@cloudera.com>
Committed: Thu Jul 13 18:57:20 2017 -0500
----------------------------------------------------------------------
.../apache/sentry/hdfs/PathImageRetriever.java | 13 +-
.../org/apache/sentry/hdfs/SentryPlugin.java | 40 +-
.../provider/db/SentryPolicyStorePlugin.java | 4 +-
.../db/service/model/MAuthzPathsSnapshotId.java | 2 +-
.../service/model/MSentryHmsNotification.java | 8 +-
.../db/service/persistent/PathsImage.java | 10 +-
.../db/service/persistent/SentryStore.java | 13 +-
.../thrift/SentryPolicyStoreProcessor.java | 5 +-
.../sentry/service/thrift/HMSFollower.java | 714 ++++++++++++++-----
.../service/thrift/NotificationProcessor.java | 571 ++++-----------
.../sentry/service/thrift/SentryHmsClient.java | 244 -------
.../sentry/service/thrift/SentryService.java | 17 +-
.../service/thrift/SentryServiceUtil.java | 35 +-
.../sentry/service/thrift/ServiceConstants.java | 2 +-
.../db/service/persistent/TestSentryStore.java | 1 +
.../sentry/service/thrift/TestHMSFollower.java | 348 +++------
.../thrift/TestNotificationProcessor.java | 465 ------------
.../service/thrift/TestSentryHmsClient.java | 470 ------------
.../TestDbPrivilegeCleanupOnDrop.java | 4 +-
19 files changed, 820 insertions(+), 2146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
index ac5c5b2..2426b40 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
@@ -25,14 +25,9 @@ import org.apache.sentry.provider.db.service.persistent.PathsImage;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import javax.annotation.concurrent.ThreadSafe;
-
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.sentry.hdfs.service.thrift.TPathChanges;
-import org.apache.sentry.provider.db.service.persistent.PathsImage;
-import org.apache.sentry.provider.db.service.persistent.SentryStore;
/**
* PathImageRetriever obtains a complete snapshot of Hive Paths from a persistent
@@ -42,10 +37,10 @@ import org.apache.sentry.provider.db.service.persistent.SentryStore;
* It is a thread safe class, as all the underlying database operation is thread safe.
*/
@ThreadSafe
-class PathImageRetriever implements ImageRetriever<PathsUpdate> {
+public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
- private static final String[] root = {"/"};
private final SentryStore sentryStore;
+ private static final String[] root = {"/"};
PathImageRetriever(SentryStore sentryStore) {
this.sentryStore = sentryStore;
@@ -60,8 +55,8 @@ class PathImageRetriever implements ImageRetriever<PathsUpdate> {
// persistent storage, along with the sequence number of latest
// delta change the snapshot corresponds to.
PathsImage pathsImage = sentryStore.retrieveFullPathsImage();
+ long curSeqNum = pathsImage.getCurSeqNum();
long curImgNum = pathsImage.getCurImgNum();
- long curSeqNum = pathsImage.getId();
Map<String, Set<String>> pathImage = pathsImage.getPathImage();
// Translates the complete Hive paths snapshot into a PathsUpdate.
@@ -78,7 +73,7 @@ class PathImageRetriever implements ImageRetriever<PathsUpdate> {
}
SentryHdfsMetricsUtil.getPathChangesHistogram.update(pathsUpdate
- .getPathChanges().size());
+ .getPathChanges().size());
// Translate PathsUpdate that contains a full image to TPathsDump for
// consumer (NN) to be able to quickly construct UpdateableAuthzPaths
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
index 0c3ba5b..d6100de 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
@@ -23,14 +23,12 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.core.common.exception.SentryInvalidInputException;
import org.apache.sentry.core.common.utils.SigUtils;
import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
import org.apache.sentry.hdfs.service.thrift.TRoleChanges;
import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.service.thrift.SentryServiceUtil;
import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest;
import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleDeleteGroupsRequest;
import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleGrantPrivilegeRequest;
@@ -40,7 +38,7 @@ import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest;
import org.apache.sentry.provider.db.service.thrift.TRenamePrivilegesRequest;
import org.apache.sentry.provider.db.service.thrift.TSentryGroup;
import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
-import org.apache.sentry.service.thrift.HmsFollower;
+import org.apache.sentry.service.thrift.HMSFollower;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +56,7 @@ import static org.apache.sentry.hdfs.service.thrift.sentry_hdfs_serviceConstants
* <ol>
* <li>
* Whenever updates happen on HMS, a corresponding notification log is generated,
- * and {@link HmsFollower} will process the notification event and persist it in database.
+ * and {@link HMSFollower} will process the notification event and persist it in database.
* <li>
* The NameNode periodically asks Sentry for updates. Sentry may return zero
* or more updates previously received via HMS notification log.
@@ -242,22 +240,16 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
@Override
public Update onRenameSentryPrivilege(TRenamePrivilegesRequest request)
- throws SentryPluginException, SentryInvalidInputException{
- String oldAuthz = null;
- String newAuthz = null;
- try {
- oldAuthz = SentryServiceUtil.getAuthzObj(request.getOldAuthorizable());
- newAuthz = SentryServiceUtil.getAuthzObj(request.getNewAuthorizable());
- } catch (SentryInvalidInputException failure) {
- LOGGER.error("onRenameSentryPrivilege, Could not rename sentry privilege ", failure);
- throw failure;
- }
+ throws SentryPluginException {
+ String oldAuthz = HMSFollower.getAuthzObj(request.getOldAuthorizable());
+ String newAuthz = HMSFollower.getAuthzObj(request.getNewAuthorizable());
PermissionsUpdate update = new PermissionsUpdate();
TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
privUpdate.putToAddPrivileges(newAuthz, newAuthz);
privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
- LOGGER.debug("onRenameSentryPrivilege, Authz Perm preUpdate [ {} ]", oldAuthz);
+ LOGGER.debug(String.format("onRenameSentryPrivilege, Authz Perm preUpdate [ %s ]",
+ oldAuthz));
return update;
}
@@ -291,7 +283,8 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
roleName, privilege.getAction().toUpperCase());
- LOGGER.debug("onAlterSentryRoleRevokePrivilegeCore, Authz Perm preUpdate [ {} ]", authzObj);
+ LOGGER.debug(String.format("onAlterSentryRoleRevokePrivilegeCore, Authz Perm preUpdate [ %s ]",
+ authzObj));
return update;
}
@@ -303,7 +296,8 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ);
update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS);
- LOGGER.debug("onDropSentryRole, Authz Perm preUpdate [ {} ]", request.getRoleName());
+ LOGGER.debug(String.format("onDropSentryRole, Authz Perm preUpdate [ %s ]",
+ request.getRoleName()));
return update;
}
@@ -311,18 +305,12 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
public Update onDropSentryPrivilege(TDropPrivilegesRequest request)
throws SentryPluginException {
PermissionsUpdate update = new PermissionsUpdate();
- String authzObj = null;
- try {
- authzObj = SentryServiceUtil.getAuthzObj(request.getAuthorizable());
- } catch (SentryInvalidInputException failure) {
- LOGGER.error("onDropSentryPrivilege, Could not drop sentry privilege "
- + failure.toString(), failure);
- throw new SentryPluginException(failure.getMessage(), failure);
- }
+ String authzObj = HMSFollower.getAuthzObj(request.getAuthorizable());
update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
- LOGGER.debug("onDropSentryPrivilege, Authz Perm preUpdate [ {} ]", authzObj);
+ LOGGER.debug(String.format("onDropSentryPrivilege, Authz Perm preUpdate [ %s ]",
+ authzObj));
return update;
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
index a22b422..5b8a572 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
@@ -19,7 +19,6 @@
package org.apache.sentry.provider.db;
import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.core.common.exception.SentryInvalidInputException;
import org.apache.sentry.core.common.exception.SentryUserException;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest;
@@ -69,8 +68,7 @@ public interface SentryPolicyStorePlugin {
Update onDropSentryRole(TDropSentryRoleRequest tRequest) throws SentryPluginException;
- Update onRenameSentryPrivilege(TRenamePrivilegesRequest request)
- throws SentryPluginException, SentryInvalidInputException;
+ Update onRenameSentryPrivilege(TRenamePrivilegesRequest request) throws SentryPluginException;
Update onDropSentryPrivilege(TDropPrivilegesRequest request) throws SentryPluginException;
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
index d8d54f3..d683c2c 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
@@ -22,7 +22,7 @@ import javax.jdo.annotations.PrimaryKey;
/**
* This class is used to persist new authz paths snapshots IDs. An authz path snapshot ID is required by
- * the MAuthzPathsMapping to detect new HMS snapshots created by the HmsFollower.
+ * the MAuthzPathsMapping to detect new HMS snapshots created by the HMSFollower.
*/
@PersistenceCapable
public class MAuthzPathsSnapshotId {
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
index 166bec7..0d54548 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
@@ -20,13 +20,15 @@ package org.apache.sentry.provider.db.service.model;
/**
* Database backend store for HMS Notification ID's. All the notifications that are processed
* by sentry are stored.
- * <p>
+ */
+
+/*
* <p> HMS notification ID's are stored in separate table for three reasons</p>
* <ol>
* <li>SENTRY_PATH_CHANGE is not updated for every notification that is received from HMS. There
- * are cases where HmsFollower doesn't process notifications and skip's them. Depending on
+ * are cases where HMSFollower doesn't process notifications and skip's them. Depending on
* SENTRY_PATH_CHANGE information may not provide the last notification processed.</li>
- * <li> There could be cases where HmsFollower thread in multiple sentry servers acting as a
+ * <li> There could be cases where HMSFollower thread in multiple sentry servers acting as a
* leader and process HMS notifications. we need to avoid processing the notifications
* multiple times. This can be made sure by always having some number of notification
* information always regardless of purging interval.</li>
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
index 409a557..4d852e6 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
@@ -31,17 +31,17 @@ public class PathsImage {
// A full image of hiveObj to Paths mapping.
private final Map<String, Set<String>> pathImage;
- private final long id;
+ private final long curSeqNum;
private final long curImgNum;
- public PathsImage(Map<String, Set<String>> pathImage, long id, long curImgNum) {
+ public PathsImage(Map<String, Set<String>> pathImage, long curSeqNum, long curImgNum) {
this.pathImage = pathImage;
- this.id = id;
+ this.curSeqNum = curSeqNum;
this.curImgNum = curImgNum;
}
- public long getId() {
- return id;
+ public long getCurSeqNum() {
+ return curSeqNum;
}
public long getCurImgNum() {
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 7b02e2c..979e45b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -107,7 +107,7 @@ import static org.apache.sentry.provider.db.service.persistent.QueryParamBuilder
* single node and rely on DB for multi-node synchronization.
* <p>
* This isn't much of a problem for path updates since they are
- * driven by HmsFollower which usually runs on a single leader
+ * driven by HMSFollower which usually runs on a single leader
* node, but permission updates originate from clients
* directly and may be highly concurrent.
* <p>
@@ -151,7 +151,7 @@ public class SentryStore {
private static final long COUNT_VALUE_UNKNOWN = -1L;
// Representation for unknown HMS notification ID
- public static final long NOTIFICATION_UNKNOWN = -1L;
+ private static final long NOTIFICATION_UNKNOWN = -1L;
private static final Set<String> ALL_ACTIONS = Sets.newHashSet(AccessConstants.ALL,
AccessConstants.SELECT, AccessConstants.INSERT, AccessConstants.ALTER,
@@ -169,8 +169,8 @@ public class SentryStore {
private final TransactionManager tm;
/**
- * counterWait is used to synchronize notifications between Thrift and HmsFollower.
- * Technically it doesn't belong here, but the only thing that connects HmsFollower
+ * counterWait is used to synchronize notifications between Thrift and HMSFollower.
+ * Technically it doesn't belong here, but the only thing that connects HMSFollower
* and Thrift API is SentryStore. An alternative could be a singleton CounterWait or
* some factory that returns CounterWait instances keyed by name, but this complicates
* things unnecessary.
@@ -2674,7 +2674,7 @@ public class SentryStore {
/**
* Persist an up-to-date HMS snapshot into Sentry DB in a single transaction.
*
- * @param authzPaths paths to be be persisted
+ * @param authzPaths Mapping of hiveObj to < Paths <
* @throws Exception
*/
public void persistFullPathsImage(final Map<String, Set<String>> authzPaths) throws Exception {
@@ -2685,6 +2685,7 @@ public class SentryStore {
long snapshotID = getCurrentAuthzPathsSnapshotID(pm);
long nextSnapshotID = snapshotID + 1;
+
pm.makePersistent(new MAuthzPathsSnapshotId(nextSnapshotID));
for (Map.Entry<String, Set<String>> authzPath : authzPaths.entrySet()) {
pm.makePersistent(new MAuthzPathsMapping(nextSnapshotID, authzPath.getKey(), authzPath.getValue()));
@@ -3703,7 +3704,7 @@ public class SentryStore {
*
* @param pm the PersistenceManager
* @return EMPTY_NOTIFICATION_ID(0) when there are no notifications processed.
- * else last NotificationID processed by HmsFollower
+ * else last NotificationID processed by HMSFollower
*/
static Long getLastProcessedNotificationIDCore(
PersistenceManager pm) {
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index cfd0e30..ad23334 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -945,10 +945,7 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
} catch (SentryThriftAPIMismatchException e) {
LOGGER.error(e.getMessage(), e);
response.setStatus(Status.THRIFT_VERSION_MISMATCH(e.getMessage(), e));
- } catch (SentryInvalidInputException e) {
- response.setStatus(Status.InvalidInput(e.getMessage(), e));
- }
- catch (Exception e) {
+ } catch (Exception e) {
String msg = "Unknown error for request: " + request + ", message: "
+ e.getMessage();
LOGGER.error(msg, e);
http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index a9d05b1..1b6ae18 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -1,275 +1,641 @@
-/*
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- <p>
- http://www.apache.org/licenses/LICENSE-2.0
- <p>
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.sentry.service.thrift;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.codahale.metrics.Timer.Context;
import com.google.common.annotations.VisibleForTesting;
-
-import java.net.SocketException;
-
-import java.util.Collection;
-import javax.jdo.JDODataStoreException;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hive.hcatalog.messaging.HCatEventMessage;
import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
-import org.apache.sentry.provider.db.service.persistent.PathsImage;
+import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
+import org.apache.sentry.core.common.exception.SentryInvalidInputException;
+import org.apache.sentry.core.common.exception.SentryNoSuchObjectException;
+import org.apache.sentry.hdfs.PermissionsUpdate;
+import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
+import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.sentry.binding.metastore.messaging.json.*;
+
+import javax.jdo.JDODataStoreException;
+import javax.security.auth.login.LoginException;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static com.codahale.metrics.MetricRegistry.name;
+import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
+import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE;
+import static org.apache.sentry.hdfs.Updateable.Update;
/**
- * HmsFollower is the thread which follows the Hive MetaStore state changes from Sentry.
+ * HMSFollower is the thread which follows the Hive MetaStore state changes from Sentry.
* It gets the full update and notification logs from HMS and applies it to
* update permissions stored in Sentry using SentryStore and also update the < obj,path > state
* stored for HDFS-Sentry sync.
*/
-public class HmsFollower implements Runnable, AutoCloseable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(HmsFollower.class);
- private static boolean connectedToHms = false;
- private final SentryHmsClient client;
+@SuppressWarnings("PMD")
+public class HMSFollower implements Runnable, AutoCloseable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
+ private HiveSimpleConnectionFactory hiveConnectionFactory;
+ // Track the latest eventId of the event that has been logged. So we don't log the same message
+ private long lastLoggedEventId = SentryStore.EMPTY_CHANGE_ID;
+ private static boolean connectedToHMS = false;
+ private HMSClient client;
private final Configuration authzConf;
private final SentryStore sentryStore;
- private final NotificationProcessor notificationProcessor;
+ private String hiveInstance;
+ private boolean needLogHMSSupportReady = true;
private final LeaderStatusMonitor leaderMonitor;
- /**
- * Configuring Hms Follower thread.
- *
- * @param conf sentry configuration
- * @param store sentry store
- * @param leaderMonitor singleton instance of LeaderStatusMonitor
- */
- HmsFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
- HiveSimpleConnectionFactory hiveConnectionFactory) {
- this(conf, store, leaderMonitor, hiveConnectionFactory, null);
- }
+ private static final String SNAPSHOT = "snapshot";
+ /** Measures time to get full snapshot */
+ private final Timer updateTimer = SentryMetrics.getInstance()
+ .getTimer(name(FullUpdateInitializer.class, SNAPSHOT));
+ /** Number of times update failed */
+ private final Counter failedSnapshotsCount = SentryMetrics.getInstance()
+ .getCounter(name(FullUpdateInitializer.class, "failed"));
- @VisibleForTesting
- /**
- * Constructor should be used only for testing purposes.
- *
- * @param conf sentry configuration
- * @param store sentry store
- * @param leaderMonitor
- * @param authServerName Server that sentry is Authorizing
- */
- HmsFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
- HiveSimpleConnectionFactory hiveConnectionFactory, String authServerName) {
- LOGGER.info("HmsFollower is being initialized");
+ HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
+ HiveSimpleConnectionFactory hiveConnectionFactory) {
authzConf = conf;
this.leaderMonitor = leaderMonitor;
sentryStore = store;
- if (authServerName == null) {
- HiveConf hiveConf = new HiveConf();
- authServerName = hiveConf.get(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME.getVar());
- }
- notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf);
- client = new SentryHmsClient(authzConf, hiveConnectionFactory);
+ this.hiveConnectionFactory = hiveConnectionFactory;
+ }
+
+ @VisibleForTesting
+ HMSFollower(Configuration conf, SentryStore sentryStore, String hiveInstance)
+ throws IOException, LoginException {
+ this(conf, sentryStore, null, null);
+ this.hiveInstance = hiveInstance;
+ hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf());
+ hiveConnectionFactory.init();
}
@VisibleForTesting
- public static boolean isConnectedToHms() {
- return connectedToHms;
+ public static boolean isConnectedToHMS() {
+ return connectedToHMS;
}
@Override
public void close() {
- if (client != null) {
- // Close any outstanding connections to HMS
- try {
- client.disconnect();
- } catch (Exception failure) {
- LOGGER.error("Failed to close the Sentry Hms Client", failure);
- }
+ // Close any outstanding connections to HMS
+ closeHMSConnection();
+ try {
+ hiveConnectionFactory.close();
+ } catch (Exception e) {
+ LOGGER.error("failed to close Hive Connection Factory", e);
}
}
+ /**
+ * Returns HMS Client if successful, returns null if HMS is not ready yet to take connections
+ * Throws @LoginException if Kerberos context creation failed using Sentry's kerberos credentials
+ * Throws @MetaException if there was a problem on creating an HMSClient
+ */
+ private HiveMetaStoreClient getMetaStoreClient()
+ throws IOException, InterruptedException, MetaException {
+ if (client == null) {
+ client = hiveConnectionFactory.connect();
+ connectedToHMS = true;
+ }
+ return client.getClient();
+ }
+
@Override
public void run() {
- long lastProcessedNotificationId;
+ Long lastProcessedNotificationID;
try {
- // Initializing lastProcessedNotificationId based on the latest persisted notification ID.
- lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID();
+ // Initializing lastProcessedNotificationID based on the latest persisted notification ID.
+ lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID();
} catch (Exception e) {
- LOGGER.error("Failed to get the last processed notification id from sentry store, "
- + "Skipping the processing", e);
+ LOGGER.error("Failed to get the last processed notification id from sentry store, " +
+ "Skipping the processing", e);
return;
}
// Wake any clients connected to this service waiting for HMS already processed notifications.
- wakeUpWaitingClientsForSync(lastProcessedNotificationId);
+ wakeUpWaitingClientsForSync(lastProcessedNotificationID);
// Only the leader should listen to HMS updates
if ((leaderMonitor != null) && !leaderMonitor.isLeader()) {
// Close any outstanding connections to HMS
- close();
+ closeHMSConnection();
return;
}
- syncupWithHms(lastProcessedNotificationId);
+ processHiveMetastoreUpdates();
}
/**
- * Processes new Hive Metastore notifications.
- *
- * <p>If no notifications are processed yet, then it
- * does a full initial snapshot of the Hive Metastore followed by new notifications updates that
- * could have happened after it.
+ * Wakes up HMS waiters waiting for a specific event notification.
*
- * <p>Clients connections waiting for an event notification will be
- * woken up afterwards.
+ * @param eventID
*/
- private void syncupWithHms(long notificationId) {
- try {
- client.connect();
- connectedToHms = true;
- } catch (Throwable e) {
- LOGGER.error("HmsFollower cannot connect to HMS!!", e);
- return;
+ private void wakeUpWaitingClientsForSync(long eventID) {
+ CounterWait counterWait = sentryStore.getCounterWait();
+
+ // Wake up any HMS waiters that are waiting for this ID.
+ // counterWait should never be null, but tests mock SentryStore and a mocked one
+ // doesn't have it.
+ if (counterWait != null) {
+ counterWait.update(eventID);
}
+ }
+ /**
+ * Processes new Hive Metastore notifications.
+ *
+ * If no notifications are processed yet, then it does a full initial snapshot of the Hive Metastore
+ * followed by new notifications updates that could have happened after it.
+ *
+ * Clients connections waiting for an event notification will be woken up afterwards.
+ */
+ private void processHiveMetastoreUpdates() {
try {
- long lastProcessedNotificationId = notificationId;
- // Create a full HMS snapshot if there is none
// Decision of taking full snapshot is based on AuthzPathsMapping information persisted
- // in the sentry persistent store. If AuthzPathsMapping is empty, snapshot is needed.
+ // in the sentry persistent store. If AuthzPathsMapping is empty, shapshot is needed.
+ Long lastProcessedNotificationID;
if (sentryStore.isAuthzPathsMappingEmpty()) {
- lastProcessedNotificationId = createFullSnapshot();
- if (lastProcessedNotificationId == SentryStore.EMPTY_NOTIFICATION_ID) {
+ // TODO: expose time used for full update in the metrics
+
+ // To ensure point-in-time snapshot consistency, need to make sure
+ // there were no HMS updates while retrieving the snapshot.
+ // In detail the logic is:
+ //
+ // 1. Read current HMS notification ID_initial
+ // 2. Read HMS metadata state
+ // 3. Read current notification ID_new
+ // 4. If ID_initial != ID_new then the attempts for retrieving full HMS snapshot
+ // will be dropped. A new attempts will be made after 500 milliseconds when
+ // HMSFollower run again.
+
+ CurrentNotificationEventId eventIDBefore = getMetaStoreClient().getCurrentNotificationEventId();
+ LOGGER.info("Before fetching hive full snapshot, Current NotificationID = {}", eventIDBefore);
+
+ Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate();
+ if(pathsFullSnapshot.isEmpty()) {
+ LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have any data");
return;
}
+
+ CurrentNotificationEventId eventIDAfter = getMetaStoreClient().getCurrentNotificationEventId();
+ LOGGER.info("After fetching hive full snapshot, Current NotificationID = {}", eventIDAfter);
+
+ if (!eventIDBefore.equals(eventIDAfter)) {
+ LOGGER.error("Fail to get a point-in-time hive full snapshot. Current ID = {}",
+ eventIDAfter);
+ return;
+ }
+
+ LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID = {}",
+ eventIDAfter);
+ // As eventIDAfter is the last event that was processed, eventIDAfter is used to update
+ // lastProcessedNotificationID instead of getting it from persistent store.
+ lastProcessedNotificationID = eventIDAfter.getEventId();
+ sentryStore.persistFullPathsImage(pathsFullSnapshot);
+ sentryStore.persistLastProcessedNotificationID(eventIDAfter.getEventId());
+ // Wake up any HMS waiters that could have been put on hold before getting the eventIDBefore value.
+ wakeUpWaitingClientsForSync(lastProcessedNotificationID);
+ } else {
+ // Every time HMSFollower is scheduled to run, value should be updates based
+ // on the value stored in database.
+ lastProcessedNotificationID = sentryStore.getLastProcessedNotificationID();
+ }
+
+ // HMSFollower connected to HMS and it finished full snapshot if that was required
+ // Log this message only once
+ if (needLogHMSSupportReady && connectedToHMS) {
+ LOGGER.info("Sentry HMS support is ready");
+ needLogHMSSupportReady = false;
+ }
+
+ // HIVE-15761: Currently getNextNotification API may return an empty
+ // NotificationEventResponse causing TProtocolException.
+ // Workaround: Only processes the notification events newer than the last updated one.
+ CurrentNotificationEventId eventId = getMetaStoreClient().getCurrentNotificationEventId();
+ LOGGER.debug("Last Notification in HMS {} lastProcessedNotificationID is {}",
+ eventId.getEventId(), lastProcessedNotificationID);
+ if (eventId.getEventId() > lastProcessedNotificationID) {
+ NotificationEventResponse response =
+ getMetaStoreClient().getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null);
+ if (response.isSetEvents()) {
+ if (!response.getEvents().isEmpty()) {
+ if (lastProcessedNotificationID != lastLoggedEventId) {
+ // Only log when there are updates and the notification ID has changed.
+ LOGGER.debug("lastProcessedNotificationID = {}. Processing {} events",
+ lastProcessedNotificationID, response.getEvents().size());
+ lastLoggedEventId = lastProcessedNotificationID;
+ }
+
+ processNotificationEvents(response.getEvents());
+ }
+ }
}
- // Get the new notification from HMS and process them.
- processNotifications(client.getNotifications(lastProcessedNotificationId));
} catch (TException e) {
- // If the underlying exception is around socket exception,
- // it is better to retry connection to HMS
+ // If the underlying exception is around socket exception, it is better to retry connection to HMS
if (e.getCause() instanceof SocketException) {
- LOGGER.error("Encountered Socket Exception during fetching Notification entries,"
- + " will attempt to reconnect to HMS after configured interval", e);
- close();
+ LOGGER.error("Encountered Socket Exception during fetching Notification entries, will reconnect to HMS", e);
+ client.invalidate();
+ closeHMSConnection();
} else {
- LOGGER.error("ThriftException occurred communicating with HMS", e);
+ LOGGER.error("ThriftException occured fetching Notification entries, will try", e);
}
+ } catch (SentryInvalidInputException | SentryInvalidHMSEventException e) {
+ LOGGER.error("Encounter SentryInvalidInputException|SentryInvalidHMSEventException " +
+ "while processing notification log", e);
} catch (Throwable t) {
// catching errors to prevent the executor to halt.
- LOGGER.error("Exception in HmsFollower! Caused by: " + t.getMessage(),
- t);
+ LOGGER.error("Caught unexpected exception in HMSFollower! Caused by: " + t.getMessage(),
+ t.getCause());
+ t.printStackTrace();
}
}
/**
- * Request for full snapshot and persists it if there is no snapshot available in the
- * sentry store. Also, wakes-up any waiting clients.
- *
- * @return ID of last notification processed.
- * @throws Exception if there are failures
+ * Function to close HMS connection and any associated kerberos context (if applicable)
*/
- private long createFullSnapshot() throws Exception {
- LOGGER.debug("Attempting to take full HMS snapshot");
- PathsImage snapshotInfo = client.getFullSnapshot();
- if (snapshotInfo.getPathImage().isEmpty()) {
- return snapshotInfo.getId();
- }
+ private void closeHMSConnection() {
try {
- LOGGER.debug("Persisting HMS path full snapshot");
- sentryStore.persistFullPathsImage(snapshotInfo.getPathImage());
- sentryStore.persistLastProcessedNotificationID(snapshotInfo.getId());
- } catch (Exception failure) {
- LOGGER.error("Received exception while persisting HMS path full snapshot ");
- throw failure;
+ if (client != null) {
+ LOGGER.info("Closing the HMS client connection");
+ client.close();
+ connectedToHMS = false;
+ }
+ } finally {
+ client = null;
}
- // Wake up any HMS waiters that could have been put on hold before getting the
- // eventIDBefore value.
- wakeUpWaitingClientsForSync(snapshotInfo.getId());
- // HmsFollower connected to HMS and it finished full snapshot if that was required
- // Log this message only once
- LOGGER.info("Sentry HMS support is ready");
- return snapshotInfo.getId();
}
/**
- * Process the collection of notifications and wake up any waiting clients.
- * Also, persists the notification ID regardless of processing result.
+ * Retrieve a Hive full snapshot from HMS.
*
- * @param events list of event to be processed
- * @throws Exception if the complete notification list is not processed because of JDO Exception
+ * @return HMS snapshot. Snapshot consists of a mapping from auth object name
+ * to the set of paths corresponding to that name.
+ * @throws InterruptedException
+ * @throws TException
+ * @throws ExecutionException
*/
- void processNotifications(Collection<NotificationEvent> events) throws Exception {
- boolean isNotificationProcessed;
- if (events.isEmpty()) {
- return;
+ private Map<String, Set<String>> fetchFullUpdate()
+ throws TException, ExecutionException {
+ LOGGER.info("Request full HMS snapshot");
+ try (FullUpdateInitializer updateInitializer =
+ new FullUpdateInitializer(hiveConnectionFactory, authzConf);
+ Context context = updateTimer.time()) {
+ Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
+ LOGGER.info("Obtained full HMS snapshot");
+ return pathsUpdate;
+ } catch (Exception ignored) {
+ failedSnapshotsCount.inc();
+ // Caller will retry later
+ return Collections.emptyMap();
}
+ }
+
+ private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) {
+ return "true"
+ .equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), syncConfVar.getDefault())));
+ }
+
+ /**
+ * Throws SentryInvalidHMSEventException if Notification event contains insufficient information
+ */
+ void processNotificationEvents(List<NotificationEvent> events) throws Exception {
+ SentryJSONMessageDeserializer deserializer = new SentryJSONMessageDeserializer();
+
+ boolean isNotificationProcessingSkipped = false;
for (NotificationEvent event : events) {
- isNotificationProcessed = false;
+ String dbName;
+ String tableName;
+ String oldLocation;
+ String newLocation;
+ String location;
+ List<String> locations;
+ NotificationProcessor notificationProcessor = new NotificationProcessor(sentryStore, LOGGER);
try {
- // Only the leader should process the notifications
- if ((leaderMonitor != null) && !leaderMonitor.isLeader()) {
- return;
+ LOGGER.debug("Processing notification with id {} and type {}", event.getEventId(),
+ event.getEventType());
+ switch (HCatEventMessage.EventType.valueOf(event.getEventType())) {
+ case CREATE_DATABASE:
+ SentryJSONCreateDatabaseMessage message =
+ deserializer.getCreateDatabaseMessage(event.getMessage());
+ dbName = message.getDB();
+ location = message.getLocation();
+ if ((dbName == null) || (location == null)) {
+ isNotificationProcessingSkipped = true;
+ LOGGER.error(String.format("Create database event " +
+ "has incomplete information. dbName = %s location = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(location, "null")));
+ break;
+ }
+ if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) {
+ dropSentryDbPrivileges(dbName, event);
+ }
+ notificationProcessor.processCreateDatabase(dbName, location, event.getEventId());
+ break;
+ case DROP_DATABASE:
+ SentryJSONDropDatabaseMessage dropDatabaseMessage =
+ deserializer.getDropDatabaseMessage(event.getMessage());
+ dbName = dropDatabaseMessage.getDB();
+ location = dropDatabaseMessage.getLocation();
+ if (dbName == null) {
+ isNotificationProcessingSkipped = true;
+ LOGGER.error("Drop database event has incomplete information: dbName = null");
+ break;
+ }
+ if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) {
+ dropSentryDbPrivileges(dbName, event);
+ }
+ notificationProcessor.processDropDatabase(dbName, location, event.getEventId());
+ break;
+ case CREATE_TABLE:
+ SentryJSONCreateTableMessage createTableMessage = deserializer.getCreateTableMessage(event.getMessage());
+ dbName = createTableMessage.getDB();
+ tableName = createTableMessage.getTable();
+ location = createTableMessage.getLocation();
+ if ((dbName == null) || (tableName == null) || (location == null)) {
+ isNotificationProcessingSkipped = true;
+ LOGGER.error(String.format("Create table event " + "has incomplete information."
+ + " dbName = %s, tableName = %s, location = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"),
+ StringUtils.defaultIfBlank(location, "null")));
+ break;
+ }
+ if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) {
+ dropSentryTablePrivileges(dbName, tableName, event);
+ }
+ notificationProcessor.processCreateTable(dbName, tableName, location, event.getEventId());
+ break;
+ case DROP_TABLE:
+ SentryJSONDropTableMessage dropTableMessage = deserializer.getDropTableMessage(event.getMessage());
+ dbName = dropTableMessage.getDB();
+ tableName = dropTableMessage.getTable();
+ if ((dbName == null) || (tableName == null)) {
+ isNotificationProcessingSkipped = true;
+ LOGGER.error(String.format("Drop table event " +
+ "has incomplete information. dbName = %s, tableName = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null")));
+ break;
+ }
+ if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) {
+ dropSentryTablePrivileges(dbName, tableName, event);
+ }
+ notificationProcessor.processDropTable(dbName, tableName, event.getEventId());
+ break;
+ case ALTER_TABLE:
+ SentryJSONAlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(event.getMessage());
+
+ String oldDbName = alterTableMessage.getDB();
+ String oldTableName = alterTableMessage.getTable();
+ String newDbName = event.getDbName();
+ String newTableName = event.getTableName();
+ oldLocation = alterTableMessage.getOldLocation();
+ newLocation = alterTableMessage.getNewLocation();
+
+ if ((oldDbName == null) ||
+ (oldTableName == null) ||
+ (newDbName == null) ||
+ (newTableName == null) ||
+ (oldLocation == null) ||
+ (newLocation == null)) {
+ isNotificationProcessingSkipped = true;
+ LOGGER.error(String.format("Alter table event " +
+ "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, " +
+ "newDbName = %s, newTableName = %s, newLocation = %s",
+ StringUtils.defaultIfBlank(oldDbName, "null"),
+ StringUtils.defaultIfBlank(oldTableName, "null"),
+ StringUtils.defaultIfBlank(oldLocation, "null"),
+ StringUtils.defaultIfBlank(newDbName, "null"),
+ StringUtils.defaultIfBlank(newTableName, "null"),
+ StringUtils.defaultIfBlank(newLocation, "null")));
+ break;
+ } else if ((oldDbName.equalsIgnoreCase(newDbName)) &&
+ (oldTableName.equalsIgnoreCase(newTableName)) &&
+ (oldLocation.equalsIgnoreCase(newLocation))) {
+ isNotificationProcessingSkipped = true;
+ LOGGER.info(String.format("Alter table notification ignored as neither name nor " +
+ "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " +
+ "newLocation = %s", oldDbName + "." + oldTableName , oldLocation,
+ newDbName + "." + newTableName, newLocation));
+ break;
+ }
+
+ if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) {
+ // Name has changed
+ try {
+ renamePrivileges(oldDbName, oldTableName, newDbName, newTableName);
+ } catch (SentryNoSuchObjectException e) {
+ LOGGER.info("Rename Sentry privilege ignored as there are no privileges on the table: %s.%s",
+ oldDbName, oldTableName);
+ } catch (Exception e) {
+ isNotificationProcessingSkipped = true;
+ LOGGER.info("Could not process Alter table event. Event: " + event.toString(), e);
+ break;
+ }
+ }
+ notificationProcessor.processAlterTable(oldDbName, newDbName, oldTableName,
+ newTableName, oldLocation, newLocation, event.getEventId());
+ break;
+ case ADD_PARTITION:
+ SentryJSONAddPartitionMessage addPartitionMessage =
+ deserializer.getAddPartitionMessage(event.getMessage());
+ dbName = addPartitionMessage.getDB();
+ tableName = addPartitionMessage.getTable();
+ locations = addPartitionMessage.getLocations();
+ if ((dbName == null) || (tableName == null) || (locations == null)) {
+ isNotificationProcessingSkipped = true;
+ LOGGER.error(String.format("Create table event has incomplete information. " +
+ "dbName = %s, tableName = %s, locations = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"),
+ locations != null ? locations.toString() : "null"));
+ break;
+ }
+ notificationProcessor.processAddPartition(dbName, tableName, locations, event.getEventId());
+ break;
+ case DROP_PARTITION:
+ SentryJSONDropPartitionMessage dropPartitionMessage =
+ deserializer.getDropPartitionMessage(event.getMessage());
+ dbName = dropPartitionMessage.getDB();
+ tableName = dropPartitionMessage.getTable();
+ locations = dropPartitionMessage.getLocations();
+ if ((dbName == null) || (tableName == null) || (locations == null)) {
+ isNotificationProcessingSkipped = true;
+ LOGGER.error(String.format("Drop partition event " +
+ "has incomplete information. dbName = %s, tableName = %s, location = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"),
+ locations != null ? locations.toString() : "null"));
+ break;
+ }
+ notificationProcessor.processDropPartition(dbName, tableName, locations, event.getEventId());
+
+ break;
+ case ALTER_PARTITION:
+ SentryJSONAlterPartitionMessage alterPartitionMessage =
+ deserializer.getAlterPartitionMessage(event.getMessage());
+ dbName = alterPartitionMessage.getDB();
+ tableName = alterPartitionMessage.getTable();
+ oldLocation = alterPartitionMessage.getOldLocation();
+ newLocation = alterPartitionMessage.getNewLocation();
+
+ if ((dbName == null) ||
+ (tableName == null) ||
+ (oldLocation == null) ||
+ (newLocation == null)) {
+ isNotificationProcessingSkipped = true;
+ LOGGER.error(String.format("Alter partition event " +
+ "has incomplete information. dbName = %s, tableName = %s, " +
+ "oldLocation = %s, newLocation = %s",
+ StringUtils.defaultIfBlank(dbName, "null"),
+ StringUtils.defaultIfBlank(tableName, "null"),
+ StringUtils.defaultIfBlank(oldLocation, "null"),
+ StringUtils.defaultIfBlank(newLocation, "null")));
+ break;
+ } else if (oldLocation.equalsIgnoreCase(newLocation)) {
+ isNotificationProcessingSkipped = true;
+ LOGGER.info(String.format("Alter partition notification ignored as" +
+ "location has not changed: AuthzObj = %s, Location = %s", dbName + "." +
+ "." + tableName, oldLocation));
+ break;
+ }
+
+ notificationProcessor.processAlterPartition(dbName, tableName, oldLocation,
+ newLocation, event.getEventId());
+ break;
+ case INSERT:
+ // TODO DO we need to do anything here?
+ break;
}
- isNotificationProcessed = notificationProcessor.processNotificationEvent(event);
} catch (Exception e) {
if (e.getCause() instanceof JDODataStoreException) {
- LOGGER.info("Received JDO Storage Exception, Could be because of processing "
- + "duplicate notification");
+ LOGGER.info("Received JDO Storage Exception, Could be because of processing " +
+ "duplicate notification");
if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) {
// Rest of the notifications need not be processed.
- LOGGER.error("Received event with Id: {} which is smaller then the ID "
- + "persisted in store", event.getEventId());
- break;
+ throw e;
}
- } else {
- LOGGER.error("Processing the notification with ID:{} failed with exception {}",
- event.getEventId(), e);
}
+ sentryStore.persistLastProcessedNotificationID(event.getEventId());
}
- if (!isNotificationProcessed) {
- try {
- // Update the notification ID in the persistent store even when the notification is
- // not processed as the content in in the notification is not valid.
- // Continue processing the next notification.
- LOGGER.debug("Explicitly Persisting Notification ID:{}", event.getEventId());
- sentryStore.persistLastProcessedNotificationID(event.getEventId());
- } catch (Exception failure) {
- LOGGER.error("Received exception while persisting the notification ID "
- + event.getEventId());
- throw failure;
- }
+ if (isNotificationProcessingSkipped) {
+ // Update the notification ID in the persistent store even when the notification is
+ // not processed as the content in in the notification is not valid.
+ // Continue processing the next notification.
+ sentryStore.persistLastProcessedNotificationID(event.getEventId());
+ isNotificationProcessingSkipped = false;
}
// Wake up any HMS waiters that are waiting for this ID.
wakeUpWaitingClientsForSync(event.getEventId());
}
}
- /**
- * Wakes up HMS waiters waiting for a specific event notification.
- *
- * @param eventId Id of a notification
- */
- private void wakeUpWaitingClientsForSync(long eventId) {
- CounterWait counterWait = sentryStore.getCounterWait();
+ private void dropSentryDbPrivileges(String dbName, NotificationEvent event) throws Exception {
+ try {
+ TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
+ authorizable.setDb(dbName);
+ sentryStore.dropPrivilege(authorizable, onDropSentryPrivilege(authorizable));
+ } catch (SentryNoSuchObjectException e) {
+ LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the database: %s", dbName);
+ } catch (Exception e) {
+ throw new SentryInvalidInputException("Could not process Drop database event." +
+ "Event: " + event.toString(), e);
+ }
+ }
- // Wake up any HMS waiters that are waiting for this ID.
- // counterWait should never be null, but tests mock SentryStore and a mocked one
- // doesn't have it.
- if (counterWait != null) {
- counterWait.update(eventId);
+ private void dropSentryTablePrivileges(String dbName, String tableName, NotificationEvent event) throws Exception {
+ try {
+ TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
+ authorizable.setDb(dbName);
+ authorizable.setTable(tableName);
+ sentryStore.dropPrivilege(authorizable, onDropSentryPrivilege(authorizable));
+ } catch (SentryNoSuchObjectException e) {
+ LOGGER.info("Drop Sentry privilege ignored as there are no privileges on the table: %s.%s", dbName, tableName);
+ } catch (Exception e) {
+ throw new SentryInvalidInputException("Could not process Drop table event. Event: " + event.toString(), e);
+ }
+ }
+
+ private void renamePrivileges(String oldDbName, String oldTableName, String newDbName, String newTableName) throws
+ Exception {
+ TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(hiveInstance);
+ oldAuthorizable.setDb(oldDbName);
+ oldAuthorizable.setTable(oldTableName);
+ TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance);
+ newAuthorizable.setDb(newDbName);
+ newAuthorizable.setTable(newTableName);
+ Update update =
+ onRenameSentryPrivilege(oldAuthorizable, newAuthorizable);
+ sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update);
+ }
+
+ @VisibleForTesting
+ static Update onDropSentryPrivilege(TSentryAuthorizable authorizable) {
+ PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
+ String authzObj = getAuthzObj(authorizable);
+ update.addPrivilegeUpdate(authzObj).putToDelPrivileges(PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
+ return update;
+ }
+
+ @VisibleForTesting
+ static Update onRenameSentryPrivilege(TSentryAuthorizable oldAuthorizable,
+ TSentryAuthorizable newAuthorizable)
+ throws SentryPolicyStorePlugin.SentryPluginException {
+ String oldAuthz = getAuthzObj(oldAuthorizable);
+ String newAuthz = getAuthzObj(newAuthorizable);
+ PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
+ TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
+ privUpdate.putToAddPrivileges(newAuthz, newAuthz);
+ privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
+ return update;
+ }
+
+ public static String getAuthzObj(TSentryAuthorizable authzble) {
+ String authzObj = null;
+ if (!SentryStore.isNULL(authzble.getDb())) {
+ String dbName = authzble.getDb();
+ String tblName = authzble.getTable();
+ if (SentryStore.isNULL(tblName)) {
+ authzObj = dbName;
+ } else {
+ authzObj = dbName + "." + tblName;
+ }
}
+ return authzObj == null ? null : authzObj.toLowerCase();
}
}