You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ak...@apache.org on 2017/09/08 00:09:40 UTC

sentry git commit: SENTRY-1929: When full HMS snapshot is created all higher notifications should be purged (Alex Kolbasov, reviewed by Vamsee Yarlaga and Na Li)

Repository: sentry
Updated Branches:
  refs/heads/master 1275be7e3 -> c302da781


SENTRY-1929: When full HMS snapshot is created all higher notifications should be purged (Alex Kolbasov, reviewed by Vamsee Yarlaga and Na Li)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/c302da78
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/c302da78
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/c302da78

Branch: refs/heads/master
Commit: c302da78189790d48191856cfaee2db6fcf7e9c7
Parents: 1275be7
Author: Alexander Kolbasov <ak...@gmail.com>
Authored: Thu Sep 7 17:00:50 2017 -0700
Committer: Alexander Kolbasov <ak...@gmail.com>
Committed: Thu Sep 7 17:09:35 2017 -0700

----------------------------------------------------------------------
 .../db/service/persistent/SentryStore.java      | 53 ++++++++++++++++++--
 .../sentry/service/thrift/CounterWait.java      | 22 ++++++++
 .../sentry/service/thrift/HMSFollower.java      | 19 ++++++-
 .../sentry/service/thrift/TestHMSFollower.java  |  2 +-
 4 files changed, 89 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/c302da78/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 1ef7dcc..a70a552 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -603,8 +603,10 @@ public class SentryStore {
     query.setFilter("changeID <= maxChangedIdDeleted");
     query.declareParameters("long maxChangedIdDeleted");
     long numDeleted = query.deletePersistentAll(maxIDDeleted);
-    LOGGER.info(String.format("Purged %d of %s to changeID=%d",
-        numDeleted, cls.getSimpleName(), maxIDDeleted));
+    if (numDeleted > 0) {
+      LOGGER.info(String.format("Purged %d of %s to changeID=%d",
+              numDeleted, cls.getSimpleName(), maxIDDeleted));
+    }
   }
 
   /**
@@ -627,7 +629,9 @@ public class SentryStore {
     query.setFilter("notificationId <= maxNotificationIdDeleted");
     query.declareParameters("long maxNotificationIdDeleted");
     long numDeleted = query.deletePersistentAll(lastNotificationID - changesToKeep);
-    LOGGER.info("Purged {} of {}", numDeleted, MSentryHmsNotification.class.getSimpleName());
+    if (numDeleted > 0) {
+      LOGGER.info("Purged {} of {}", numDeleted, MSentryHmsNotification.class.getSimpleName());
+    }
   }
 
   /**
@@ -2765,6 +2769,28 @@ public class SentryStore {
   }
 
   /**
+   * Delete all stored HMS notifications starting from given ID.<p>
+   *
+   * The purpose of the function is to clean up notifications in cases
+   * were we recover from HMS notifications resets.
+   *
+   * @param pm Persistent manager instance
+   * @param id initial ID. All notifications starting from this ID and above are
+   *          removed.
+   */
+  private void deleteNotificationsSince(PersistenceManager pm, long id) {
+    Query query = pm.newQuery(MSentryHmsNotification.class);
+    query.addExtension(LOAD_RESULTS_AT_COMMIT, "false");
+    query.setFilter("notificationId >= currentNotificationId");
+    query.declareParameters("long currentNotificationId");
+    long numDeleted = query.deletePersistentAll(id);
+    if (numDeleted > 0) {
+      LOGGER.info("Purged {} notification entries starting from {}",
+              numDeleted, id);
+    }
+  }
+
+  /**
    * Persist an up-to-date HMS snapshot into Sentry DB in a single transaction with its latest
    * notification ID
    *
@@ -2778,6 +2804,7 @@ public class SentryStore {
       new TransactionBlock() {
         public Object execute(PersistenceManager pm) throws Exception {
           pm.setDetachAllOnCommit(false); // No need to detach objects
+          deleteNotificationsSince(pm, notificationID + 1);
 
           // persist the notidicationID
           pm.makePersistent(new MSentryHmsNotification(notificationID));
@@ -3814,17 +3841,33 @@ public class SentryStore {
   }
 
   /**
-   * Set the notification ID of last processed HMS notification.
+   * Set the notification ID of last processed HMS notification and remove all
+   * subsequent notifications stored.
    */
