You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by sp...@apache.org on 2017/11/19 21:13:47 UTC

sentry git commit: SENTRY-1640: Implement HMS Notification barrier on the HMS plugin side (Sergio Pena, reviewed by kalyan kumar kalvagadda)

Repository: sentry
Updated Branches:
  refs/heads/master 5f64fe9f3 -> 372ffc9b4


SENTRY-1640: Implement HMS Notification barrier on the HMS plugin side (Sergio Pena, reviewed by kalyan kumar kalvagadda)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/372ffc9b
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/372ffc9b
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/372ffc9b

Branch: refs/heads/master
Commit: 372ffc9b4c662b0d076940b7526d2fe58c8a6a09
Parents: 5f64fe9
Author: Sergio Pena <se...@cloudera.com>
Authored: Sun Nov 19 15:12:39 2017 -0600
Committer: Sergio Pena <se...@cloudera.com>
Committed: Sun Nov 19 15:12:39 2017 -0600

----------------------------------------------------------------------
 ...rySyncHMSNotificationsPostEventListener.java | 230 +++++++++++++++++++
 ...rySyncHMSNotificationsPostEventListener.java | 161 +++++++++++++
 .../thrift/SentryPolicyServiceClient.java       |   9 +
 .../SentryPolicyServiceClientDefaultImpl.java   |  13 ++
 4 files changed, 413 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/372ffc9b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
