You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by co...@apache.org on 2017/11/20 14:47:29 UTC
[8/9] sentry git commit: SENTRY-1640: Implement HMS Notification
barrier on the HMS plugin side (Sergio Pena,
reviewed by kalyan kumar kalvagadda)
SENTRY-1640: Implement HMS Notification barrier on the HMS plugin side (Sergio Pena, reviewed by kalyan kumar kalvagadda)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/372ffc9b
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/372ffc9b
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/372ffc9b
Branch: refs/heads/akolb-cli
Commit: 372ffc9b4c662b0d076940b7526d2fe58c8a6a09
Parents: 5f64fe9
Author: Sergio Pena <se...@cloudera.com>
Authored: Sun Nov 19 15:12:39 2017 -0600
Committer: Sergio Pena <se...@cloudera.com>
Committed: Sun Nov 19 15:12:39 2017 -0600
----------------------------------------------------------------------
...rySyncHMSNotificationsPostEventListener.java | 230 +++++++++++++++++++
...rySyncHMSNotificationsPostEventListener.java | 161 +++++++++++++
.../thrift/SentryPolicyServiceClient.java | 9 +
.../SentryPolicyServiceClientDefaultImpl.java | 13 ++
4 files changed, 413 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/372ffc9b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
new file mode 100644
index 0000000..24d7763
--- /dev/null
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.binding.metastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
+import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
+import org.apache.sentry.service.thrift.SentryServiceClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This HMS post-event listener is used only to synchronize with HMS notifications on the Sentry server
+ * whenever a DDL event happens on the Hive metastore.
+ */
+public class SentrySyncHMSNotificationsPostEventListener extends MetaStoreEventListener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SentrySyncHMSNotificationsPostEventListener.class);
+
+ private final HiveAuthzConf authzConf;
+
+ /*
+ * Latest processed ID by the Sentry server. May only increase.
+ *
+ * This listener will track the latest event ID processed by the Sentry server so that it avoids calling
+ * the sync request in case a late thread attempts to synchronize again an already processed ID.
+ *
+ * The variable is shared across threads, so the AtomicLong variable guarantees that is increased
+ * monotonically.
+ */
+ private final AtomicLong latestProcessedId = new AtomicLong(0);
+
+ /*
+ * A client used for testing purposes only. I
+ *
+ * It may be set by unit-tests as a mock object and used to verify that the client methods
+ * were called correctly (see TestSentrySyncHMSNotificationsPostEventListener)
+ */
+ private SentryPolicyServiceClient serviceClient;
+
+ public SentrySyncHMSNotificationsPostEventListener(Configuration config) {
+ super(config);
+
+ if (!(config instanceof HiveConf)) {
+ String error = "Could not initialize Plugin - Configuration is not an instanceof HiveConf";
+ LOGGER.error(error);
+ throw new RuntimeException(error);
+ }
+
+ authzConf = HiveAuthzConf.getAuthzConf((HiveConf)config);
+ }
+
+ @Override
+ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
+ syncNotificationEvents(tableEvent, "onCreateTable");
+ }
+
+ @Override
+ public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+ syncNotificationEvents(tableEvent, "onDropTable");
+ }
+
+ @Override
+ public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
+ // no-op
+ }
+
+ @Override
+ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
+ // no-op
+ }
+
+ @Override
+ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
+ // no-op
+ }
+
+ @Override
+ public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
+ // no-op
+ }
+
+ @Override
+ public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
+ syncNotificationEvents(dbEvent, "onCreateDatabase");
+ }
+
+ @Override
+ public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+ syncNotificationEvents(dbEvent, "onDropDatabase");
+ }
+
+ /**
+ * It requests the Sentry server the synchronization of recent notification events.
+ *
+ * After the sync call, the latest processed ID will be stored for future reference to avoid
+ * syncing an ID that was already processed.
+ *
+ * @param event An event that contains a DB_NOTIFICATION_EVENT_ID_KEY_NAME value to request.
+ */
+ private void syncNotificationEvents(ListenerEvent event, String eventName) {
+ // Do not sync notifications if the event has failed.
+ if (failedEvent(event, eventName)) {
+ return;
+ }
+
+ Map<String, String> eventParameters = event.getParameters();
+ if (!eventParameters.containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)) {
+ return;
+ }
+
+ /* If the HMS is running in an active transaction, then we do not want to sync with Sentry
+ * because the desired eventId is not available for Sentry yet, and Sentry may block the HMS
+ * forever or until a read time-out happens. */
+ if (isMetastoreTransactionActive(eventParameters)) {
+ return;
+ }
+
+ long eventId =
+ Long.parseLong(eventParameters.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME));
+
+ // This check is only for performance reasons to avoid calling the sync thrift call if the Sentry server
+ // already processed the requested eventId.
+ if (eventId <= latestProcessedId.get()) {
+ return;
+ }
+
+ try(SentryPolicyServiceClient sentryClient = this.getSentryServiceClient()) {
+ LOGGER.debug("Starting Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
+ long sentryLatestProcessedId = sentryClient.syncNotifications(eventId);
+ LOGGER.debug("Finished Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
+ LOGGER.debug("Latest processed event ID returned by the Sentry server: {}", sentryLatestProcessedId);
+
+ updateProcessedId(sentryLatestProcessedId);
+ } catch (Exception e) {
+ // This error is only logged. There is no need to throw an error to Hive because HMS sync is called
+ // after the notification is already generated by Hive (as post-event).
+ LOGGER.error("Failed to sync requested HMS notifications up to the event ID: " + eventId, e);
+ }
+ }
+
+ /**
+ * @return True if the HMS is calling this notification in an active transaction; False otherwise
+ */
+ private boolean isMetastoreTransactionActive(Map<String, String> parameters) {
+ String transactionActive =
+ parameters.get(MetaStoreEventListenerConstants.HIVE_METASTORE_TRANSACTION_ACTIVE);
+
+ return transactionActive != null && Boolean.valueOf(transactionActive);
+ }
+
+ /**
+ * Updates the latest processed ID, if and only if eventId is bigger. This keeps the contract that
+ * {@link #latestProcessedId} may only increase.
+ *
+ * @param eventId The value to be set on the {@link #latestProcessedId}
+ */
+ private void updateProcessedId(long eventId) {
+ long oldVal = latestProcessedId.get();
+ if (eventId > oldVal) {
+ // It is fine for the compareAndSet to fail
+ latestProcessedId.compareAndSet(oldVal, eventId);
+ }
+ }
+
+ /**
+ * Sets the sentry client object (for testing purposes only)
+ *
+ * It may be set by unit-tests as a mock object and used to verify that the client methods
+ * were called correctly (see TestSentrySyncHMSNotificationsPostEventListener).
+ */
+ @VisibleForTesting
+ void setSentryServiceClient(SentryPolicyServiceClient serviceClient) {
+ this.serviceClient = serviceClient;
+ }
+
+ private SentryPolicyServiceClient getSentryServiceClient() throws MetaException {
+ // Return the sentry client in case was set by the unit tests.
+ if (serviceClient != null) {
+ return serviceClient;
+ }
+
+ try {
+ return SentryServiceClientFactory.create(authzConf);
+ } catch (Exception e) {
+ throw new MetaException("Failed to connect to Sentry service " + e.getMessage());
+ }
+ }
+
+ private boolean failedEvent(ListenerEvent event, String eventName) {
+ if (!event.getStatus()) {
+ LOGGER.debug("Skip HMS synchronization request with the Sentry server for {} " +
+ "{} since the operation failed. \n", eventName, event);
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/372ffc9b/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/binding/metastore/TestSentrySyncHMSNotificationsPostEventListener.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/binding/metastore/TestSentrySyncHMSNotificationsPostEventListener.java b/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/binding/metastore/TestSentrySyncHMSNotificationsPostEventListener.java
new file mode 100644
index 0000000..cca326b
--- /dev/null
+++ b/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/binding/metastore/TestSentrySyncHMSNotificationsPostEventListener.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.binding.metastore;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * Testing class that tests and verifies the sync sentry notifications are called correctly.
+ */
+public class TestSentrySyncHMSNotificationsPostEventListener {
+ private static final boolean FAILED_STATUS = false;
+ private static final boolean SUCCESSFUL_STATUS = true;
+ private static final boolean EVENT_ID_SET = true;
+ private static final boolean EVENT_ID_UNSET = false;
+
+ private SentrySyncHMSNotificationsPostEventListener eventListener;
+ private SentryPolicyServiceClient mockSentryClient;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void setUp() throws IOException, MetaException, SentryUserException {
+ String sentryConfFile = tempFolder.newFile().getAbsolutePath();
+
+ HiveConf hiveConf = new HiveConf(TestSentrySyncHMSNotificationsPostEventListener.class);
+ hiveConf.set(HiveAuthzConf.HIVE_SENTRY_CONF_URL, "file://" + sentryConfFile);
+
+ // Instead of generating an empty sentry-site.xml, we just write the same info from HiveConf.
+ // The SentrySyncHMSNotificationsPostEventListener won't use any information from it after all.
+ hiveConf.writeXml(new FileOutputStream(sentryConfFile));
+
+ eventListener = new SentrySyncHMSNotificationsPostEventListener(hiveConf);
+
+ mockSentryClient = Mockito.mock(SentryPolicyServiceClient.class);
+
+ // For some reason I cannot use a Mockito.spy() on the eventListener and just mock the
+ // getSentryServiceClient() to return the mock. When the TestURI runs before this
+ // test, then a mock exception is thrown saying a I have an unfinished stubbing method.
+ // This was the best approach I could take for now.
+ eventListener.setSentryServiceClient(mockSentryClient);
+ }
+
+ @Test
+ public void testFailedEventsDoNotSyncNotifications() throws MetaException, SentryUserException {
+ callAllEventsThatSynchronize(FAILED_STATUS, EVENT_ID_UNSET);
+ Mockito.verifyZeroInteractions(mockSentryClient);
+ }
+
+ @Test
+ public void testEventsWithoutAnEventIdDoNotSyncNotifications() throws MetaException {
+ callAllEventsThatSynchronize(SUCCESSFUL_STATUS, EVENT_ID_UNSET);
+ Mockito.verifyZeroInteractions(mockSentryClient);
+ }
+
+ @Test
+ public void testSuccessfulEventsWithAnEventIdSyncNotifications() throws Exception {
+ long latestEventId = callAllEventsThatSynchronize(SUCCESSFUL_STATUS, EVENT_ID_SET);
+
+ for (int i=1; i<=latestEventId; i++) {
+ Mockito.verify(
+ mockSentryClient, Mockito.times(1)
+ ).syncNotifications(i);
+ }
+
+ Mockito.verify(
+ mockSentryClient, Mockito.times((int)latestEventId)
+ ).close();
+
+ Mockito.verifyNoMoreInteractions(mockSentryClient);
+ }
+
+ @Test
+ public void testSyncNotificationsWithNewLatestProcessedIdMayAvoidSyncingCalls() throws Exception {
+ Mockito.doAnswer(new Answer<Long>() {
+ @Override
+ public Long answer(InvocationOnMock invocation) throws Throwable {
+ Long id = (Long)invocation.getArguments()[0];
+ return id + 1;
+ }
+ }).when(mockSentryClient).syncNotifications(Mockito.anyLong());
+
+ long latestEventId = callAllEventsThatSynchronize(SUCCESSFUL_STATUS, EVENT_ID_SET);
+
+ for (int i=1; i<=latestEventId; i+=2) {
+ Mockito.verify(
+ mockSentryClient, Mockito.times(1)
+ ).syncNotifications(i);
+ }
+
+ Mockito.verify(
+ mockSentryClient, Mockito.times((int)latestEventId / 2)
+ ).close();
+
+ Mockito.verifyNoMoreInteractions(mockSentryClient);
+ }
+
+ private long callAllEventsThatSynchronize(boolean status, boolean eventIdSet) throws MetaException {
+ long eventId = 0;
+
+ CreateDatabaseEvent createDatabaseEvent = new CreateDatabaseEvent(null, status , null);
+ setEventId(eventIdSet, createDatabaseEvent, ++eventId);
+ eventListener.onCreateDatabase(createDatabaseEvent);
+
+ DropDatabaseEvent dropDatabaseEvent = new DropDatabaseEvent(null, status , null);
+ setEventId(eventIdSet, dropDatabaseEvent, ++eventId);
+ eventListener.onDropDatabase(dropDatabaseEvent);
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(null, status , null);
+ setEventId(eventIdSet, createTableEvent, ++eventId);
+ eventListener.onCreateTable(createTableEvent);
+
+ DropTableEvent dropTableEvent = new DropTableEvent(null, status , false, null);
+ setEventId(eventIdSet, dropTableEvent, ++eventId);
+ eventListener.onDropTable(dropTableEvent);
+
+ return eventId;
+ }
+
+ private void setEventId(boolean eventIdSet, ListenerEvent eventListener, long eventId) {
+ if (eventIdSet) {
+ eventListener.putParameter(
+ MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME, String.valueOf(eventId));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sentry/blob/372ffc9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
index 61833fc..f69a8cd 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
@@ -215,4 +215,13 @@ public interface SentryPolicyServiceClient extends AutoCloseable {
// export the sentry mapping data with map structure
Map<String, Map<String, Set<String>>> exportPolicy(String requestorUserName, String objectPath)
throws SentryUserException;
+
+ /**
+ * Requests the sentry server to synchronize all HMS notification events up to the specified id.
+ * The sentry server will return once it have processed the id specified..
+ *
+ * @param id Requested HMS notification ID.
+ * @return The most recent processed notification ID.
+ */
+ long syncNotifications(long id) throws SentryUserException;
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/372ffc9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
index 7ada138..bede5b1 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
@@ -1065,4 +1065,17 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
transport = null;
}
}
+
+ public long syncNotifications(long id) throws SentryUserException {
+ TSentrySyncIDRequest request =
+ new TSentrySyncIDRequest(ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, id);
+
+ try {
+ TSentrySyncIDResponse response = client.sentry_sync_notifications(request);
+ Status.throwIfNotOk(response.getStatus());
+ return response.getId();
+ } catch (TException e) {
+ throw new SentryUserException(THRIFT_EXCEPTION_MESSAGE, e);
+ }
+ }
}
\ No newline at end of file