You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2017/01/27 23:20:53 UTC

[6/7] sentry git commit: SENTRY-1613

SENTRY-1613

Change-Id: I6ae45e29ab6d6987e07ef2fa54037732d47f4b8d


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/18be1d5e
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/18be1d5e
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/18be1d5e

Branch: refs/heads/sentry-ha-redesign-1
Commit: 18be1d5e8c0efcb3bbd6d21cd457f20f2075e301
Parents: 813d10e
Author: hahao <ha...@cloudera.com>
Authored: Thu Jan 26 18:11:54 2017 -0800
Committer: hahao <ha...@cloudera.com>
Committed: Fri Jan 27 14:58:11 2017 -0800

----------------------------------------------------------------------
 .../org/apache/sentry/hdfs/ImageRetriever.java  |   2 +-
 .../apache/sentry/hdfs/ThriftSerializer.java    |  10 +-
 .../org/apache/sentry/hdfs/UpdateRetriever.java |  38 ++
 .../apache/sentry/hdfs/DBUpdateForwarder.java   |  67 ++++
 .../apache/sentry/hdfs/PathImageRetriever.java  |  15 +-
 .../apache/sentry/hdfs/PathUpdateRetriever.java |  58 +++
 .../apache/sentry/hdfs/PermImageRetriever.java  |   6 +-
 .../apache/sentry/hdfs/PermUpdateRetriever.java |  58 +++
 .../sentry/hdfs/SentryHDFSServiceProcessor.java |  29 +-
 .../org/apache/sentry/hdfs/SentryPlugin.java    |  77 +---
 .../org/apache/sentry/hdfs/UpdateForwarder.java | 335 -----------------
 .../sentry/hdfs/UpdateablePermissions.java      |  63 ----
 .../apache/sentry/hdfs/TestUpdateForwarder.java | 359 -------------------
 .../db/service/persistent/SentryStore.java      | 102 ++++++
 .../sentry/service/thrift/SentryService.java    |   3 +-
 .../tests/e2e/hdfs/TestHDFSIntegration.java     |   6 +-
 16 files changed, 362 insertions(+), 866 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