-  public void persistLastProcessedNotificationID(final Long notificationId) throws Exception {
+  public void setLastProcessedNotificationID(final Long notificationId) throws Exception {
     LOGGER.debug("Persisting Last Processed Notification ID {}", notificationId);
     tm.executeTransaction(
       new TransactionBlock<Object>() {
         public Object execute(PersistenceManager pm) throws Exception {
+          deleteNotificationsSince(pm, notificationId + 1);
           return pm.makePersistent(new MSentryHmsNotification(notificationId));
         }
       });
   }
+
+  /**
+   * Set the notification ID of last processed HMS notification.
+   */
+  public void persistLastProcessedNotificationID(final Long notificationId) throws Exception {
+    LOGGER.debug("Persisting Last Processed Notification ID {}", notificationId);
+    tm.executeTransaction(
+            new TransactionBlock<Object>() {
+              public Object execute(PersistenceManager pm) throws Exception {
+                return pm.makePersistent(new MSentryHmsNotification(notificationId));
+              }
+            });
+  }
+
   /**
    * Gets the last processed change ID for perm delta changes.
    *

http://git-wip-us.apache.org/repos/asf/sentry/blob/c302da78/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
index 9c9bb69..2c9e87a 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/CounterWait.java
@@ -117,6 +117,28 @@ public final class CounterWait {
     wakeup(newValue);
   }
 
+  /**
+   * Explicitly reset the counter value to a new value, but allow setting to a
+   * smaller value.
+   * This should be used when we have some external event that resets the counter
+   * value space.
+   * @param newValue New counter value. If this is greater or equal then the current
+   *                value, this is equivalent to {@link #update(long)}. Otherwise
+   *                 sets the counter to the new smaller value.
+   */
+  public synchronized void reset(long newValue) {
+    long oldValue = currentId.get();
+
+    if (newValue > oldValue) {
+      update(newValue);
+    } else if (newValue < oldValue) {
+      LOGGER.warn("resetting counter from {} to smaller value {}",
+              oldValue, newValue);
+      currentId.set(newValue);
+      // No need to wakeup waiters since no one should wait on the smaller value
+    }
+  }
+
 
   /**
    * Wait for specified counter value.

http://git-wip-us.apache.org/repos/asf/sentry/blob/c302da78/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index c234eaf..4d83ad5 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -294,8 +294,10 @@ public class HMSFollower implements Runnable, AutoCloseable {
         sentryStore.persistFullPathsImage(snapshotInfo.getPathImage(), snapshotInfo.getId());
       } else {
         // We need to persist latest notificationID for next poll
-        sentryStore.persistLastProcessedNotificationID(snapshotInfo.getId());
+        sentryStore.setLastProcessedNotificationID(snapshotInfo.getId());
       }
+      // Only reset the counter if the above operations succeeded
+      resetCounterWait(snapshotInfo.getId());
     } catch (Exception failure) {
       LOGGER.error("Received exception while persisting HMS path full snapshot ");
       throw failure;
@@ -378,4 +380,19 @@ public class HMSFollower implements Runnable, AutoCloseable {
       counterWait.update(eventId);
     }
   }
+
+  /**
+   * Reset CounterWait counter to the new value
+   * @param eventId new event id value, may be smaller then the old value.
+   */
+  private void resetCounterWait(long eventId) {
+    CounterWait counterWait = sentryStore.getCounterWait();
+
+    // Wake up any HMS waiters that are waiting for this ID.
+    // counterWait should never be null, but tests mock SentryStore and a mocked one
+    // doesn't have it.
+    if (counterWait != null) {
+      counterWait.reset(eventId);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c302da78/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
index 93afb61..7d64375 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java
@@ -821,7 +821,7 @@ public class TestHMSFollower {
     when(sentryStore.isHmsNotificationEmpty()).thenReturn(true);
     hmsFollower.run();
     verify(sentryStore, times(0)).persistFullPathsImage(fullSnapshot.getPathImage(), fullSnapshot.getId());
-    verify(sentryStore, times(1)).persistLastProcessedNotificationID(fullSnapshot.getId());
+    verify(sentryStore, times(1)).setLastProcessedNotificationID(fullSnapshot.getId());
     verify(sentryStore, times(1)).isHmsNotificationEmpty();
     verify(sentryStore, times(0)).isAuthzPathsMappingEmpty();
   }