You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ak...@apache.org on 2017/07/28 17:07:01 UTC

[43/50] [abbrv] sentry git commit: SENTRY-1760: HMSFollower should detect when a full snapshot from HMS is required (Sergio Pena, reviewed by Alex Kolbasov, Brian Towels)

SENTRY-1760: HMSFollower should detect when a full snapshot from HMS is required (Sergio Pena, reviewed by Alex Kolbasov, Brian Towels)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/ca315fe9
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/ca315fe9
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/ca315fe9

Branch: refs/heads/master
Commit: ca315fe971d32a9ca87183746c9dd1744a76e8d4
Parents: eed5383
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Thu Jul 20 19:37:28 2017 +0200
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Thu Jul 20 19:37:28 2017 +0200

----------------------------------------------------------------------
 .../sentry/service/thrift/HMSFollower.java      | 118 +++++++++++++--
 .../sentry/service/thrift/SentryHMSClient.java  |  38 +++--
 .../sentry/service/thrift/TestHMSFollower.java  | 146 +++++++++++++++++++
 3 files changed, 278 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/ca315fe9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 083ee4c..547a61f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.net.SocketException;
 
 import java.util.Collection;
+import java.util.List;
 import javax.jdo.JDODataStoreException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -44,7 +45,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
   private static boolean connectedToHms = false;
-  private final SentryHMSClient client;
+  private SentryHMSClient client;
   private final Configuration authzConf;
   private final SentryStore sentryStore;
   private final NotificationProcessor notificationProcessor;
@@ -91,6 +92,11 @@ public class HMSFollower implements Runnable, AutoCloseable {
     return connectedToHms;
   }
 