new file mode 100644
index 0000000..24d7763
--- /dev/null
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.binding.metastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
+import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
+import org.apache.sentry.service.thrift.SentryServiceClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This HMS post-event listener is used only to synchronize with HMS notifications on the Sentry server
+ * whenever a DDL event happens on the Hive metastore.
+ *
+ * <p>Only create/drop database and create/drop table events request a synchronization; the alter
+ * and partition callbacks are intentionally no-ops. Synchronization failures are only logged and are
+ * never propagated back to the HMS, because this listener runs after the notification was generated.
+ */
+public class SentrySyncHMSNotificationsPostEventListener extends MetaStoreEventListener {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentrySyncHMSNotificationsPostEventListener.class);
+
+  private final HiveAuthzConf authzConf;
+
+  /*
+   * Latest processed ID by the Sentry server. May only increase.
+   *
+   * This listener will track the latest event ID processed by the Sentry server so that it avoids calling
+   * the sync request in case a late thread attempts to synchronize again an already processed ID.
+   *
+   * The variable is shared across threads; updateProcessedId() uses a compare-and-set loop so the
+   * stored value only ever moves to a larger ID.
+   */
+  private final AtomicLong latestProcessedId = new AtomicLong(0);
+
+  /*
+   * A client used for testing purposes only.
+   *
+   * It may be set by unit-tests as a mock object and used to verify that the client methods
+   * were called correctly (see TestSentrySyncHMSNotificationsPostEventListener). When null, a new
+   * client is created from authzConf for every sync request.
+   */
+  private SentryPolicyServiceClient serviceClient;
+
+  /**
+   * @param config The metastore configuration; must be a {@link HiveConf}.
+   * @throws IllegalArgumentException if {@code config} is not a {@link HiveConf}.
+   */
+  public SentrySyncHMSNotificationsPostEventListener(Configuration config) {
+    super(config);
+
+    if (!(config instanceof HiveConf)) {
+      String error = "Could not initialize Plugin - Configuration is not an instanceof HiveConf";
+      LOGGER.error(error);
+      throw new IllegalArgumentException(error);
+    }
+
+    authzConf = HiveAuthzConf.getAuthzConf((HiveConf)config);
+  }
+
+  @Override
+  public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
+    syncNotificationEvents(tableEvent, "onCreateTable");
+  }
+
+  @Override
+  public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+    syncNotificationEvents(tableEvent, "onDropTable");
+  }
+
+  @Override
+  public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
+    // no-op
+  }
+
+  @Override
+  public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
+    // no-op
+  }
+
+  @Override
+  public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
+    // no-op
+  }
+
+  @Override
+  public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
+    // no-op
+  }
+
+  @Override
+  public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
+    syncNotificationEvents(dbEvent, "onCreateDatabase");
+  }
+
+  @Override
+  public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+    syncNotificationEvents(dbEvent, "onDropDatabase");
+  }
+
+  /**
+   * It requests the Sentry server the synchronization of recent notification events.
+   *
+   * After the sync call, the latest processed ID will be stored for future reference to avoid
+   * syncing an ID that was already processed. The request is skipped when the event failed, has no
+   * notification ID, runs inside an active HMS transaction, or its ID was already processed. Every
+   * error is logged and swallowed so the HMS operation is never failed by this listener.
+   *
+   * @param event An event that contains a DB_NOTIFICATION_EVENT_ID_KEY_NAME value to request.
+   * @param eventName Name of the listener callback, used only for logging.
+   */
+  private void syncNotificationEvents(ListenerEvent event, String eventName) {
+    // Do not sync notifications if the event has failed.
+    if (failedEvent(event, eventName)) {
+      return;
+    }
+
+    Map<String, String> eventParameters = event.getParameters();
+    String eventIdValue =
+        eventParameters.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME);
+    if (eventIdValue == null) {
+      return;
+    }
+
+    /* If the HMS is running in an active transaction, then we do not want to sync with Sentry
+     * because the desired eventId is not available for Sentry yet, and Sentry may block the HMS
+     * forever or until a read time-out happens. */
+    if (isMetastoreTransactionActive(eventParameters)) {
+      return;
+    }
+
+    long eventId;
+    try {
+      eventId = Long.parseLong(eventIdValue);
+    } catch (NumberFormatException e) {
+      // Keep the listener's log-only policy: a malformed ID must not fail the HMS operation.
+      LOGGER.error("Invalid HMS notification event ID '" + eventIdValue + "' for " + eventName, e);
+      return;
+    }
+
+    // This check is only for performance reasons to avoid calling the sync thrift call if the Sentry server
+    // already processed the requested eventId.
+    if (eventId <= latestProcessedId.get()) {
+      return;
+    }
+
+    try(SentryPolicyServiceClient sentryClient = this.getSentryServiceClient()) {
+      LOGGER.debug("Starting Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
+      long sentryLatestProcessedId = sentryClient.syncNotifications(eventId);
+      LOGGER.debug("Finished Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
+      LOGGER.debug("Latest processed event ID returned by the Sentry server: {}", sentryLatestProcessedId);
+
+      updateProcessedId(sentryLatestProcessedId);
+    } catch (Exception e) {
+      // This error is only logged. There is no need to throw an error to Hive because HMS sync is called
+      // after the notification is already generated by Hive (as post-event).
+      LOGGER.error("Failed to sync requested HMS notifications up to the event ID: " + eventId, e);
+    }
+  }
+
+  /**
+   * @return True if the HMS is calling this notification in an active transaction; False otherwise
+   */
+  private boolean isMetastoreTransactionActive(Map<String, String> parameters) {
+    // Boolean.parseBoolean returns false for null, so a missing key means "not active".
+    return Boolean.parseBoolean(
+        parameters.get(MetaStoreEventListenerConstants.HIVE_METASTORE_TRANSACTION_ACTIVE));
+  }
+
+  /**
+   * Updates the latest processed ID, if and only if eventId is bigger. This keeps the contract that
+   * {@link #latestProcessedId} may only increase.
+   *
+   * @param eventId The value to be set on the {@link #latestProcessedId}
+   */
+  private void updateProcessedId(long eventId) {
+    long current = latestProcessedId.get();
+    // Retry until either this value is stored or another thread already stored one at least as large.
+    // A single failed compareAndSet could otherwise drop a newer maximum (e.g. 7 winning over 10).
+    while (eventId > current && !latestProcessedId.compareAndSet(current, eventId)) {
+      current = latestProcessedId.get();
+    }
+  }
+
+  /**
+   * Sets the sentry client object (for testing purposes only)
+   *
+   * It may be set by unit-tests as a mock object and used to verify that the client methods
+   * were called correctly (see TestSentrySyncHMSNotificationsPostEventListener).
+   */
+  @VisibleForTesting
+  void setSentryServiceClient(SentryPolicyServiceClient serviceClient) {
+    this.serviceClient = serviceClient;
+  }
+
+  /**
+   * @return The test client if one was set, otherwise a new client created from {@link #authzConf}.
+   * @throws MetaException if the client cannot be created; the original failure is kept as the cause.
+   */
+  private SentryPolicyServiceClient getSentryServiceClient() throws MetaException {
+    // Return the sentry client in case was set by the unit tests.
+    if (serviceClient != null) {
+      return serviceClient;
+    }
+
+    try {
+      return SentryServiceClientFactory.create(authzConf);
+    } catch (Exception e) {
+      MetaException metaException =
+          new MetaException("Failed to connect to Sentry service " + e.getMessage());
+      // MetaException has no (message, cause) constructor, so attach the cause explicitly.
+      metaException.initCause(e);
+      throw metaException;
+    }
+  }
+
+  /**
+   * @return True (and logs at debug level) when the HMS operation behind {@code event} failed.
+   */
+  private boolean failedEvent(ListenerEvent event, String eventName) {
+    if (!event.getStatus()) {
+      LOGGER.debug("Skip HMS synchronization request with the Sentry server for {} " +
+          "{} since the operation failed. \n", eventName, event);
+      return true;
+    }
+
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/372ffc9b/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/binding/metastore/TestSentrySyncHMSNotificationsPostEventListener.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/binding/metastore/TestSentrySyncHMSNotificationsPostEventListener.java b/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/binding/metastore/TestSentrySyncHMSNotificationsPostEventListener.java
new file mode 100644
index 0000000..cca326b
--- /dev/null
+++ b/sentry-binding/sentry-binding-hive/src/test/java/org/apache/sentry/binding/metastore/TestSentrySyncHMSNotificationsPostEventListener.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.binding.metastore;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
+import org.apache.sentry.core.common.exception.SentryUserException;
+import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * Verifies that {@link SentrySyncHMSNotificationsPostEventListener} requests a Sentry/HMS
+ * notification sync only for successful events that carry a notification ID, and that it skips
+ * IDs the Sentry server already reported as processed.
+ */
+public class TestSentrySyncHMSNotificationsPostEventListener {
+  private static final boolean FAILED_STATUS = false;
+  private static final boolean SUCCESSFUL_STATUS = true;
+  private static final boolean EVENT_ID_SET = true;
+  private static final boolean EVENT_ID_UNSET = false;
+
+  private SentrySyncHMSNotificationsPostEventListener eventListener;
+  private SentryPolicyServiceClient mockSentryClient;
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException, MetaException, SentryUserException {
+    String sentryConfFile = tempFolder.newFile().getAbsolutePath();
+
+    HiveConf hiveConf = new HiveConf(TestSentrySyncHMSNotificationsPostEventListener.class);
+    hiveConf.set(HiveAuthzConf.HIVE_SENTRY_CONF_URL, "file://" + sentryConfFile);
+
+    // Instead of generating an empty sentry-site.xml, we just write the same info from HiveConf.
+    // The SentrySyncHMSNotificationsPostEventListener won't use any information from it after all.
+    // The stream is closed explicitly so the test does not leak a file descriptor.
+    try (FileOutputStream out = new FileOutputStream(sentryConfFile)) {
+      hiveConf.writeXml(out);
+    }
+
+    eventListener = new SentrySyncHMSNotificationsPostEventListener(hiveConf);
+
+    mockSentryClient = Mockito.mock(SentryPolicyServiceClient.class);
+
+    // For some reason I cannot use a Mockito.spy() on the eventListener and just mock the
+    // getSentryServiceClient() to return the mock. When the TestURI runs before this
+    // test, then a mock exception is thrown saying I have an unfinished stubbing method.
+    // This was the best approach I could take for now.
+    eventListener.setSentryServiceClient(mockSentryClient);
+  }
+
+  @Test
+  public void testFailedEventsDoNotSyncNotifications() throws MetaException, SentryUserException {
+    // The event ID is set on purpose: only the failed status may prevent the sync here.
+    callAllEventsThatSynchronize(FAILED_STATUS, EVENT_ID_SET);
+    Mockito.verifyZeroInteractions(mockSentryClient);
+  }
+
+  @Test
+  public void testEventsWithoutAnEventIdDoNotSyncNotifications() throws MetaException {
+    callAllEventsThatSynchronize(SUCCESSFUL_STATUS, EVENT_ID_UNSET);
+    Mockito.verifyZeroInteractions(mockSentryClient);
+  }
+
+  @Test
+  public void testSuccessfulEventsWithAnEventIdSyncNotifications() throws Exception {
+    long latestEventId = callAllEventsThatSynchronize(SUCCESSFUL_STATUS, EVENT_ID_SET);
+
+    for (int i = 1; i <= latestEventId; i++) {
+      Mockito.verify(
+          mockSentryClient, Mockito.times(1)
+      ).syncNotifications(i);
+    }
+
+    Mockito.verify(
+        mockSentryClient, Mockito.times((int)latestEventId)
+    ).close();
+
+    Mockito.verifyNoMoreInteractions(mockSentryClient);
+  }
+
+  @Test
+  public void testSyncNotificationsWithNewLatestProcessedIdMayAvoidSyncingCalls() throws Exception {
+    // The mocked server always reports one ID beyond the requested one, so every second event is
+    // already covered and must not trigger another sync call.
+    Mockito.doAnswer(new Answer<Long>() {
+      @Override
+      public Long answer(InvocationOnMock invocation) throws Throwable {
+        Long id = (Long)invocation.getArguments()[0];
+        return id + 1;
+      }
+    }).when(mockSentryClient).syncNotifications(Mockito.anyLong());
+
+    long latestEventId = callAllEventsThatSynchronize(SUCCESSFUL_STATUS, EVENT_ID_SET);
+
+    for (int i = 1; i <= latestEventId; i += 2) {
+      Mockito.verify(
+          mockSentryClient, Mockito.times(1)
+      ).syncNotifications(i);
+    }
+
+    Mockito.verify(
+        mockSentryClient, Mockito.times((int)latestEventId / 2)
+    ).close();
+
+    Mockito.verifyNoMoreInteractions(mockSentryClient);
+  }
+
+  /**
+   * Fires every listener callback that may request a sync, assigning consecutive event IDs from 1.
+   *
+   * @param status Status passed to every event.
+   * @param eventIdSet Whether the notification event ID parameter is attached to the events.
+   * @return The last event ID assigned (also the number of events fired).
+   */
+  private long callAllEventsThatSynchronize(boolean status, boolean eventIdSet) throws MetaException {
+    long eventId = 0;
+
+    CreateDatabaseEvent createDatabaseEvent = new CreateDatabaseEvent(null, status, null);
+    setEventId(eventIdSet, createDatabaseEvent, ++eventId);
+    eventListener.onCreateDatabase(createDatabaseEvent);
+
+    DropDatabaseEvent dropDatabaseEvent = new DropDatabaseEvent(null, status, null);
+    setEventId(eventIdSet, dropDatabaseEvent, ++eventId);
+    eventListener.onDropDatabase(dropDatabaseEvent);
+
+    CreateTableEvent createTableEvent = new CreateTableEvent(null, status, null);
+    setEventId(eventIdSet, createTableEvent, ++eventId);
+    eventListener.onCreateTable(createTableEvent);
+
+    DropTableEvent dropTableEvent = new DropTableEvent(null, status, false, null);
+    setEventId(eventIdSet, dropTableEvent, ++eventId);
+    eventListener.onDropTable(dropTableEvent);
+
+    return eventId;
+  }
+
+  /**
+   * Attaches {@code eventId} as the notification event ID parameter of {@code event} when requested.
+   */
+  private void setEventId(boolean eventIdSet, ListenerEvent event, long eventId) {
+    if (eventIdSet) {
+      event.putParameter(
+          MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME, String.valueOf(eventId));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/372ffc9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
index 61833fc..f69a8cd 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClient.java
@@ -215,4 +215,13 @@ public interface SentryPolicyServiceClient extends AutoCloseable {
   // Exports the Sentry mapping data as a nested map (String -> String -> Set<String>) structure.
   Map<String, Map<String, Set<String>>> exportPolicy(String requestorUserName, String objectPath)
       throws SentryUserException;
+
+  /**
+   * Requests the Sentry server to synchronize all HMS notification events up to the specified id.
+   * The Sentry server will return once it has processed the specified id.
+   *
+   * @param id Requested HMS notification ID.
+   * @return The most recent notification ID processed by the Sentry server.
+   * @throws SentryUserException if the synchronization request fails.
+   */
+  long syncNotifications(long id) throws SentryUserException;
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/372ffc9b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
index 7ada138..bede5b1 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyServiceClientDefaultImpl.java
@@ -1065,4 +1065,17 @@ public class SentryPolicyServiceClientDefaultImpl implements SentryPolicyService
       transport = null;
     }
   }
+
+  /**
+   * Sends a synchronous {@code sentry_sync_notifications} thrift request asking the Sentry server to
+   * process HMS notifications up to {@code id}.
+   *
+   * @param id Requested HMS notification ID.
+   * @return The ID carried by the server's {@link TSentrySyncIDResponse}.
+   * @throws SentryUserException if the thrift call fails, or if the response status is not OK
+   *     (checked via {@code Status.throwIfNotOk}).
+   */
+  @Override
+  public long syncNotifications(long id) throws SentryUserException {
+    TSentrySyncIDRequest request =
+        new TSentrySyncIDRequest(ThriftConstants.TSENTRY_SERVICE_VERSION_CURRENT, id);
+
+    try {
+      TSentrySyncIDResponse response = client.sentry_sync_notifications(request);
+      Status.throwIfNotOk(response.getStatus());
+      return response.getId();
+    } catch (TException e) {
+      throw new SentryUserException(THRIFT_EXCEPTION_MESSAGE, e);
+    }
+  }
 }
\ No newline at end of file