You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@sentry.apache.org by ak...@apache.org on 2017/07/28 17:07:01 UTC
[43/50] [abbrv] sentry git commit: SENTRY-1760: HMSFollower should
detect when a full snapshot from HMS is required (Sergio Pena,
reviewed by Alex Kolbasov, Brian Towels)
SENTRY-1760: HMSFollower should detect when a full snapshot from HMS is required (Sergio Pena, reviewed by Alex Kolbasov, Brian Towels)
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/ca315fe9
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/ca315fe9
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/ca315fe9
Branch: refs/heads/master
Commit: ca315fe971d32a9ca87183746c9dd1744a76e8d4
Parents: eed5383
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Thu Jul 20 19:37:28 2017 +0200
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Thu Jul 20 19:37:28 2017 +0200
----------------------------------------------------------------------
.../sentry/service/thrift/HMSFollower.java | 118 +++++++++++++--
.../sentry/service/thrift/SentryHMSClient.java | 38 +++--
.../sentry/service/thrift/TestHMSFollower.java | 146 +++++++++++++++++++
3 files changed, 278 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sentry/blob/ca315fe9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 083ee4c..547a61f 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.net.SocketException;
import java.util.Collection;
+import java.util.List;
import javax.jdo.JDODataStoreException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -44,7 +45,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
private static boolean connectedToHms = false;
- private final SentryHMSClient client;
+ private SentryHMSClient client;
private final Configuration authzConf;
private final SentryStore sentryStore;
private final NotificationProcessor notificationProcessor;
@@ -91,6 +92,11 @@ public class HMSFollower implements Runnable, AutoCloseable {
return connectedToHms;
}
+  /**
+   * Replaces the HMS client used by this follower; tests use it to inject a mock client.
+   *
+   * @param hmsClient the client to use for subsequent HMS calls
+   */
+  @VisibleForTesting
+  void setSentryHmsClient(SentryHMSClient hmsClient) {
+    client = hmsClient;
+  }
+
@Override
public void close() {
if (client != null) {
@@ -117,7 +123,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
// Wake any clients connected to this service waiting for HMS already processed notifications.
wakeUpWaitingClientsForSync(lastProcessedNotificationId);
// Only the leader should listen to HMS updates
- if ((leaderMonitor != null) && !leaderMonitor.isLeader()) {
+ if (!isLeader()) {
// Close any outstanding connections to HMS
close();
return;
@@ -125,6 +131,10 @@ public class HMSFollower implements Runnable, AutoCloseable {
syncupWithHms(lastProcessedNotificationId);
}
+  /**
+   * Reports whether this instance should act as the leader.
+   *
+   * @return true when no leader monitor is configured, otherwise the monitor's verdict
+   */
+  private boolean isLeader() {
+    if (leaderMonitor == null) {
+      // Without a leader monitor every instance behaves as the leader.
+      return true;
+    }
+    return leaderMonitor.isLeader();
+  }
+
/**
* Processes new Hive Metastore notifications.
*
@@ -145,18 +155,23 @@ public class HMSFollower implements Runnable, AutoCloseable {
}
try {
- long lastProcessedNotificationId = notificationId;
- // Create a full HMS snapshot if there is none
- // Decision of taking full snapshot is based on AuthzPathsMapping information persisted
- // in the sentry persistent store. If AuthzPathsMapping is empty, snapshot is needed.
- if (sentryStore.isAuthzPathsMappingEmpty()) {
- lastProcessedNotificationId = createFullSnapshot();
- if (lastProcessedNotificationId == SentryStore.EMPTY_NOTIFICATION_ID) {
- return;
- }
+ /* Before getting notifications, it checks if a full HMS snapshot is required. */
+ if (isFullSnapshotRequired(notificationId)) {
+ createFullSnapshot();
+ return;
+ }
+
+ Collection<NotificationEvent> notifications = client.getNotifications(notificationId);
+
+ // After getting notifications, it checks if the HMS did some clean-up and notifications
+ // are out-of-sync with Sentry.
+ if (areNotificationsOutOfSync(notifications, notificationId)) {
+ createFullSnapshot();
+ return;
}
- // Get the new notification from HMS and process them.
- processNotifications(client.getNotifications(lastProcessedNotificationId));
+
+ // Continue with processing new notifications if no snapshots are done.
+ processNotifications(notifications);
} catch (TException e) {
// If the underlying exception is around socket exception,
// it is better to retry connection to HMS
@@ -175,6 +190,75 @@ public class HMSFollower implements Runnable, AutoCloseable {
}
/**
+   * Decides whether a new full HMS snapshot must be requested. A snapshot is needed when:
+   * <ul>
+   * <li>No snapshot has been persisted yet (the AuthzPathsMapping is empty).</li>
+   * <li>The current notification ID on the HMS is lower than the latest one
+   * processed by Sentry.</li>
+   * </ul>
+   *
+   * @param latestSentryNotificationId The notification ID to check against the HMS
+   * @return True if a full snapshot is required; False otherwise.
+   * @throws Exception If an error occurs while checking the SentryStore or the HMS client.
+   */
+  private boolean isFullSnapshotRequired(long latestSentryNotificationId) throws Exception {
+    if (sentryStore.isAuthzPathsMappingEmpty()) {
+      // Nothing has been persisted yet, so there is no baseline to apply notifications to.
+      return true;
+    }
+
+    long hmsNotificationId = client.getCurrentNotificationId();
+    boolean hmsIsBehindSentry = hmsNotificationId < latestSentryNotificationId;
+    if (hmsIsBehindSentry) {
+      LOGGER.info("The latest notification ID on HMS is less than the latest notification ID "
+          + "processed by Sentry. Need to request a full HMS snapshot.");
+    }
+    return hmsIsBehindSentry;
+  }
+
+  /**
+   * Checks if the HMS and Sentry processed notifications are out-of-sync.
+   * This could happen because the HMS did some clean-up of old notifications
+   * and Sentry was not requesting notifications during that time.
+   *
+   * <p>Only the first and last fetched event IDs and the number of fetched events are
+   * inspected. NOTE(review): this assumes {@code events} is ordered by ascending event ID
+   * (first = oldest) — confirm against the HMS getNextNotification contract.
+   *
+   * @param events All new notifications to check for an out-of-sync.
+   * @param latestProcessedId The latest notification processed by Sentry to check against the
+   *                          list of notifications events.
+   * @return True if an out-of-sync is found; False otherwise.
+   */
+  private boolean areNotificationsOutOfSync(Collection<NotificationEvent> events,
+      long latestProcessedId) {
+    if (events.isEmpty()) {
+      return false;
+    }
+
+    // Copy rather than cast: the parameter is declared as a Collection, and an unchecked
+    // cast to List would throw ClassCastException for any non-List implementation.
+    List<NotificationEvent> eventList = new java.util.ArrayList<>(events);
+    long firstNotificationId = eventList.get(0).getEventId();
+    long lastNotificationId = eventList.get(eventList.size() - 1).getEventId();
+
+    /* If the next expected notification is not available, then an out-of-sync might
+     * have happened due to the following issue:
+     *
+     * - HDFS sync was disabled or Sentry was shutdown for a time period longer than
+     *   the HMS notification clean-up thread causing old notifications to be deleted.
+     */
+    if ((latestProcessedId + 1) != firstNotificationId) {
+      LOGGER.info("Current HMS notifications are out-of-sync with latest Sentry processed "
+          + "notifications. Need to request a full HMS snapshot.");
+      return true;
+    }
+
+    // Every ID in (latestProcessedId, lastNotificationId] is expected; fetching fewer events
+    // than that range spans means some IDs were skipped (a gap).
+    long expectedSize = lastNotificationId - latestProcessedId;
+    if (eventList.size() < expectedSize) {
+      LOGGER.info("The HMS notifications fetched has some gaps in the # of events received. These "
+          + "should not cause an out-of-sync issue. (expected = {}, fetched = {})",
+          expectedSize, eventList.size());
+    }
+
+    return false;
+  }
+
+ /**
* Request for full snapshot and persists it if there is no snapshot available in the
* sentry store. Also, wakes-up any waiting clients.
*
@@ -187,6 +271,12 @@ public class HMSFollower implements Runnable, AutoCloseable {
if (snapshotInfo.getPathImage().isEmpty()) {
return snapshotInfo.getId();
}
+
+ // Check we're still the leader before persisting the new snapshot
+ if (!isLeader()) {
+ return SentryStore.EMPTY_NOTIFICATION_ID;
+ }
+
try {
LOGGER.debug("Persisting HMS path full snapshot");
sentryStore.persistFullPathsImage(snapshotInfo.getPathImage());
@@ -220,7 +310,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
isNotificationProcessed = false;
try {
// Only the leader should process the notifications
- if ((leaderMonitor != null) && !leaderMonitor.isLeader()) {
+ if (!isLeader()) {
return;
}
isNotificationProcessed = notificationProcessor.processNotificationEvent(event);
http://git-wip-us.apache.org/repos/asf/sentry/blob/ca315fe9/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
index 05518e8..4f76a94 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
@@ -54,7 +54,7 @@ import org.slf4j.LoggerFactory;
* <p>Abstracts communication with HMS and exposes APi's to connect/disconnect to HMS and to
* request HMS snapshots and also for new notifications.
*/
-final class SentryHMSClient implements AutoCloseable {
+class SentryHMSClient implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(SentryHMSClient.class);
private final Configuration conf;
@@ -213,27 +213,27 @@ final class SentryHMSClient implements AutoCloseable {
LOGGER.error("Client is not connected to HMS");
return Collections.emptyList();
}
+
LOGGER.debug("Checking for notifications beyond {}", notificationId);
- // HIVE-15761: Currently getNextNotification API may return an empty
- // NotificationEventResponse causing TProtocolException.
- // Workaround: Only processes the notification events newer than the last updated one.
+
+ // A bug HIVE-15761 (fixed on Hive 2.4.0) should allow requesting notifications using
+ // an unprocessed notification ID without causing an exception. For now, we just
+ // leave this workaround and log debug messages.
CurrentNotificationEventId eventId = client.getCurrentNotificationEventId();
LOGGER.debug("ID of Last HMS notifications is: {}", eventId.getEventId());
- if (eventId.getEventId() < notificationId) {
- LOGGER.error("Last notification of HMS is smaller than what sentry processed, Something is"
+ if (eventId != null && eventId.getEventId() < notificationId) {
+ LOGGER.debug("Last notification of HMS is smaller than what sentry processed, Something is"
+ "wrong. Sentry will request a full Snapshot");
- // TODO Path Mapping info should be cleared so that HMSFollower would request for full
- // snapshot in the subsequent run.
return Collections.emptyList();
}
- if (eventId.getEventId() == notificationId) {
+ if (eventId != null && eventId.getEventId() == notificationId) {
return Collections.emptyList();
}
NotificationEventResponse response =
client.getNextNotification(notificationId, Integer.MAX_VALUE, null);
- if (response.isSetEvents()) {
+ if (response != null && response.isSetEvents()) {
LOGGER.debug("Last Id processed:{}. Received collection of notifications, Size:{}",
notificationId, response.getEvents().size());
return response.getEvents();
@@ -241,4 +241,22 @@ final class SentryHMSClient implements AutoCloseable {
return Collections.emptyList();
}
+
+  /**
+   * Returns the latest notification ID logged by the HMS.
+   *
+   * @return the latest notification Id logged by the HMS, or
+   *         {@code SentryStore.EMPTY_NOTIFICATION_ID} when the client is not connected or the
+   *         HMS response carries no event ID
+   * @throws Exception when an error occurs when talking to the HMS client
+   */
+  public long getCurrentNotificationId() throws Exception {
+    if (client == null) {
+      LOGGER.error("Client is not connected to HMS");
+      return SentryStore.EMPTY_NOTIFICATION_ID;
+    }
+
+    CurrentNotificationEventId eventId = client.getCurrentNotificationEventId();
+    // Guard against a null response, the same way getNotifications() guards this call.
+    if (eventId != null && eventId.isSetEventId()) {
+      return eventId.getEventId();
+    }
+
+    return SentryStore.EMPTY_NOTIFICATION_ID;
+  }
}
http://git-wip-us.apache.org/repos/asf/sentry/blob/ca315fe9/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
index d67c162..fdf52bf 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
@@ -20,10 +20,14 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -36,6 +40,7 @@ import org.apache.hive.hcatalog.messaging.HCatEventMessage;
import org.apache.hive.hcatalog.messaging.HCatEventMessage.EventType;
import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory;
import org.apache.sentry.hdfs.Updateable;
+import org.apache.sentry.provider.db.service.persistent.PathsImage;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
import org.junit.BeforeClass;
@@ -60,6 +65,147 @@ public class TestHMSFollower {
configuration.set("sentry.hive.sync.create", "true");
}
+ @Test
+ public void testPersistAFullSnapshotWhenNoSnapshotAreProcessedYet() throws Exception {
+ /*
+ * TEST CASE
+ *
+ * Simulates (by using mocks) that Sentry has not processed any notifications, so this
+ * should trigger a new full HMS snapshot request with the eventId = 1.
+ */
+
+ final long SENTRY_PROCESSED_EVENT_ID = SentryStore.EMPTY_NOTIFICATION_ID;
+ final long HMS_PROCESSED_EVENT_ID = 1L;
+
+ // Mock that returns a full snapshot
+ Map<String, Set<String>> snapshotObjects = new HashMap<>();
+ snapshotObjects.put("db", Sets.newHashSet("/db"));
+ snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
+ PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
+
+ SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
+ Mockito.when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
+ Mockito.when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId());
+
+ // No leader monitor is passed (null), so the follower acts as the leader.
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+ hiveConnectionFactory, hiveInstance);
+ hmsFollower.setSentryHmsClient(sentryHmsClient);
+
+ // 1st run should get a full snapshot because AuthzPathsMapping is empty
+ Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+ Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true);
+ hmsFollower.run();
+ Mockito.verify(sentryStore, times(1)).persistFullPathsImage(fullSnapshot.getPathImage());
+ Mockito.verify(sentryStore, times(1)).persistLastProcessedNotificationID(fullSnapshot.getId());
+
+ // Clear recorded interactions and stubs so the 2nd run is verified independently.
+ Mockito.reset(sentryStore);
+
+ // 2nd run should not get a snapshot because it is already processed: Sentry and HMS now
+ // agree on the last ID, and the unstubbed getNotifications() returns Mockito's default
+ // empty collection, so there is nothing to process.
+ Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId());
+ Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+ hmsFollower.run();
+ Mockito.verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap());
+ Mockito.verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+ }
+
+ @Test
+ public void testPersistAFullSnapshotWhenLastHmsNotificationIsLowerThanLastProcessed()
+ throws Exception {
+ /*
+ * TEST CASE
+ *
+ * Simulates (by using mocks) that Sentry already processed (and persisted) a notification
+ * with Id = 5, but the latest notification processed by the HMS is eventId = 1. So, an
+ * out-of-sync issue is happening on Sentry and HMS. This out-of-sync issue should trigger
+ * a new full HMS snapshot request with the same eventId = 1.
+ */
+
+ final long SENTRY_PROCESSED_EVENT_ID = 5L;
+ final long HMS_PROCESSED_EVENT_ID = 1L;
+
+ // Mock that returns a full snapshot
+ Map<String, Set<String>> snapshotObjects = new HashMap<>();
+ snapshotObjects.put("db", Sets.newHashSet("/db"));
+ snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
+ PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
+
+ SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
+ Mockito.when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
+ Mockito.when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId());
+
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+ hiveConnectionFactory, hiveInstance);
+ hmsFollower.setSentryHmsClient(sentryHmsClient);
+
+ // 1st run should get a full snapshot: the HMS current ID (1) is lower than the last ID
+ // Sentry processed (5), even though AuthzPathsMapping is not empty.
+ Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+ Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+ hmsFollower.run();
+ Mockito.verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap());
+ Mockito.verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong());
+
+ // Clear recorded interactions and stubs so the 2nd run is verified independently.
+ Mockito.reset(sentryStore);
+
+ // 2nd run should not get a snapshot because it is already processed (Sentry's last ID now
+ // equals the HMS current ID).
+ Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
+ Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+ hmsFollower.run();
+ Mockito.verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap());
+ Mockito.verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+ }
+
+ @Test
+ public void testPersistAFullSnapshotWhenNextExpectedEventIsNotAvailable() throws Exception {
+ /*
+ * TEST CASE
+ *
+ * Simulates (by using mocks) that Sentry already processed (and persisted) a notification
+ * with Id = 1, and the latest notification processed by the HMS is eventId = 5. So, new
+ * notifications should be fetched.
+ *
+ * The number of expected notifications should be 4, but we simulate that we fetch only one
+ * notification with eventId = 5 causing an out-of-sync because the expected notification
+ * should be 2. This out-of-sync should trigger a new full HMS snapshot request with the
+ * same eventId = 5.
+ */
+
+ final long SENTRY_PROCESSED_EVENT_ID = 1L;
+ final long HMS_PROCESSED_EVENT_ID = 5L;
+
+ // Mock that returns a full snapshot
+ Map<String, Set<String>> snapshotObjects = new HashMap<>();
+ snapshotObjects.put("db", Sets.newHashSet("/db"));
+ snapshotObjects.put("db.table", Sets.newHashSet("/db/table"));
+ PathsImage fullSnapshot = new PathsImage(snapshotObjects, HMS_PROCESSED_EVENT_ID, 1);
+
+ SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class);
+ Mockito.when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot);
+ Mockito.when(sentryHmsClient.getCurrentNotificationId()).thenReturn(fullSnapshot.getId());
+ // Fetching after ID 1 returns only event 5, so the next expected event (2) is missing.
+ Mockito.when(sentryHmsClient.getNotifications(SENTRY_PROCESSED_EVENT_ID))
+ .thenReturn(Collections.singletonList(
+ new NotificationEvent(fullSnapshot.getId(), 0, "", "")));
+
+ HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null,
+ hiveConnectionFactory, hiveInstance);
+ hmsFollower.setSentryHmsClient(sentryHmsClient);
+
+ // 1st run should get a full snapshot
+ Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID);
+ Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+ hmsFollower.run();
+ Mockito.verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap());
+ Mockito.verify(sentryStore, times(1)).persistLastProcessedNotificationID(Mockito.anyLong());
+
+ // Clear recorded interactions and stubs so the 2nd run is verified independently.
+ Mockito.reset(sentryStore);
+
+ // 2nd run should not get a snapshot because it is already processed; getNotifications(5)
+ // is unstubbed, so Mockito's default empty collection is returned.
+ Mockito.when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID);
+ Mockito.when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false);
+ hmsFollower.run();
+ Mockito.verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap());
+ Mockito.verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong());
+ }
+
/**
* Constructs create database event and makes sure that appropriate sentry store API's
* are invoke when the event is processed by hms follower.