+  @VisibleForTesting
+  void setSentryHmsClient(SentryHMSClient client) {
+    this.client = client;
+  }
+
   @Override
   public void close() {
     if (client != null) {
@@ -117,7 +123,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
     // Wake any clients connected to this service waiting for HMS already processed notifications.
     wakeUpWaitingClientsForSync(lastProcessedNotificationId);
     // Only the leader should listen to HMS updates
-    if ((leaderMonitor != null) && !leaderMonitor.isLeader()) {
+    if (!isLeader()) {
       // Close any outstanding connections to HMS
       close();
       return;
@@ -125,6 +131,10 @@ public class HMSFollower implements Runnable, AutoCloseable {
     syncupWithHms(lastProcessedNotificationId);
   }
 
+  private boolean isLeader() {
+    return (leaderMonitor == null) || leaderMonitor.isLeader();
+  }
+
   /**
    * Processes new Hive Metastore notifications.
    *
@@ -145,18 +155,23 @@ public class HMSFollower implements Runnable, AutoCloseable {
     }
 
     try {
-      long lastProcessedNotificationId = notificationId;
-      // Create a full HMS snapshot if there is none
-      // Decision of taking full snapshot is based on AuthzPathsMapping information persisted
-      // in the sentry persistent store. If AuthzPathsMapping is empty, snapshot is needed.
-      if (sentryStore.isAuthzPathsMappingEmpty()) {
-        lastProcessedNotificationId = createFullSnapshot();
-        if (lastProcessedNotificationId == SentryStore.EMPTY_NOTIFICATION_ID) {
-          return;
-        }
+      // Before getting notifications, check whether a full HMS snapshot is required.
+      if (isFullSnapshotRequired(notificationId)) {
+        createFullSnapshot();
+        return;
+      }
+
+      Collection<NotificationEvent> notifications = client.getNotifications(notificationId);
+
+      // After getting notifications, check whether the HMS cleaned up old notifications so that
+      // they are out-of-sync with Sentry.
+      if (areNotificationsOutOfSync(notifications, notificationId)) {
+        createFullSnapshot();
+        return;
       }
-      // Get the new notification from HMS and process them.
-      processNotifications(client.getNotifications(lastProcessedNotificationId));
+
+      // No snapshot was needed, so process the new notifications.
+      processNotifications(notifications);
     } catch (TException e) {
       // If the underlying exception is around socket exception,
       // it is better to retry connection to HMS
@@ -175,6 +190,75 @@ public class HMSFollower implements Runnable, AutoCloseable {
   }
 
   /**
+   * Checks if a new full HMS snapshot request is needed by checking if:
+   * <ul>
+   *   <li>No snapshot has been persisted yet.</li>
+   *   <li>The current notification Id on the HMS is less than the
+   *   latest processed by Sentry.</li>
+   * </ul>
+   *
+   * @param latestSentryNotificationId The notification Id to check against the HMS
+   * @return True if a full snapshot is required; False otherwise.
+   * @throws Exception If an error occurs while checking the SentryStore or the HMS client.
+   */
+  private boolean isFullSnapshotRequired(long latestSentryNotificationId) throws Exception {
+    if (sentryStore.isAuthzPathsMappingEmpty()) {
+      return true;
+    }
+
+    long currentHmsNotificationId = client.getCurrentNotificationId();
+    if (currentHmsNotificationId < latestSentryNotificationId) {
+      LOGGER.info("The latest notification ID on HMS is less than the latest notification ID "
+          + "processed by Sentry. Need to request a full HMS snapshot.");
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Checks if the HMS and Sentry processed notifications are out-of-sync.
+   * This could happen because the HMS did some clean-up of old notifications
+   * and Sentry was not requesting notifications during that time.
+   *
+   * @param events New notifications to check, assumed to be ordered by ascending event ID.
+   * @param latestProcessedId The latest notification processed by Sentry to check against the
+   *        list of notification events.
+   * @return True if an out-of-sync is found; False otherwise.
+   */
+  private boolean areNotificationsOutOfSync(Collection<NotificationEvent> events,
+      long latestProcessedId) {
+    if (events.isEmpty()) {
+      return false;
+    }
+
+    List<NotificationEvent> eventList = new java.util.ArrayList<>(events);
+    long firstNotificationId = eventList.get(0).getEventId();
+    long lastNotificationId = eventList.get(eventList.size() - 1).getEventId();
+
+    /* If the next expected notification is not available, then an out-of-sync might
+     * have happened due to the following issue:
+     *
+     * - HDFS sync was disabled or Sentry was shut down for longer than the HMS keeps
+     *   notifications, so the HMS clean-up thread deleted the old notifications.
+     */
+    if ((latestProcessedId + 1) != firstNotificationId) {
+      LOGGER.info("Current HMS notifications are out-of-sync with latest Sentry processed "
+          + "notifications. Need to request a full HMS snapshot.");
+      return true;
+    }
+
+    long expectedSize = lastNotificationId - latestProcessedId;
+    if (eventList.size() < expectedSize) {
+      LOGGER.info("The HMS notifications fetched have some gaps in the # of events received. "
+          + "These should not cause an out-of-sync issue. (expected = {}, fetched = {})",
+          expectedSize, eventList.size());
+    }
+
+    return false;
+  }
+
+  /**
    * Request for full snapshot and persists it if there is no snapshot available in the
    * sentry store. Also, wakes-up any waiting clients.
    *
@@ -187,6 +271,12 @@ public class HMSFollower implements Runnable, AutoCloseable {
     if (snapshotInfo.getPathImage().isEmpty()) {
       return snapshotInfo.getId();
     }
+
+    // Check we're still the leader before persisting the new snapshot
+    if (!isLeader()) {
+      return SentryStore.EMPTY_NOTIFICATION_ID;
+    }
+
     try {
       LOGGER.debug("Persisting HMS path full snapshot");
       sentryStore.persistFullPathsImage(snapshotInfo.getPathImage());
@@ -220,7 +310,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
       isNotificationProcessed = false;
       try {
         // Only the leader should process the notifications
-        if ((leaderMonitor != null) && !leaderMonitor.isLeader()) {
+        if (!isLeader()) {
           return;
         }
         isNotificationProcessed = notificationProcessor.processNotificationEvent(event);

http://git-wip-us.apache.org/repos/asf/sentry/blob/ca315fe9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
index 05518e8..4f76a94 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
@@ -54,7 +54,7 @@ import org.slf4j.LoggerFactory;
  * <p>Abstracts communication with HMS and exposes APi's to connect/disconnect to HMS and to
  * request HMS snapshots and also for new notifications.
  */
-final class SentryHMSClient implements AutoCloseable {
+class SentryHMSClient implements AutoCloseable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SentryHMSClient.class);
   private final Configuration conf;
@@ -213,27 +213,27 @@ final class SentryHMSClient implements AutoCloseable {
       LOGGER.error("Client is not connected to HMS");
       return Collections.emptyList();
     }
+
     LOGGER.debug("Checking for notifications beyond {}", notificationId);
-    // HIVE-15761: Currently getNextNotification API may return an empty
-    // NotificationEventResponse causing TProtocolException.
-    // Workaround: Only processes the notification events newer than the last updated one.
+
+    // HIVE-15761 (fixed in Hive 2.4.0) allows requesting notifications with an unprocessed
+    // notification ID without causing an exception. Keep this workaround for older HMS
+    // versions; a mismatch is only logged at debug level because HMSFollower handles it.
     CurrentNotificationEventId eventId = client.getCurrentNotificationEventId();
-    LOGGER.debug("ID of Last HMS notifications is: {}", eventId.getEventId());
-    if (eventId.getEventId() < notificationId) {
-      LOGGER.error("Last notification of HMS is smaller than what sentry processed, Something is"
+    LOGGER.debug("ID of Last HMS notifications is: {}", eventId);
+    if (eventId != null && eventId.getEventId() < notificationId) {
+      LOGGER.debug("Last notification of HMS is smaller than what sentry processed, Something is "
           + "wrong. Sentry will request a full Snapshot");
-      // TODO Path Mapping info should be cleared so that HMSFollower would request for full
-      // snapshot in the subsequent run.
       return Collections.emptyList();
     }
 
-    if (eventId.getEventId() == notificationId) {
+    if (eventId != null && eventId.getEventId() == notificationId) {
       return Collections.emptyList();
     }
 
     NotificationEventResponse response =
         client.getNextNotification(notificationId, Integer.MAX_VALUE, null);
-    if (response.isSetEvents()) {
+    if (response != null && response.isSetEvents()) {
       LOGGER.debug("Last Id processed:{}. Received collection of notifications, Size:{}",
           notificationId, response.getEvents().size());
       return response.getEvents();
@@ -241,4 +241,22 @@ final class SentryHMSClient implements AutoCloseable {
 
     return Collections.emptyList();
   }
+
+  /**
+   * @return the latest HMS notification Id, or SentryStore.EMPTY_NOTIFICATION_ID if unavailable
+   * @throws Exception when an error occurs when talking to the HMS client
+   */
+  public long getCurrentNotificationId() throws Exception {
+    if (client == null) {
+      LOGGER.error("Client is not connected to HMS");
+      return SentryStore.EMPTY_NOTIFICATION_ID;
+    }
+
+    CurrentNotificationEventId eventId = client.getCurrentNotificationEventId();
+    if (eventId != null && eventId.isSetEventId()) {
+      return eventId.getEventId();
+    }
+
+    return SentryStore.EMPTY_NOTIFICATION_ID;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/ca315fe9/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
index d67c162..fdf52bf 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
@@ -20,10 +20,14 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -36,6 +40,7 @@ import org.apache.hive.hcatalog.messaging.HCatEventMessage;
 import org.apache.hive.hcatalog.messaging.HCatEventMessage.EventType;
 import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
 import org.apache.sentry.hdfs.Updateable;
+import org.apache.sentry.provider.db.service.persistent.PathsImage;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
 import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
 import org.junit.BeforeClass;
@@ -60,6 +65,147 @@ public class TestHMSFollower {
     configuration.set("sentry.hive.sync.create", "true");
   }
 
+  @Test
+  public void testPersistAFullSnapshotWhenNoSnapshotAreProcessedYet() throws Exception {
+    /*
+     * TEST CASE
+     *
+     * Simulates (by using mocks) that Sentry has not processed any notifications, so this
+     * should trigger a new full HMS snapshot request with the eventId = 1
+     */
+
+    final long SENTRY_PROCESSED_EVENT_ID = SentryStore.EMPTY_NOTIFICATION_ID;
+    final long HMS_PROCESSED_EVENT_ID = 1L;
+
+    // Mock that returns a full snapshot
+    Map<String, Set<String>> snapshotObjects = new HashMap<>();
+    snapshotObjects.put("db", Sets.newHashSet("/db"));
+    snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
+    PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
+
+    SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
+    Mockito.when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
+    Mockito.when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId());
+
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hiveConnectionFactory, hiveInstance);
+    hmsFollower.setSentryHmsClient(sentryHmsClient);
+
+    // 1st run should get a full snapshot because AuthzPathsMapping is empty
+    Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+    Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true);
+    hmsFollower.run();
+    Mockito.verify(sentryStore, times(1)).persistFullPathsImage(fullSnapshot.getPathImage());
+    Mockito.verify(sentryStore, times(1)).persistLastProcessedNotificationID(fullSnapshot.getId());
+
+    Mockito.reset(sentryStore);
+
+    // 2nd run should not get a snapshot because it is already processed
+    Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
+    Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+    hmsFollower.run();
+    Mockito.verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap());
+    Mockito.verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+  }
+
+  @Test
+  public void testPersistAFullSnapshotWhenLastHmsNotificationIsLowerThanLastProcessed()
+      throws Exception {
+    /*
+     * TEST CASE
+     *
+     * Simulates (by using mocks) that Sentry already processed (and persisted) a notification
+     * with Id = 5, but the latest notification processed by the HMS is eventId = 1. So, an
+     * out-of-sync issue is happening on Sentry and HMS. This out-of-sync issue should trigger
+     * a new full HMS snapshot request with the same eventId = 1.
+     */
+
+    final long SENTRY_PROCESSED_EVENT_ID = 5L;
+    final long HMS_PROCESSED_EVENT_ID = 1L;
+
+    // Mock that returns a full snapshot
+    Map<String, Set<String>> snapshotObjects = new HashMap<>();
+    snapshotObjects.put("db", Sets.newHashSet("/db"));
+    snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
+    PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
+
+    SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
+    Mockito.when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
+    Mockito.when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId());
+
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hiveConnectionFactory, hiveInstance);
+    hmsFollower.setSentryHmsClient(sentryHmsClient);
+
+    // 1st run should get a full snapshot
+    Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+    Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+    hmsFollower.run();
+    Mockito.verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap());
+    Mockito.verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong());
+
+    Mockito.reset(sentryStore);
+
+    // 2nd run should not get a snapshot because it is already processed
+    Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
+    Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+    hmsFollower.run();
+    Mockito.verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap());
+    Mockito.verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+  }
+
+  @Test
+  public void testPersistAFullSnapshotWhenNextExpectedEventIsNotAvailable() throws Exception {
+    /*
+     * TEST CASE
+     *
+     * Simulates (by using mocks) that Sentry already processed (and persisted) a notification
+     * with Id = 1, and the latest notification processed by the HMS is eventId = 5. So, new
+     * notifications should be fetched.
+     *
+     * The number of expected notifications should be 4, but we simulate that we fetch only one
+     * notification with eventId = 5 causing an out-of-sync because the expected notification
+     * should be 2. This out-of-sync should trigger a new full HMS snapshot request with the
+     * same eventId = 5.
+     */
+
+    final long SENTRY_PROCESSED_EVENT_ID = 1L;
+    final long HMS_PROCESSED_EVENT_ID = 5L;
+
+    // Mock that returns a full snapshot
+    Map<String, Set<String>> snapshotObjects = new HashMap<>();
+    snapshotObjects.put("db", Sets.newHashSet("/db"));
+    snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
+    PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
+
+    SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
+    Mockito.when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
+    Mockito.when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId());
+    Mockito.when(sentryHmsClient.getNotifications(SENTRY_PROCESSED_EVENT_ID))
+        .thenReturn(Collections.singletonList(
+            new NotificationEvent(fullSnapshot.getId(), 0, "", "")));
+
+    HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+        hiveConnectionFactory, hiveInstance);
+    hmsFollower.setSentryHmsClient(sentryHmsClient);
+
+    // 1st run should get a full snapshot
+    Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+    Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+    hmsFollower.run();
+    Mockito.verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap());
+    Mockito.verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong());
+
+    Mockito.reset(sentryStore);
+
+    // 2nd run should not get a snapshot because it is already processed
+    Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
+    Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+    hmsFollower.run();
+    Mockito.verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap());
+    Mockito.verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+  }
+
   /**
    * Constructs create database event and makes sure that appropriate sentry store API's
    * are invoke when the event is processed by hms follower.