index 1147c07..5080559 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
@@ -29,6 +29,6 @@ public interface ImageRetriever<K> {
    * @return a full snapshot of type K
    * @throws Exception
    */
-  K retrieveFullImage(long seqNum) throws Exception;
+  K retrieveFullImage() throws Exception;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
index 69aa098..d7b9923 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
@@ -25,12 +25,12 @@ import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TSimpleJSONProtocol;
+import org.apache.thrift.protocol.TJSONProtocol;
 
 public class ThriftSerializer {
 
-  final static private TSimpleJSONProtocol.Factory tSimpleJSONProtocol =
-          new TSimpleJSONProtocol.Factory();
+  final static private TJSONProtocol.Factory tJSONProtocol =
+          new TJSONProtocol.Factory();
 
   // Use default max thrift message size here.
   // TODO: Figure out a way to make maxMessageSize configurable, eg. create a serializer singleton at startup by
@@ -67,13 +67,13 @@ public class ThriftSerializer {
 
   public static String serializeToJSON(TBase base) throws TException  {
     // Initiate a new TSerializer each time for thread safety.
-    TSerializer tSerializer = new TSerializer(tSimpleJSONProtocol);
+    TSerializer tSerializer = new TSerializer(tJSONProtocol);
     return tSerializer.toString(base);
   }
 
   public static void deserializeFromJSON(TBase base, String dataInJson) throws TException {
     // Initiate a new TDeserializer each time for thread safety.
-    TDeserializer tDeserializer = new TDeserializer(tSimpleJSONProtocol);
+    TDeserializer tDeserializer = new TDeserializer(tJSONProtocol);
     tDeserializer.fromString(base, dataInJson);
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateRetriever.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateRetriever.java
new file mode 100644
index 0000000..ef25cfa
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateRetriever.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.hdfs;
+
+import java.util.List;
+
+/**
+ * Interface class for generating/retrieving a full image.
+ */
+public interface UpdateRetriever<K> {
+
+  /**
+   * Retrieve a full image of type k.
+   *
+   * @param seqNum the given seq number
+   * @return a full snapshot of type K
+   * @throws Exception
+   */
+  List<K> retrievePartialUpdate(long seqNum) throws Exception;
+
+  boolean isPartialUpdateAvailable(long seqNum) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java
new file mode 100644
index 0000000..ca70a88
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  DBUpdateForwarder is thread safe class
+ */
+public class DBUpdateForwarder<K extends Updateable.Update> {
+
+  private final ImageRetriever<K> imageRetreiver;
+  private final UpdateRetriever<K> updateRetriever;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DBUpdateForwarder.class);
+  private static final String UPDATABLE_TYPE_NAME = "update_forwarder";
+
+  protected DBUpdateForwarder(final ImageRetriever<K> imageRetreiver,
+      final UpdateRetriever<K> updateRetriever) {
+    this.imageRetreiver = imageRetreiver;
+    this.updateRetriever = updateRetriever;
+  }
+
+  /**
+   * Return all updates from requested seqNum (inclusive)
+   *
+   * @param seqNum
+   * @return the list of updates
+   */
+  public List<K> getAllUpdatesFrom(long seqNum) throws Exception {
+    List<K> retVal = new LinkedList<>();
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("#### GetAllUpdatesFrom ["
+      + "reqSeqNum=" + seqNum + " ]");
+    }
+
+    if (seqNum == SentryStore.INIT_CHANGE_ID ||
+          !updateRetriever.isPartialUpdateAvailable(seqNum)) {
+      retVal.add(imageRetreiver.retrieveFullImage());
+    } else {
+      retVal.addAll(updateRetriever.retrievePartialUpdate(seqNum));
+    }
+
+    return retVal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
index 6cfd6d5..2edfcc1 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
@@ -18,13 +18,14 @@
 package org.apache.sentry.hdfs;
 
 import com.codahale.metrics.Timer;
+import com.google.common.collect.Lists;
 import org.apache.sentry.hdfs.service.thrift.TPathChanges;
 import org.apache.sentry.provider.db.service.persistent.PathsImage;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
 
@@ -35,7 +36,7 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
   }
 
   @Override
-  public PathsUpdate retrieveFullImage(long seqNum) throws Exception {
+  public PathsUpdate retrieveFullImage() throws Exception {
     try (final Timer.Context timerContext =
         SentryHdfsMetricsUtil.getRetrievePathFullImageTimer.time()) {
 
@@ -48,7 +49,7 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
 
       // Generate a corresponding PathsUpdate.
       // TODO: use curSeqNum from DB instead of seqNum when doing SENTRY-1567
-      PathsUpdate pathsUpdate = new PathsUpdate(seqNum, true);
+      PathsUpdate pathsUpdate = new PathsUpdate(curSeqNum, true);
       for (Map.Entry<String, Set<String>> pathEnt : pathImage.entrySet()) {
         TPathChanges pathChange = pathsUpdate.newPathChange(pathEnt.getKey());
 
@@ -59,7 +60,13 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
 
       SentryHdfsMetricsUtil.getPathChangesHistogram.update(pathsUpdate
             .getPathChanges().size());
+
+      UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(
+          new String[]{"/"});
+      authzPaths.updatePartial(Lists.newArrayList(pathsUpdate),
+          new ReentrantReadWriteLock());
+      pathsUpdate.toThrift().setPathsDump(authzPaths.getPathsDump().createPathsDump());
       return pathsUpdate;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathUpdateRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathUpdateRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathUpdateRetriever.java
new file mode 100644
index 0000000..469ffe7
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathUpdateRetriever.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+
+import com.google.common.collect.Lists;
+import org.apache.sentry.provider.db.service.model.MSentryPathChange;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+
+import java.util.List;
+
+public class PathUpdateRetriever implements UpdateRetriever<PathsUpdate> {
+
+  private final SentryStore sentryStore;
+
+  public PathUpdateRetriever(SentryStore sentryStore) {
+    this.sentryStore = sentryStore;
+  }
+
+  @Override
+  public List<PathsUpdate> retrievePartialUpdate(long seqNum) throws Exception {
+    List<MSentryPathChange> mSentryPathChanges =
+            sentryStore.getMSentryPathChanges(seqNum);
+    List<PathsUpdate> updates = Lists.newArrayList();
+    for (MSentryPathChange mSentryPathChange : mSentryPathChanges) {
+      // get changeID from stored MSentryPathChange
+      long changeID = mSentryPathChange.getChangeID();
+      // Create a corresponding PathsUpdate and deserialize the stored
+      // JSON string to TPathsUpdate. Then set the corresponding
+      // changeID.
+      PathsUpdate pathsUpdate = new PathsUpdate();
+      pathsUpdate.JSONDeserialize(mSentryPathChange.getPathChange());
+      pathsUpdate.setSeqNum(changeID);
+      updates.add(pathsUpdate);
+    }
+    return updates;
+  }
+
+  @Override
+  public boolean isPartialUpdateAvailable(long seqNum) throws Exception {
+    return sentryStore.findMSentryPathChangeByID(seqNum);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
index 56985c2..323d090 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
@@ -37,7 +37,7 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
   }
 
   @Override
-  public PermissionsUpdate retrieveFullImage(long seqNum) throws Exception {
+  public PermissionsUpdate retrieveFullImage() throws Exception {
     try(Timer.Context timerContext =
         SentryHdfsMetricsUtil.getRetrievePermFullImageTimer.time()) {
 
@@ -72,7 +72,7 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
 
       PermissionsUpdate permissionsUpdate = new PermissionsUpdate(tPermUpdate);
       // TODO: use curSeqNum from DB instead of seqNum when doing SENTRY-1567
-      permissionsUpdate.setSeqNum(seqNum);
+      permissionsUpdate.setSeqNum(curSeqNum);
       SentryHdfsMetricsUtil.getPrivilegeChangesHistogram.update(
           tPermUpdate.getPrivilegeChangesSize());
       SentryHdfsMetricsUtil.getRoleChangesHistogram.update(
@@ -81,4 +81,4 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
     }
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermUpdateRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermUpdateRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermUpdateRetriever.java
new file mode 100644
index 0000000..e7e0e77
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermUpdateRetriever.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+
+import com.google.common.collect.Lists;
+import org.apache.sentry.provider.db.service.model.MSentryPermChange;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+
+import java.util.List;
+
+public class PermUpdateRetriever implements UpdateRetriever<PermissionsUpdate> {
+
+  private final SentryStore sentryStore;
+
+  public PermUpdateRetriever(SentryStore sentryStore) {
+    this.sentryStore = sentryStore;
+  }
+
+  @Override
+  public List<PermissionsUpdate> retrievePartialUpdate(long seqNum) throws Exception {
+    List<MSentryPermChange> mSentryPermChanges =
+            sentryStore.getMSentryPermChanges(seqNum);
+    List<PermissionsUpdate> updates = Lists.newArrayList();
+    for (MSentryPermChange mSentryPermChange : mSentryPermChanges) {
+      // get changeID from stored MSentryPermChange
+      long changeID = mSentryPermChange.getChangeID();
+      // Create a corresponding PermissionsUpdate and deserialize the stored
+      // JSON string to TPermissionsUpdate. Then set the corresponding
+      // changeID.
+      PermissionsUpdate permsUpdate = new PermissionsUpdate();
+      permsUpdate.JSONDeserialize(mSentryPermChange.getPermChange());
+      permsUpdate.setSeqNum(changeID);
+      updates.add(permsUpdate);
+    }
+    return updates;
+  }
+
+  @Override
+  public boolean isPartialUpdateAvailable(long seqNum) throws Exception {
+    return sentryStore.findMSentryPermChangeByID(seqNum);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
index e4f3f58..5b146b7 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
@@ -42,10 +42,6 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
     retVal.setAuthzPathUpdate(new LinkedList<TPathsUpdate>());
     retVal.setAuthzPermUpdate(new LinkedList<TPermissionsUpdate>());
     if (SentryPlugin.instance != null) {
-      if (SentryPlugin.instance.isOutOfSync()) {
-        throw new TException(
-            "This Sentry server is not communicating with other nodes and out of sync ");
-      }
       final Timer.Context timerContext =
           SentryHdfsMetricsUtil.getAllAuthzUpdatesTimer.time();
       try {
@@ -99,33 +95,12 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
 
   @Override
   public void handle_hms_notification(TPathsUpdate update) throws TException {
-    final Timer.Context timerContext =
-        SentryHdfsMetricsUtil.getHandleHmsNotificationTimer.time();
-    try {
-      PathsUpdate hmsUpdate = new PathsUpdate(update);
-      if (SentryPlugin.instance != null) {
-        SentryPlugin.instance.handlePathUpdateNotification(hmsUpdate);
-        LOGGER.debug("Authz Paths update [" + hmsUpdate.getSeqNum() + "]..");
-      } else {
-        LOGGER.error("SentryPlugin not initialized yet !!");
-      }
-    } catch (Exception e) {
-      LOGGER.error("Error handling notification from HMS", e);
-      SentryHdfsMetricsUtil.getFailedHandleHmsNotificationCounter.inc();
-      throw new TException(e);
-    } finally {
-      timerContext.stop();
-      SentryHdfsMetricsUtil.getHandleHmsPathChangeHistogram.update(
-          update.getPathChangesSize());
-      if (update.isHasFullImage()) {
-        SentryHdfsMetricsUtil.getHandleHmsHasFullImageCounter.inc();
-      }
-    }
+    throw new TException("handle_hms_notification is no longer supported");
   }
 
   @Override
   public long check_hms_seq_num(long pathSeqNum) throws TException {
-    return SentryPlugin.instance.getLastSeenHMSPathSeqNum();
+    throw new TException("check_hms_seq_num is no longer supported");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
index b99013e..f5ee179 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
@@ -108,18 +108,16 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SentryPlugin.class);
 
-  private final AtomicBoolean fullUpdateHMSWait = new AtomicBoolean(false);
   private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false);
   private final AtomicBoolean fullUpdateNN = new AtomicBoolean(false);
 
   public static volatile SentryPlugin instance;
 
-  private UpdateForwarder<PathsUpdate> pathsUpdater;
-  private UpdateForwarder<PermissionsUpdate> permsUpdater;
+  private DBUpdateForwarder<PathsUpdate> pathsUpdater;
+  private DBUpdateForwarder<PermissionsUpdate> permsUpdater;
+
   // TODO: Each perm change sequence number should be generated during persistence at sentry store.
   private final AtomicLong permSeqNum = new AtomicLong(5);
-  private PermImageRetriever permImageRetriever;
-  private boolean outOfSync = false;
   /*
    * This number is smaller than starting sequence numbers used by NN and HMS
    * so in both cases its effect is to create appearance of out-of-sync
@@ -129,33 +127,15 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
    */
   private static final long NO_LAST_SEEN_HMS_PATH_SEQ_NUM = 0L;
 
-  /*
-   * Call from HMS to get the last known update sequence #.
-   */
-  long getLastSeenHMSPathSeqNum() {
-    if (!fullUpdateHMS.getAndSet(false)) {
-      return pathsUpdater.getLastSeen();
-    } else {
-      LOGGER.info("SIGNAL HANDLING: asking for full update from HMS");
-      return NO_LAST_SEEN_HMS_PATH_SEQ_NUM;
-    }
-  }
-
   @Override
   public void initialize(Configuration conf, SentryStore sentryStore) throws SentryPluginException {
-    final String[] pathPrefixes = conf
-        .getStrings(ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES,
-            ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT);
-    final int initUpdateRetryDelayMs =
-        conf.getInt(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
-            ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT);
-    permImageRetriever = new PermImageRetriever(sentryStore);
-
-    pathsUpdater = UpdateForwarder.create(conf, new UpdateableAuthzPaths(
-        pathPrefixes), new PathsUpdate(0, false), null, 100, initUpdateRetryDelayMs, false);
-    permsUpdater = UpdateForwarder.create(conf,
-        new UpdateablePermissions(permImageRetriever), new PermissionsUpdate(0, false),
-        permImageRetriever, 100, initUpdateRetryDelayMs, true);
+    PermImageRetriever permImageRetriever = new PermImageRetriever(sentryStore);
+    PathImageRetriever pathImageRetriever = new PathImageRetriever(sentryStore);
+    PermUpdateRetriever permUpdateRetriever = new PermUpdateRetriever(sentryStore);
+    PathUpdateRetriever pathUpdateRetriever = new PathUpdateRetriever(sentryStore);
+    pathsUpdater = new DBUpdateForwarder<>(pathImageRetriever, pathUpdateRetriever);
+    permsUpdater = new DBUpdateForwarder<>(permImageRetriever, permUpdateRetriever);
+
     LOGGER.info("Sentry HDFS plugin initialized !!");
     instance = this;
 
@@ -181,7 +161,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
     if (!fullUpdateNN.get()) {
       // Most common case - Sentry is NOT handling a full update.
       return pathsUpdater.getAllUpdatesFrom(pathSeqNum);
-    } else if (!fullUpdateHMSWait.get()) {
+    } else {
       /*
        * Sentry is in the middle of signal-triggered full update.
        * It already got a full update from HMS
@@ -215,10 +195,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
         LOGGER.warn("SIGNAL HANDLING: returned NULL instead of full update to NameNode (???)");
       }
       return updates;
-    } else {
-      // Sentry is handling a full update, but not yet received full update from HMS
-      LOGGER.warn("SIGNAL HANDLING: sending partial update to NameNode: still waiting for full update from HMS");
-      return pathsUpdater.getAllUpdatesFrom(pathSeqNum);
     }
   }
 
@@ -226,21 +202,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
     return permsUpdater.getAllUpdatesFrom(permSeqNum);
   }
 
-  /*
-   * Handle partial (most common) or full update from HMS
-   */
-  public void handlePathUpdateNotification(PathsUpdate update)
-      throws SentryPluginException {
-    pathsUpdater.handleUpdateNotification(update);
-    if (!update.hasFullImage()) { // most common case of partial update
-      LOGGER.debug("Recieved Authz Path update [" + update.getSeqNum() + "]..");
-    } else { // rare case of full update
-      LOGGER.warn("Recieved Authz Path FULL update [" + update.getSeqNum() + "]..");
-      // indicate that we're ready to send full update to NameNode
-      fullUpdateHMSWait.set(false);
-    }
-  }
-
   @Override
   public DeltaTransactionBlock onAlterSentryRoleAddGroups(
       TAlterSentryRoleAddGroupsRequest request) throws SentryPluginException {
@@ -250,7 +211,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
       rUpdate.addToAddGroups(group.getGroupName());
     }
 
-    permsUpdater.handleUpdateNotification(update);
     LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
     return new DeltaTransactionBlock(update);
   }
@@ -265,7 +225,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
       rUpdate.addToDelGroups(group.getGroupName());
     }
 
-    permsUpdater.handleUpdateNotification(update);
     LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
     return new DeltaTransactionBlock(update);
   }
@@ -301,7 +260,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
     update.addPrivilegeUpdate(authzObj).putToAddPrivileges(
         roleName, privilege.getAction().toUpperCase());
 
-    permsUpdater.handleUpdateNotification(update);
     LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + "]..");
     return update;
   }
@@ -316,7 +274,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
     privUpdate.putToAddPrivileges(newAuthz, newAuthz);
     privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
 
-    permsUpdater.handleUpdateNotification(update);
     LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + newAuthz + ", " + oldAuthz + "]..");
     return new DeltaTransactionBlock(update);
   }
@@ -342,14 +299,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
     }
   }
 
-  public boolean isOutOfSync() {
-    return outOfSync;
-  }
-
-  public void setOutOfSync(boolean outOfSync) {
-    this.outOfSync = outOfSync;
-  }
-
   private PermissionsUpdate onAlterSentryRoleRevokePrivilegeCore(String roleName, TSentryPrivilege privilege)
       throws SentryPluginException {
     String authzObj = getAuthzObj(privilege);
@@ -361,7 +310,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
     update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
         roleName, privilege.getAction().toUpperCase());
 
-    permsUpdater.handleUpdateNotification(update);
     LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "]..");
     return update;
   }
@@ -374,7 +322,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
         request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ);
     update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS);
 
-    permsUpdater.handleUpdateNotification(update);
     LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
     return new DeltaTransactionBlock(update);
   }
@@ -387,7 +334,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
     update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
         PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
 
-    permsUpdater.handleUpdateNotification(update);
     LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "]..");
     return new DeltaTransactionBlock(update);
   }
@@ -396,7 +342,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
   public void onSignal(final String sigName) {
     LOGGER.info("SIGNAL HANDLING: Received signal " + sigName + ", triggering full update");
     fullUpdateHMS.set(true);
-    fullUpdateHMSWait.set(true);
     fullUpdateNN.set(true);
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
deleted file mode 100644
index 4bfc473..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class UpdateForwarder<K extends Updateable.Update> implements
-    Updateable<K>, Closeable {
-
-  private final AtomicLong lastSeenSeqNum = new AtomicLong(0);
-  protected final AtomicLong lastCommittedSeqNum = new AtomicLong(0);
-  // Updates should be handled in order
-  private final Executor updateHandler = Executors.newSingleThreadExecutor();
-
-  // Update log is used when propagate updates to a downstream cache.
-  // The preUpdate log stores all commits that were applied to this cache.
-  // When the update log is filled to capacity (getMaxUpdateLogSize()), all
-  // entries are cleared and a compact image if the state of the cache is
-  // appended to the log.
-  // The first entry in an update log (consequently the first preUpdate a
-  // downstream cache sees) will be a full image. All subsequent entries are
-  // partial edits
-  protected final LinkedList<K> updateLog = new LinkedList<K>();
-  // UpdateLog is disabled when getMaxUpdateLogSize() = 0;
-  private final int maxUpdateLogSize;
-
-  private final ImageRetriever<K> imageRetreiver;
-
-  private volatile Updateable<K> updateable;
-
-  private final ReadWriteLock lock = new ReentrantReadWriteLock();
-  protected static final long INIT_SEQ_NUM = -2;
-  protected static final int INIT_UPDATE_RETRY_DELAY = 5000;
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarder.class);
-  private static final String UPDATABLE_TYPE_NAME = "update_forwarder";
-
-  public UpdateForwarder(Configuration conf, Updateable<K> updateable,
-      ImageRetriever<K> imageRetreiver, int maxUpdateLogSize, boolean shouldInit) {
-    this(conf, updateable, imageRetreiver, maxUpdateLogSize, INIT_UPDATE_RETRY_DELAY, shouldInit);
-  }
-
-  protected UpdateForwarder(Configuration conf, Updateable<K> updateable, //NOPMD
-      ImageRetriever<K> imageRetreiver, int maxUpdateLogSize,
-      int initUpdateRetryDelay, boolean shouldInit) {
-    this.maxUpdateLogSize = maxUpdateLogSize;
-    this.imageRetreiver = imageRetreiver;
-    if (shouldInit) {
-      spawnInitialUpdater(updateable, initUpdateRetryDelay);
-    } else {
-      this.updateable = updateable;
-    }
-  }
-
-  public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf,
-      Updateable<K> updateable, K update, ImageRetriever<K> imageRetreiver,
-      int maxUpdateLogSize, boolean shouldInit) throws SentryPluginException {
-    return create(conf, updateable, update, imageRetreiver, maxUpdateLogSize,
-        INIT_UPDATE_RETRY_DELAY, shouldInit);
-  }
-
-  public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf,
-      Updateable<K> updateable, K update, ImageRetriever<K> imageRetreiver,
-      int maxUpdateLogSize, int initUpdateRetryDelay, boolean shouldInit) throws SentryPluginException {
-    return new UpdateForwarder<K>(conf, updateable, imageRetreiver,
-        maxUpdateLogSize, initUpdateRetryDelay, shouldInit);
-  }
-
-  private void spawnInitialUpdater(final Updateable<K> updateable,
-      final int initUpdateRetryDelay) {
-    K firstFullImage = null;
-    try {
-      firstFullImage = imageRetreiver.retrieveFullImage(INIT_SEQ_NUM);
-    } catch (Exception e) {
-      LOGGER.warn("InitialUpdater encountered exception !! ", e);
-      firstFullImage = null;
-      Thread initUpdater = new Thread() {
-        @Override
-        public void run() {
-          while (UpdateForwarder.this.updateable == null) {
-            try {
-              Thread.sleep(initUpdateRetryDelay);
-            } catch (InterruptedException e) {
-              LOGGER.warn("Thread interrupted !! ", e);
-              break;
-            }
-            K fullImage = null;
-            try {
-              fullImage =
-                  UpdateForwarder.this.imageRetreiver
-                  .retrieveFullImage(INIT_SEQ_NUM);
-              appendToUpdateLog(fullImage);
-            } catch (Exception e) {
-              LOGGER.warn("InitialUpdater encountered exception !! ", e);
-            }
-            if (fullImage != null) {
-              UpdateForwarder.this.updateable = updateable.updateFull(fullImage);
-            }
-          }
-        }
-      };
-      initUpdater.start();
-    }
-    if (firstFullImage != null) {
-      try {
-        appendToUpdateLog(firstFullImage);
-      } catch (Exception e) {
-        LOGGER.warn("failed to update append log: ", e);
-      }
-      this.updateable = updateable.updateFull(firstFullImage);
-    }
-  }
-  /**
-   * Handle notifications from HMS plug-in or upstream Cache
-   * @param update
-   */
-  public void handleUpdateNotification(final K update) throws SentryPluginException {
-    // Correct the seqNums on the first update
-    if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) {
-      K firstUpdate = getUpdateLog().peek();
-      long firstSeqNum = update.getSeqNum() - 1;
-      if (firstUpdate != null) {
-        firstUpdate.setSeqNum(firstSeqNum);
-      }
-      lastCommittedSeqNum.set(firstSeqNum);
-      lastSeenSeqNum.set(firstSeqNum);
-    }
-    final boolean editNotMissed =
-        lastSeenSeqNum.incrementAndGet() == update.getSeqNum();
-    if (!editNotMissed) {
-      lastSeenSeqNum.set(update.getSeqNum());
-    }
-    Runnable task = new Runnable() {
-      @Override
-      public void run() {
-        K toUpdate = update;
-        if (update.hasFullImage()) {
-          updateable = updateable.updateFull(update);
-        } else {
-          if (editNotMissed) {
-            // apply partial preUpdate
-            updateable.updatePartial(Collections.singletonList(update), lock);
-          } else {
-            // Retrieve full update from External Source and
-            if (imageRetreiver != null) {
-              try {
-                toUpdate = imageRetreiver
-                    .retrieveFullImage(update.getSeqNum());
-              } catch (Exception e) {
-                LOGGER.warn("failed to retrieve full image: ", e);
-              }
-              updateable = updateable.updateFull(toUpdate);
-            }
-          }
-        }
-        try {
-          appendToUpdateLog(toUpdate);
-        } catch (Exception e) {
-          LOGGER.warn("failed to append to update log", e);
-        }
-      }
-    };
-    updateHandler.execute(task);
-  }
-
-  protected void appendToUpdateLog(K update) throws Exception {
-    synchronized (getUpdateLog()) {
-      boolean logCompacted = false;
-      if (getMaxUpdateLogSize() > 0) {
-        if (update.hasFullImage() || getUpdateLog().size() == getMaxUpdateLogSize()) {
-          // Essentially a log compaction
-          getUpdateLog().clear();
-          getUpdateLog().add(update.hasFullImage() ? update
-              : createFullImageUpdate(update.getSeqNum()));
-          logCompacted = true;
-        } else {
-          getUpdateLog().add(update);
-        }
-      }
-      lastCommittedSeqNum.set(update.getSeqNum());
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("#### Appending to Update Log ["
-            + "type=" + update.getClass() + ", "
-            + "lastCommit=" + lastCommittedSeqNum.get() + ", "
-            + "lastSeen=" + lastSeenSeqNum.get() + ", "
-            + "logCompacted=" + logCompacted + "]");
-      }
-    }
-  }
-
-  /**
-   * Return all updates from requested seqNum (inclusive)
-   * @param seqNum
-   * @return
-   */
-  public List<K> getAllUpdatesFrom(long seqNum) throws Exception {
-    List<K> retVal = new LinkedList<K>();
-    synchronized (getUpdateLog()) {
-      long currSeqNum = lastCommittedSeqNum.get();
-      if (LOGGER.isDebugEnabled() && updateable != null) {
-        LOGGER.debug("#### GetAllUpdatesFrom ["
-            + "type=" + updateable.getClass() + ", "
-            + "reqSeqNum=" + seqNum + ", "
-            + "lastCommit=" + lastCommittedSeqNum.get() + ", "
-            + "lastSeen=" + lastSeenSeqNum.get() + ", "
-            + "getMaxUpdateLogSize()=" + getUpdateLog().size() + "]");
-      }
-      if (getMaxUpdateLogSize() == 0) {
-        // no updatelog configured..
-        return retVal;
-      }
-      K head = getUpdateLog().peek();
-      if (head == null) {
-        return retVal;
-      }
-      if (seqNum > currSeqNum + 1) {
-        // This process has probably restarted since downstream
-        // recieved last update
-        retVal.addAll(getUpdateLog());
-        return retVal;
-      }
-      if (head.getSeqNum() > seqNum) {
-        // Caller has diverged greatly..
-        if (head.hasFullImage()) {
-          // head is a refresh(full) image
-          // Send full image along with partial updates
-          for (K u : getUpdateLog()) {
-            retVal.add(u);
-          }
-        } else {
-          // Create a full image
-          // clear updateLog
-          // add fullImage to head of Log
-          // NOTE : This should ideally never happen
-          K fullImage = createFullImageUpdate(currSeqNum);
-          getUpdateLog().clear();
-          getUpdateLog().add(fullImage);
-          retVal.add(fullImage);
-        }
-      } else {
-        // increment iterator to requested seqNum
-        Iterator<K> iter = getUpdateLog().iterator();
-        while (iter.hasNext()) {
-          K elem = iter.next();
-          if (elem.getSeqNum() >= seqNum) {
-            retVal.add(elem);
-          }
-        }
-      }
-    }
-    return retVal;
-  }
-
-  public boolean areAllUpdatesCommited() {
-    return lastCommittedSeqNum.get() == lastSeenSeqNum.get();
-  }
-
-  public long getLastCommitted() {
-    return lastCommittedSeqNum.get();
-  }
-
-  public long getLastSeen() {
-    return lastSeenSeqNum.get();
-  }
-
-  @Override
-  public Updateable<K> updateFull(K update) {
-    return (updateable != null) ? updateable.updateFull(update) : null;
-  }
-
-  @Override
-  public void updatePartial(Iterable<K> updates, ReadWriteLock lock) {
-    if (updateable != null) {
-      updateable.updatePartial(updates, lock);
-    }
-  }
-
-  @Override
-  public long getLastUpdatedSeqNum() {
-    return (updateable != null) ? updateable.getLastUpdatedSeqNum() : INIT_SEQ_NUM;
-  }
-
-  @Override
-  public K createFullImageUpdate(long currSeqNum) throws Exception {
-    return (updateable != null) ? updateable.createFullImageUpdate(currSeqNum) : null;
-  }
-
-  @Override
-  public String getUpdateableTypeName() {
-    // TODO Auto-generated method stub
-    return UPDATABLE_TYPE_NAME;
-  }
-
-  protected LinkedList<K> getUpdateLog() {
-    return updateLog;
-  }
-
-  protected int getMaxUpdateLogSize() {
-    return maxUpdateLogSize;
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
deleted file mode 100644
index 03c67d6..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-
-public class UpdateablePermissions implements Updateable<PermissionsUpdate>{
-  private static final String UPDATABLE_TYPE_NAME = "perm_update";
-
-  private AtomicLong seqNum = new AtomicLong();
-  private final ImageRetriever<PermissionsUpdate> imageRetreiver;
-
-  public UpdateablePermissions(
-      ImageRetriever<PermissionsUpdate> imageRetreiver) {
-    this.imageRetreiver = imageRetreiver;
-  }
-
-  @Override
-  public PermissionsUpdate createFullImageUpdate(long currSeqNum) throws Exception {
-    return imageRetreiver.retrieveFullImage(currSeqNum);
-  }
-
-  @Override
-  public long getLastUpdatedSeqNum() {
-    return seqNum.get();
-  }
-
-  @Override
-  public void updatePartial(Iterable<PermissionsUpdate> update,
-      ReadWriteLock lock) {
-    for (PermissionsUpdate permsUpdate : update) {
-      seqNum.set(permsUpdate.getSeqNum());
-    }
-  }
-
-  @Override
-  public Updateable<PermissionsUpdate> updateFull(PermissionsUpdate update) {
-    UpdateablePermissions other = new UpdateablePermissions(imageRetreiver);
-    other.seqNum.set(update.getSeqNum());
-    return other;
-  }
-
-  @Override
-  public String getUpdateableTypeName() {
-    return UPDATABLE_TYPE_NAME;
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
deleted file mode 100644
index d12b134..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sentry.hdfs;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-
-import org.apache.thrift.TException;
-import org.junit.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.hdfs.Updateable.Update;
-import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
-import org.junit.After;
-import org.junit.Assume;
-import org.junit.Test;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
-public class TestUpdateForwarder {
-
-  public static class DummyUpdate implements Update {
-    private long seqNum = 0;
-    private boolean hasFullUpdate = false;
-    private String state;
-    public DummyUpdate() {
-      this(0, false);
-    }
-    public DummyUpdate(long seqNum, boolean hasFullUpdate) {
-      this.seqNum = seqNum;
-      this.hasFullUpdate = hasFullUpdate;
-    }
-    public String getState() {
-      return state;
-    }
-    public DummyUpdate setState(String stuff) {
-      this.state = stuff;
-      return this;
-    }
-    @Override
-    public boolean hasFullImage() {
-      return hasFullUpdate;
-    }
-    @Override
-    public long getSeqNum() {
-      return seqNum;
-    }
-    @Override
-    public void setSeqNum(long seqNum) {
-      this.seqNum = seqNum;
-    }
-    @Override
-    public byte[] serialize() throws IOException {
-      return state.getBytes();
-    }
-
-    @Override
-    public void deserialize(byte[] data) throws IOException {
-      state = new String(data);
-    }
-
-    @Override
-    public String JSONSerialize() throws TException {
-      return state;
-    }
-
-    @Override
-    public void JSONDeserialize(String update) throws TException {
-      state = new String(update);
-    }
-  }
-
-  static class DummyUpdatable implements Updateable<DummyUpdate> {
-
-    private List<String> state = new LinkedList<String>();
-    private long lastUpdatedSeqNum = 0;
-
-    @Override
-    public void updatePartial(Iterable<DummyUpdate> update, ReadWriteLock lock) {
-      for (DummyUpdate u : update) {
-        state.add(u.getState());
-        lastUpdatedSeqNum = u.seqNum;
-      }
-    }
-
-    @Override
-    public Updateable<DummyUpdate> updateFull(DummyUpdate update) {
-      DummyUpdatable retVal = new DummyUpdatable();
-      retVal.lastUpdatedSeqNum = update.seqNum;
-      retVal.state = Lists.newArrayList(update.state.split(","));
-      return retVal;
-    }
-
-    @Override
-    public long getLastUpdatedSeqNum() {
-      return lastUpdatedSeqNum;
-    }
-
-    @Override
-    public DummyUpdate createFullImageUpdate(long currSeqNum) {
-      DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
-      retVal.state = Joiner.on(",").join(state);
-      return retVal;
-    }
-
-    public String getState() {
-      return Joiner.on(",").join(state);
-    }
-
-    @Override
-    public String getUpdateableTypeName() {
-      // TODO Auto-generated method stub
-      return "DummyUpdator";
-    }
-  }
-
-  static class DummyImageRetreiver implements ImageRetriever<DummyUpdate> {
-
-    private String state;
-    public void setState(String state) {
-      this.state = state;
-    }
-    @Override
-    public DummyUpdate retrieveFullImage(long currSeqNum) {
-      DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
-      retVal.state = state;
-      return retVal;
-    }
-  }
-
-  protected Configuration testConf = new Configuration();
-  protected UpdateForwarder<DummyUpdate> updateForwarder;
-
-  @After
-  public void cleanup() throws Exception {
-    if (updateForwarder != null) {
-      updateForwarder.close();
-      updateForwarder = null;
-    }
-  }
-
-  @Test
-  public void testInit() throws Exception {
-    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
-    imageRetreiver.setState("a,b,c");
-    updateForwarder = UpdateForwarder.create(
-        testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 10, true);
-    Assert.assertEquals(-2, updateForwarder.getLastUpdatedSeqNum());
-    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
-    Assert.assertTrue(allUpdates.size() == 1);
-    Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
-
-    // If the current process has restarted the input seqNum will be > currSeq
-    allUpdates = updateForwarder.getAllUpdatesFrom(100);
-    Assert.assertTrue(allUpdates.size() == 1);
-    Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
-    Assert.assertEquals(-2, allUpdates.get(0).getSeqNum());
-    allUpdates = updateForwarder.getAllUpdatesFrom(-1);
-    Assert.assertEquals(0, allUpdates.size());
-  }
-
-  @Test
-  public void testUpdateReceive() throws Exception {
-    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
-    imageRetreiver.setState("a,b,c");
-    updateForwarder = UpdateForwarder.create(
-        testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true);
-    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-    Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
-    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
-    Assert.assertEquals(2, allUpdates.size());
-    Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
-    Assert.assertEquals("d", allUpdates.get(1).getState());
-  }
-
-  // This happens when we the first update from HMS is a -1 (If the heartbeat
-  // thread checks Sentry's current seqNum before any update has come in)..
-  // This will lead the first and second entries in the updatelog to differ
-  // by more than +1..
-  @Test
-  public void testUpdateReceiveWithNullImageRetriver() throws Exception {
-    Assume.assumeTrue(!testConf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
-        false));
-    updateForwarder = UpdateForwarder.create(
-        testConf, new DummyUpdatable(), new DummyUpdate(), null, 5, false);
-    updateForwarder.handleUpdateNotification(new DummyUpdate(-1, true).setState("a"));
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(1);
-    Assert.assertEquals("a", allUpdates.get(0).getState());
-    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("b"));
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("c"));
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-    Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
-    allUpdates = updateForwarder.getAllUpdatesFrom(0);
-    Assert.assertEquals(2, allUpdates.size());
-    Assert.assertEquals("b", allUpdates.get(0).getState());
-    Assert.assertEquals("c", allUpdates.get(1).getState());
-  }
-
-  @Test
-  public void testGetUpdates() throws Exception {
-    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
-    imageRetreiver.setState("a,b,c");
-    updateForwarder = UpdateForwarder.create(
-        testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true);
-    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-    Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
-    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
-    Assert.assertEquals(2, allUpdates.size());
-
-    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
-
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-    Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
-    allUpdates = updateForwarder.getAllUpdatesFrom(0);
-    Assert.assertEquals(4, allUpdates.size());
-    Assert.assertEquals("a,b,c", allUpdates.get(0).getState());
-    Assert.assertEquals(4, allUpdates.get(0).getSeqNum());
-    Assert.assertEquals("d", allUpdates.get(1).getState());
-    Assert.assertEquals(5, allUpdates.get(1).getSeqNum());
-    Assert.assertEquals("e", allUpdates.get(2).getState());
-    Assert.assertEquals(6, allUpdates.get(2).getSeqNum());
-    Assert.assertEquals("f", allUpdates.get(3).getState());
-    Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
-
-    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-    Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
-    allUpdates = updateForwarder.getAllUpdatesFrom(8);
-    Assert.assertEquals(1, allUpdates.size());
-    Assert.assertEquals("g", allUpdates.get(0).getState());
-  }
-
-  @Test
-  public void testGetUpdatesAfterExternalEntityReset() throws Exception {
-    /*
-     * Disabled for Sentry HA. Since the sequence numbers are trakced in ZK, the
-     * lower sequence updates are ignored which causes this test to fail in HA
-     * mode
-     */
-    Assume.assumeTrue(!testConf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
-        false));
-
-    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
-    imageRetreiver.setState("a,b,c");
-    updateForwarder = UpdateForwarder.create(
-        testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true);
-    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-
-    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
-
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-    Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
-    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
-    Assert.assertEquals(4, allUpdates.size());
-    Assert.assertEquals("f", allUpdates.get(3).getState());
-    Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
-
-    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-    Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
-    allUpdates = updateForwarder.getAllUpdatesFrom(8);
-    Assert.assertEquals(1, allUpdates.size());
-    Assert.assertEquals("g", allUpdates.get(0).getState());
-
-    imageRetreiver.setState("a,b,c,d,e,f,g,h");
-
-    // New update comes with SeqNum = 1
-    updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setState("h"));
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-    // NN plugin asks for next update
-    allUpdates = updateForwarder.getAllUpdatesFrom(9);
-    Assert.assertEquals(1, allUpdates.size());
-    Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
-    // Assert.assertEquals(1, allUpdates.get(0).getSeqNum());
-  }
-
-  @Test
-  public void testUpdateLogCompression() throws Exception {
-    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
-    imageRetreiver.setState("a,b,c");
-    updateForwarder = UpdateForwarder.create(
-        testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true);
-    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-    Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
-    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
-    Assert.assertEquals(2, allUpdates.size());
-
-    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setState("h"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setState("i"));
-    updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setState("j"));
-
-    while(!updateForwarder.areAllUpdatesCommited()) {
-      Thread.sleep(100);
-    }
-    Assert.assertEquals(11, updateForwarder.getLastUpdatedSeqNum());
-    allUpdates = updateForwarder.getAllUpdatesFrom(0);
-    Assert.assertEquals(3, allUpdates.size());
-    Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
-    Assert.assertEquals(9, allUpdates.get(0).getSeqNum());
-    Assert.assertEquals("i", allUpdates.get(1).getState());
-    Assert.assertEquals(10, allUpdates.get(1).getSeqNum());
-    Assert.assertEquals("j", allUpdates.get(2).getState());
-    Assert.assertEquals(11, allUpdates.get(2).getSeqNum());
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 3536579..d1edcb1 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -3404,6 +3404,33 @@ public class SentryStore {
   }
 
   /**
+   * Get the list of MSentryPermChange objects greater than and
+   * equal with the given ChangeID.
+   *
+   * @param changeID
+   * @return the list of MSentryPermChange objects
+   * @throws Exception
+   */
+  public List<MSentryPermChange> getMSentryPermChanges(final long changeID)
+      throws Exception {
+    return tm.executeTransaction(
+    new TransactionBlock<List<MSentryPermChange>>() {
+      public List<MSentryPermChange> execute(PersistenceManager pm) throws Exception {
+        Query query = pm.newQuery(MSentryPermChange.class);
+        query.setFilter("this.changeID >= t");
+        query.declareParameters("long t");
+        List<MSentryPermChange> permChanges =
+            (List<MSentryPermChange>)query.execute(changeID);
+        if (permChanges == null) {
+          noSuchUpdate(changeID);
+        }
+
+        return permChanges;
+      }
+    });
+  }
+
+  /**
    * Get the MSentryPermChange object by ChangeID.
    *
    * @param changeID the given changeID.
@@ -3429,6 +3456,57 @@ public class SentryStore {
   }
 
   /**
+   * Find the MSentryPermChange object by ChangeID.
+   *
+   * @param changeID
+   * @return true if found the MSentryPermChange object, otherwise false.
+   * @throws Exception
+   */
+  public boolean findMSentryPermChangeByID(final long changeID) throws Exception {
+    return tm.executeTransaction(
+    new TransactionBlock<Boolean>() {
+      public Boolean execute(PersistenceManager pm) throws Exception {
+        Query query = pm.newQuery(MSentryPermChange.class);
+        query.setFilter("this.changeID == t");
+        query.declareParameters("long t");
+        List<MSentryPermChange> pathChanges =
+            (List<MSentryPermChange>)query.execute(changeID);
+        if (pathChanges == null) {
+          return false;
+        } else {
+          return true;
+        }
+      }
+    });
+  }
+
+  /**
+   * Get the list of MSentryPathChange objects greater than and
+   * equal with the given ChangeID.
+   *
+   * @param changeID
+   * @return the list of MSentryPathChange objects
+   * @throws Exception
+   */
+  public List<MSentryPathChange> getMSentryPathChanges(final long changeID)
+        throws Exception {
+    return tm.executeTransaction(
+    new TransactionBlock<List<MSentryPathChange>>() {
+      public List<MSentryPathChange> execute(PersistenceManager pm) throws Exception {
+        Query query = pm.newQuery(MSentryPathChange.class);
+        query.setFilter("this.changeID >= t");
+        query.declareParameters("long t");
+        List<MSentryPathChange> pathChanges =
+            (List<MSentryPathChange>)query.execute(changeID);
+        if (pathChanges == null) {
+          noSuchUpdate(changeID);
+        }
+        return pathChanges;
+      }
+    });
+  }
+
+  /**
    * Get the MSentryPathChange object by ChangeID.
    */
   public MSentryPathChange getMSentryPathChangeByID(final long changeID) throws Exception {
@@ -3451,6 +3529,30 @@ public class SentryStore {
   }
 
   /**
+   * Find the MSentryPathChange object by ChangeID.
+   *
+   * @param changeID
+   * @return true if found the MSentryPathChange object, otherwise false.
+   * @throws Exception
+   */
+  public boolean findMSentryPathChangeByID(final long changeID) throws Exception {
+    return tm.executeTransaction(
+    new TransactionBlock<Boolean>() {
+      public Boolean execute(PersistenceManager pm) throws Exception {
+        Query query = pm.newQuery(MSentryPathChange.class);
+        query.setFilter("this.changeID == t");
+        query.declareParameters("long t");
+        List<MSentryPathChange> pathChanges =
+            (List<MSentryPathChange>)query.execute(changeID);
+        if (pathChanges == null) {
+          return false;
+        } else {
+          return true;
+        }
+      }
+    });
+  }
+  /**
    * Execute Perm/Path UpdateTransaction and corresponding actual
    * action transaction, e.g dropSentryRole, in a single transaction.
    * The order of the transaction does not matter because there is no

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index be59c1e..7d818c1 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -158,8 +158,9 @@ public class SentryService implements Callable, SigUtils.SigListener {
     if (notificationLogEnabled) {
       try {
         hmsFollowerExecutor = Executors.newScheduledThreadPool(1);
+        //TODO: make initialDelay and period time to be configurable
         hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf, leaderMonitor),
-                60000, 500, TimeUnit.MILLISECONDS);
+                30000, 500, TimeUnit.MILLISECONDS);
       } catch (Exception e) {
         //TODO: Handle
         LOGGER.error("Could not start HMSFollower");

http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
index 4f4d3e6..7b6d753 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
@@ -221,7 +221,7 @@ public class TestHDFSIntegration {
       @Override
       public Void run() throws Exception {
         HiveConf hiveConf = new HiveConf();
-        hiveConf.set("sentry.metastore.plugins", "org.apache.sentry.hdfs.MetastorePlugin");
+        //hiveConf.set("sentry.metastore.plugins", "org.apache.sentry.hdfs.MetastorePlugin");
         hiveConf.set("sentry.service.client.server.rpc-address", "localhost");
         hiveConf.set("sentry.hdfs.service.client.server.rpc-address", "localhost");
         hiveConf.set("sentry.hdfs.service.client.server.rpc-port", String.valueOf(sentryPort));
@@ -261,7 +261,8 @@ public class TestHDFSIntegration {
         hiveConf.set("hive.metastore.authorization.storage.checks", "true");
         hiveConf.set("hive.metastore.uris", "thrift://localhost:" + hmsPort);
         hiveConf.set("hive.metastore.pre.event.listeners", "org.apache.sentry.binding.metastore.MetastoreAuthzBinding");
-        hiveConf.set("hive.metastore.event.listeners", "org.apache.sentry.binding.metastore.SentryMetastorePostEventListener");
+        hiveConf.set("hive.metastore.event.listeners", "org.apache.sentry.binding.metastore.SentryMetastorePostEventListenerNotificationLog");
+        hiveConf.set("hcatalog.message.factory.impl.json","org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory");
         hiveConf.set("hive.security.authorization.task.factory", "org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl");
         hiveConf.set("hive.server2.session.hook", "org.apache.sentry.binding.hive.HiveAuthzBindingSessionHook");
         hiveConf.set("sentry.metastore.service.users", "hive");// queries made by hive user (beeline) skip meta store check
@@ -451,6 +452,7 @@ public class TestHDFSIntegration {
           properties.put(ServerConfig.RPC_ADDRESS, "localhost");
           properties.put(ServerConfig.RPC_PORT, String.valueOf(sentryPort > 0 ? sentryPort : 0));
           properties.put(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false");
+          properties.put(ServerConfig.SENTRY_NOTIFICATION_LOG_ENABLED,"true");
 
           properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING);
           properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath());