You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ha...@apache.org on 2017/03/24 19:53:10 UTC

[2/2] sentry git commit: SENTRY-1613: Add propagating logic for Perm/Path updates in Sentry service (Hao Hao, Reviewed by: Alexander Kolbasov and Lei Xu)

SENTRY-1613: Add propagating logic for Perm/Path updates in Sentry service (Hao Hao, Reviewed by: Alexander Kolbasov and Lei Xu)

Change-Id: I1223a45df8ab1c169772b2ffe92762f0dcc4e82e


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/2811311e
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/2811311e
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/2811311e

Branch: refs/heads/sentry-ha-redesign
Commit: 2811311ea6dfe2e26af67a545333486ca0e89092
Parents: 268ee50
Author: hahao <ha...@cloudera.com>
Authored: Thu Mar 23 17:51:15 2017 -0700
Committer: hahao <ha...@cloudera.com>
Committed: Fri Mar 24 12:51:39 2017 -0700

----------------------------------------------------------------------
 .../org/apache/sentry/hdfs/DeltaRetriever.java  |  67 ++++
 .../org/apache/sentry/hdfs/ImageRetriever.java  |   7 +-
 .../apache/sentry/hdfs/ThriftSerializer.java    |  10 +-
 .../apache/sentry/hdfs/DBUpdateForwarder.java   |  88 ++++
 .../org/apache/sentry/hdfs/MetastorePlugin.java | 397 -------------------
 .../apache/sentry/hdfs/PathDeltaRetriever.java  |  76 ++++
 .../apache/sentry/hdfs/PathImageRetriever.java  |  26 +-
 .../apache/sentry/hdfs/PermDeltaRetriever.java  |  76 ++++
 .../apache/sentry/hdfs/PermImageRetriever.java  |  14 +-
 .../sentry/hdfs/SentryHDFSServiceProcessor.java |  29 +-
 .../sentry/hdfs/SentryHdfsMetricsUtil.java      |  19 -
 .../org/apache/sentry/hdfs/SentryPlugin.java    | 168 +++-----
 .../org/apache/sentry/hdfs/UpdateForwarder.java | 335 ----------------
 .../sentry/hdfs/UpdateablePermissions.java      |  63 ---
 .../apache/sentry/hdfs/TestUpdateForwarder.java | 359 -----------------
 .../db/service/persistent/SentryStore.java      | 176 +++++++-
 .../sentry/service/thrift/HMSFollower.java      |   2 -
 .../db/service/persistent/TestSentryStore.java  |  13 +-
 .../tests/e2e/hdfs/TestHDFSIntegration.java     |   2 +-
 .../e2e/hdfs/TestHDFSIntegrationAdvanced.java   |   1 +
 .../tests/e2e/hdfs/TestHDFSIntegrationBase.java |   3 +-
 .../e2e/hdfs/TestHDFSIntegrationEnd2End.java    |   1 +
 22 files changed, 572 insertions(+), 1360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/DeltaRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/DeltaRetriever.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/DeltaRetriever.java
new file mode 100644
index 0000000..0e58593
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/DeltaRetriever.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.hdfs;
+
+import java.util.Collection;
+
+import static org.apache.sentry.hdfs.Updateable.Update;
+
+/**
+ * DeltaRetriever obtains a delta update of either Sentry Permissions or Sentry
+ * representation of HMS Paths.
+ * <p>
+ * Sentry permissions are represented as {@link PermissionsUpdate} and HMS Paths
+ * are represented as {@link PathsUpdate}. The delta update contains change
+ * from a state to another.
+ * The {@link #retrieveDelta(long)} method obtains such delta update from a persistent storage.
+ * Delta update is propagated to a consumer of Sentry, such as HDFS NameNode whenever
+ * the consumer needs to synchronize the update.
+ */
+public interface DeltaRetriever<K extends Update> {
+
+  /**
+   * Retrieves all delta updates of type {@link Update} newer than or equal with
+   * the given sequence number/change ID (inclusive) from a persistent storage.
+   * An empty collection can be returned.
+   *
+   * @param seqNum the given seq number
+   * @return a collect of delta updates of type K
+   * @throws Exception when there is an error in operation on persistent storage
+   */
+  Collection<K> retrieveDelta(long seqNum) throws Exception;
+
+  /**
+   * Checks if there the delta update is available, given the sequence number/change
+   * ID, from a persistent storage.
+   *
+   * @param seqNum the given seq number
+   * @return true if there are such delta updates available.
+   *         Otherwise it will be false.
+   * @throws Exception when there is an error in operation on persistent storage
+   */
+  boolean isDeltaAvailable(long seqNum) throws Exception;
+
+  /**
+   * Gets the latest updated delta ID.
+   *
+   * @return the latest updated delta ID.
+   * @throws Exception when there is an error in operation on persistent storage
+   */
+  long getLatestDeltaID() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
index 0e40756..e96140d 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java
@@ -25,7 +25,7 @@ import static org.apache.sentry.hdfs.Updateable.Update;
  * ({@code PathsUpdate}).
  * <p>
  * The snapshot image should represent a consistent state.
- * The {@link #retrieveFullImage(long)} method obtains such state snapshot from
+ * The {@link #retrieveFullImage()} method obtains such state snapshot from
  * a persistent storage.
  * The Snapshots are propagated to a consumer of Sentry, such as HDFS NameNode,
  * whenever the consumer needs to synchronize its full state.
@@ -33,13 +33,12 @@ import static org.apache.sentry.hdfs.Updateable.Update;
 public interface ImageRetriever<K extends Update> {
 
   /**
-   * Retrieve a complete snapshot of type {@code Update} from a persistent storage.
+   * Retrieves a complete snapshot of type {@code Update} from a persistent storage.
    *
-   * @param seqNum
    * @return a complete snapshot of type {@link Update}, e.g {@link PermissionsUpdate}
    *         or {@link PathsUpdate}
    * @throws Exception
    */
-  K retrieveFullImage(long seqNum) throws Exception;
+  K retrieveFullImage() throws Exception;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
index 69aa098..d7b9923 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
@@ -25,12 +25,12 @@ import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TSimpleJSONProtocol;
+import org.apache.thrift.protocol.TJSONProtocol;
 
 public class ThriftSerializer {
 
-  final static private TSimpleJSONProtocol.Factory tSimpleJSONProtocol =
-          new TSimpleJSONProtocol.Factory();
+  final static private TJSONProtocol.Factory tJSONProtocol =
+          new TJSONProtocol.Factory();
 
   // Use default max thrift message size here.
   // TODO: Figure out a way to make maxMessageSize configurable, eg. create a serializer singleton at startup by
@@ -67,13 +67,13 @@ public class ThriftSerializer {
 
   public static String serializeToJSON(TBase base) throws TException  {
     // Initiate a new TSerializer each time for thread safety.
-    TSerializer tSerializer = new TSerializer(tSimpleJSONProtocol);
+    TSerializer tSerializer = new TSerializer(tJSONProtocol);
     return tSerializer.toString(base);
   }
 
   public static void deserializeFromJSON(TBase base, String dataInJson) throws TException {
     // Initiate a new TDeserializer each time for thread safety.
-    TDeserializer tDeserializer = new TDeserializer(tSimpleJSONProtocol);
+    TDeserializer tDeserializer = new TDeserializer(tJSONProtocol);
     tDeserializer.fromString(base, dataInJson);
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java
new file mode 100644
index 0000000..b8542b3
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * DBUpdateForwarder propagates a complete snapshot or delta update of either
+ * Sentry Permissions ({@code PermissionsUpdate}) or Sentry representation of
+ * HMS Paths ({@code PathsUpdate}), retrieved from a persistent storage, to a
+ * Sentry client, e.g HDFS NameNode.
+ * <p>
+ * It is a thread safe class, as all the underlying database operation is thread safe.
+ */
+@ThreadSafe
+class DBUpdateForwarder<K extends Updateable.Update> {
+
+  private final ImageRetriever<K> imageRetriever;
+  private final DeltaRetriever<K> deltaRetriever;
+  private static final Logger LOGGER = LoggerFactory.getLogger(DBUpdateForwarder.class);
+
+  DBUpdateForwarder(final ImageRetriever<K> imageRetriever,
+      final DeltaRetriever<K> deltaRetriever) {
+    this.imageRetriever = imageRetriever;
+    this.deltaRetriever = deltaRetriever;
+  }
+
+  /**
+   * Retrieves all delta updates from the requested sequence number (inclusive) from
+   * a persistent storage.
+   * It first checks if there is such newer deltas exists in the persistent storage.
+   * If there is, returns a list of delta updates.
+   * Otherwise, a complete snapshot will be returned.
+   *
+   * @param seqNum the requested sequence number
+   * @return a list of delta updates, e.g. {@link PathsUpdate} or {@link PermissionsUpdate}
+   */
+   List<K> getAllUpdatesFrom(long seqNum) throws Exception {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("#### GetAllUpdatesFrom [reqSeqNum = {} ]", seqNum);
+    }
+
+    // No newer updates available than the requested one.
+    long curSeqNum = deltaRetriever.getLatestDeltaID();
+    if (seqNum > curSeqNum) {
+      return Collections.emptyList();
+    }
+
+    // Checks if there is newer deltas exists in the persistent storage.
+    // If there is, returns a list of delta updates.
+    if ((seqNum != SentryStore.INIT_CHANGE_ID) &&
+          deltaRetriever.isDeltaAvailable(seqNum)) {
+      Collection<K> deltas = deltaRetriever.retrieveDelta(seqNum);
+      if (!deltas.isEmpty()) {
+        return new LinkedList<>(deltas);
+      }
+    }
+
+    // Otherwise, a complete snapshot will be returned.
+    List<K> retVal = new LinkedList<>();
+    retVal.add(imageRetriever.retrieveFullImage());
+    return retVal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
deleted file mode 100644
index 16ffa1b..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
-import org.apache.hadoop.hive.metastore.MetaStorePreEventListener;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
-import org.apache.sentry.provider.db.SentryMetastoreListenerPlugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Plugin implementation of {@link SentryMetastoreListenerPlugin} that hooks
- * into the sites in the {@link MetaStorePreEventListener} that deal with
- * creation/updation and deletion for paths.
- */
-public class MetastorePlugin extends SentryMetastoreListenerPlugin {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(MetastorePlugin.class);
-
-  private static final String initializationFailureMsg = "Cache failed to initialize, cannot send path updates to Sentry." +
-          " Please review HMS error logs during startup for additional information. If the initialization failure is due" +
-          " to SentryMalformedPathException, you will need to rectify the malformed path in HMS db and restart HMS";
-
-  class SyncTask implements Runnable {
-    @Override
-    public void run() {
-      if (!notificiationLock.tryLock()) {
-        // No need to sync.. as metastore is in the process of pushing an update..
-        return;
-      }
-      if (MetastorePlugin.this.authzPaths == null) {
-        LOGGER.warn(initializationFailureMsg);
-        return;
-      }
-      try {
-        long lastSeenBySentry =
-            MetastorePlugin.this.getClient().getLastSeenHMSPathSeqNum();
-        long lastSent = lastSentSeqNum;
-        if (lastSeenBySentry != lastSent) {
-          LOGGER.warn("#### Sentry not in sync with HMS [" + lastSeenBySentry + ", "
-              + lastSent + "]");
-          PathsUpdate fullImageUpdate =
-              MetastorePlugin.this.authzPaths.createFullImageUpdate(lastSent);
-          notifySentryNoLock(fullImageUpdate);
-          LOGGER.warn("#### Synced Sentry with update [" + lastSent + "]");
-        }
-      } catch (Exception e) {
-        sentryClient = null;
-        LOGGER.error("Error talking to Sentry HDFS Service !!", e);
-      } finally {
-        syncSent = true;
-        notificiationLock.unlock();
-      }
-    }
-  }
-
-  private final Configuration conf;
-  private SentryHDFSServiceClient sentryClient;
-  private volatile UpdateableAuthzPaths authzPaths;
-  private Lock notificiationLock;
-
-  // Initialized to some value > 1.
-  protected static final AtomicLong seqNum = new AtomicLong(5);
-
-  // Has to match the value of seqNum
-  protected static volatile long lastSentSeqNum = seqNum.get();
-  private volatile boolean syncSent = false;
-  private volatile boolean initComplete = false;
-  private volatile boolean queueFlushComplete = false;
-  private volatile Throwable initError = null;
-  private final Queue<PathsUpdate> updateQueue = new LinkedList<PathsUpdate>();
-
-  private final ExecutorService threadPool; //NOPMD
-  private final Configuration sentryConf;
-
-  static class ProxyHMSHandler extends HMSHandler {
-    public ProxyHMSHandler(String name, HiveConf conf) throws MetaException {
-      super(name, conf);
-    }
-  }
-
-  public MetastorePlugin(Configuration conf, Configuration sentryConf) {
-    this.notificiationLock = new ReentrantLock();
-
-    if (!(conf instanceof HiveConf)) {
-        String error = "Configuration is not an instanceof HiveConf";
-        LOGGER.error(error);
-        throw new RuntimeException(error);
-    }
-    this.conf = new HiveConf((HiveConf)conf);
-
-    this.sentryConf = new Configuration(sentryConf);
-    this.conf.unset(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname);
-    this.conf.unset(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname);
-    this.conf.unset(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname);
-    this.conf.unset(HiveConf.ConfVars.METASTOREURIS.varname);
-    Thread initUpdater = new Thread() {
-      @Override
-      public void run() {
-        MetastoreCacheInitializer cacheInitializer = null;
-        try {
-          cacheInitializer =
-                  new MetastoreCacheInitializer(new ProxyHMSHandler("sentry.hdfs",
-                        (HiveConf) MetastorePlugin.this.conf),
-                          MetastorePlugin.this.conf);
-          MetastorePlugin.this.authzPaths =
-                  cacheInitializer.createInitialUpdate();
-          LOGGER.info("#### Metastore Plugin initialization complete !!");
-          synchronized (updateQueue) {
-            while (!updateQueue.isEmpty()) {
-              PathsUpdate update = updateQueue.poll();
-              if (update != null) {
-                processUpdate(update);
-              }
-            }
-            queueFlushComplete = true;
-          }
-          LOGGER.info("#### Finished flushing queued updates to Sentry !!");
-        } catch (Exception e) {
-          LOGGER.error("#### Could not create Initial AuthzPaths or HMSHandler !!", e);
-          initError = e;
-        } finally {
-          if (cacheInitializer != null) {
-            try {
-              cacheInitializer.close();
-            } catch (Exception e) {
-              LOGGER.info("#### Exception while closing cacheInitializer !!", e);
-            }
-          }
-          initComplete = true;
-        }
-      }
-    };
-    if (this.conf.getBoolean(
-            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE,
-            ServerConfig
-                    .SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT)) {
-      LOGGER.warn("#### Metastore Cache initialization is set to aync..." +
-              "HDFS ACL synchronization will not happen until metastore" +
-              "cache initialization is completed !!");
-      initUpdater.start();
-    } else {
-      initUpdater.run(); //NOPMD
-    }
-    try {
-      sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
-    } catch (Exception e) {
-      sentryClient = null;
-      LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
-    }
-    ScheduledExecutorService newThreadPool = Executors.newScheduledThreadPool(1);
-    newThreadPool.scheduleWithFixedDelay(new SyncTask(),
-            this.conf.getLong(ServerConfig
-                            .SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
-                    ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT),
-            this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS,
-                    ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT),
-            TimeUnit.MILLISECONDS);
-    this.threadPool = newThreadPool;
-  }
-
-  @Override
-  public void addPath(String authzObj, String path) {
-    List<String> pathTree = null;
-    try {
-      pathTree = PathsUpdate.parsePath(path);
-    } catch (SentryMalformedPathException e) {
-      LOGGER.error("Unexpected path in addPath: authzObj = " + authzObj + " , path = " + path);
-      e.printStackTrace();
-      return;
-    }
-    if(pathTree == null) {
-      return;
-    }
-    LOGGER.debug("#### HMS Path Update ["
-        + "OP : addPath, "
-        + "authzObj : " + authzObj.toLowerCase() + ", "
-        + "path : " + path + "]");
-    PathsUpdate update = createHMSUpdate();
-    update.newPathChange(authzObj.toLowerCase()).addToAddPaths(pathTree);
-    notifySentryAndApplyLocal(update);
-  }
-
-  @Override
-  public void removeAllPaths(String authzObj, List<String> childObjects) {
-    LOGGER.debug("#### HMS Path Update ["
-        + "OP : removeAllPaths, "
-        + "authzObj : " + authzObj.toLowerCase() + ", "
-        + "childObjs : " + (childObjects == null ? "[]" : childObjects) + "]");
-    PathsUpdate update = createHMSUpdate();
-    if (childObjects != null) {
-      for (String childObj : childObjects) {
-        update.newPathChange(authzObj.toLowerCase() + "." + childObj).addToDelPaths(
-            Lists.newArrayList(PathsUpdate.ALL_PATHS));
-      }
-    }
-    update.newPathChange(authzObj.toLowerCase()).addToDelPaths(
-            Lists.newArrayList(PathsUpdate.ALL_PATHS));
-    notifySentryAndApplyLocal(update);
-  }
-
-  @Override
-  public void removePath(String authzObj, String path) {
-    if ("*".equals(path)) {
-      removeAllPaths(authzObj.toLowerCase(), null);
-    } else {
-      List<String> pathTree = null;
-      try {
-        pathTree = PathsUpdate.parsePath(path);
-      } catch (SentryMalformedPathException e) {
-        LOGGER.error("Unexpected path in removePath: authzObj = " + authzObj + " , path = " + path);
-        e.printStackTrace();
-        return;
-      }
-      if(pathTree == null) {
-        return;
-      }
-      LOGGER.debug("#### HMS Path Update ["
-          + "OP : removePath, "
-          + "authzObj : " + authzObj.toLowerCase() + ", "
-          + "path : " + path + "]");
-      PathsUpdate update = createHMSUpdate();
-      update.newPathChange(authzObj.toLowerCase()).addToDelPaths(pathTree);
-      notifySentryAndApplyLocal(update);
-    }
-  }
-
-  @Override
-  public void renameAuthzObject(String oldName, String oldPath, String newName,
-      String newPath) {
-    String oldNameLC = oldName != null ? oldName.toLowerCase() : null;
-    String newNameLC = newName != null ? newName.toLowerCase() : null;
-    PathsUpdate update = createHMSUpdate();
-    LOGGER.debug("#### HMS Path Update ["
-        + "OP : renameAuthzObject, "
-        + "oldName : " + oldNameLC + ","
-        + "oldPath : " + oldPath + ","
-        + "newName : " + newNameLC + ","
-        + "newPath : " + newPath + "]");
-    List<String> newPathTree = null;
-    try {
-      newPathTree = PathsUpdate.parsePath(newPath);
-    } catch (SentryMalformedPathException e) {
-      LOGGER.error("Unexpected path in renameAuthzObject while parsing newPath: oldName=" + oldName + ", oldPath=" + oldPath +
-      ", newName=" + newName + ", newPath=" + newPath);
-      e.printStackTrace();
-      return;
-    }
-
-    if( newPathTree != null ) {
-      update.newPathChange(newNameLC).addToAddPaths(newPathTree);
-    }
-    List<String> oldPathTree = null;
-    try {
-      oldPathTree = PathsUpdate.parsePath(oldPath);
-    } catch (SentryMalformedPathException e) {
-      LOGGER.error("Unexpected path in renameAuthzObject while parsing oldPath: oldName=" + oldName + ", oldPath=" + oldPath +
-              ", newName=" + newName + ", newPath=" + newPath);
-      e.printStackTrace();
-      return;
-    }
-
-    if( oldPathTree != null ) {
-      update.newPathChange(oldNameLC).addToDelPaths(oldPathTree);
-    }
-    notifySentryAndApplyLocal(update);
-  }
-
-  private SentryHDFSServiceClient getClient() {
-    if (sentryClient == null) {
-      try {
-        sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
-      } catch (Exception e) {
-        sentryClient = null;
-        LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
-      }
-    }
-    return sentryClient;
-  }
-
-  private PathsUpdate createHMSUpdate() {
-    PathsUpdate update = new PathsUpdate(seqNum.incrementAndGet(), false);
-    LOGGER.debug("#### Creating HMS Path Update SeqNum : [" + seqNum.get() + "]");
-    return update;
-  }
-
-  protected void notifySentryNoLock(PathsUpdate update) {
-    final Timer.Context timerContext =
-        SentryHdfsMetricsUtil.getNotifyHMSUpdateTimer.time();
-    try {
-      getClient().notifyHMSUpdate(update);
-    } catch (Exception e) {
-      LOGGER.error("Could not send update to Sentry HDFS Service !!", e);
-      SentryHdfsMetricsUtil.getFailedNotifyHMSUpdateCounter.inc();
-    } finally {
-      timerContext.stop();
-    }
-  }
-
-  protected void notifySentry(PathsUpdate update) {
-    notificiationLock.lock();
-    try {
-      if (!syncSent) {
-        new SyncTask().run();
-      }
-
-      notifySentryNoLock(update);
-    } finally {
-      lastSentSeqNum = update.getSeqNum();
-      notificiationLock.unlock();
-      LOGGER.debug("#### HMS Path Last update sent : ["+ lastSentSeqNum + "]");
-    }
-  }
-
-  protected void applyLocal(PathsUpdate update) {
-    final Timer.Context timerContext =
-        SentryHdfsMetricsUtil.getApplyLocalUpdateTimer.time();
-    if(authzPaths == null) {
-      LOGGER.error(initializationFailureMsg);
-      return;
-    }
-    authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock());
-    timerContext.stop();
-    SentryHdfsMetricsUtil.getApplyLocalUpdateHistogram.update(
-        update.getPathChanges().size());
-  }
-
-  private void notifySentryAndApplyLocal(PathsUpdate update) {
-    if(authzPaths == null) {
-      LOGGER.error(initializationFailureMsg);
-      return;
-    }
-    if (initComplete) {
-      processUpdate(update);
-    } else {
-      if (initError == null) {
-        synchronized (updateQueue) {
-          if (!queueFlushComplete) {
-            updateQueue.add(update);
-          } else {
-            processUpdate(update);
-          }
-        }
-      } else {
-        StringWriter sw = new StringWriter();
-        initError.printStackTrace(new PrintWriter(sw));
-        LOGGER.error("#### Error initializing Metastore Plugin" +
-                "[" + sw.toString() + "] !!");
-        throw new RuntimeException(initError);
-      }
-      LOGGER.warn("#### Path update [" + update.getSeqNum() + "] not sent to Sentry.." +
-              "Metastore hasn't been initialized yet !!");
-    }
-  }
-
-  protected void processUpdate(PathsUpdate update) {
-    applyLocal(update);
-    notifySentry(update);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathDeltaRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathDeltaRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathDeltaRetriever.java
new file mode 100644
index 0000000..cea5b9d
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathDeltaRetriever.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import org.apache.sentry.provider.db.service.model.MSentryPathChange;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * PathDeltaRetriever retrieves delta updates of Hive Paths from a persistent
+ * storage and translates them into a collection of {@code PathsUpdate} that the
+ * consumers, such as HDFS NameNode, can understand.
+ * <p>
+ * It is a thread safe class, as all the underlying database operation is thread safe.
+ */
+@ThreadSafe
+public class PathDeltaRetriever implements DeltaRetriever<PathsUpdate> {
+
+  private final SentryStore sentryStore;
+
+  PathDeltaRetriever(SentryStore sentryStore) {
+    this.sentryStore = sentryStore;
+  }
+
+  @Override
+  public Collection<PathsUpdate> retrieveDelta(long seqNum) throws Exception {
+    Collection<MSentryPathChange> mSentryPathChanges =
+            sentryStore.getMSentryPathChanges(seqNum);
+    if (mSentryPathChanges.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    Collection<PathsUpdate> updates = new ArrayList<>(mSentryPathChanges.size());
+    for (MSentryPathChange mSentryPathChange : mSentryPathChanges) {
+      // Gets the changeID from the persisted MSentryPathChange.
+      long changeID = mSentryPathChange.getChangeID();
+      // Creates a corresponding PathsUpdate and deserialize the
+      // persisted delta update in JSON format to TPathsUpdate with
+      // associated changeID.
+      PathsUpdate pathsUpdate = new PathsUpdate();
+      pathsUpdate.JSONDeserialize(mSentryPathChange.getPathChange());
+      pathsUpdate.setSeqNum(changeID);
+      updates.add(pathsUpdate);
+    }
+    return updates;
+  }
+
+  @Override
+  public boolean isDeltaAvailable(long seqNum) throws Exception {
+    return sentryStore.pathChangeExists(seqNum);
+  }
+
+  @Override
+  public long getLatestDeltaID() throws Exception {
+    return sentryStore.getLastProcessedPathChangeID();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
index 16a1604..0eaac80 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
@@ -18,28 +18,35 @@
 package org.apache.sentry.hdfs;
 
 import com.codahale.metrics.Timer;
+import com.google.common.collect.Lists;
 import org.apache.sentry.hdfs.service.thrift.TPathChanges;
 import org.apache.sentry.provider.db.service.persistent.PathsImage;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
 
+import javax.annotation.concurrent.ThreadSafe;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * PathImageRetriever obtains a complete snapshot of Hive Paths from a persistent
- * storage and translate it into {@code PathsUpdate} that the consumers, such as
- * HDFS NameNod, can understand.
+ * storage and translates it into {@code PathsUpdate} that the consumers, such as
+ * HDFS NameNode, can understand.
+ * <p>
+ * It is a thread safe class, as all the underlying database operation is thread safe.
  */
+@ThreadSafe
 public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
 
   private final SentryStore sentryStore;
+  private final static String[] root = {"/"};
 
   PathImageRetriever(SentryStore sentryStore) {
     this.sentryStore = sentryStore;
   }
 
   @Override
-  public PathsUpdate retrieveFullImage(long seqNum) throws Exception {
+  public PathsUpdate retrieveFullImage() throws Exception {
     try (final Timer.Context timerContext =
         SentryHdfsMetricsUtil.getRetrievePathFullImageTimer.time()) {
 
@@ -54,8 +61,7 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
       // Adds all <hiveObj, paths> mapping to be included in this paths update.
       // And label it with the latest delta change sequence number for consumer
       // to be aware of the next delta change it should continue with.
-      // TODO: use curSeqNum from DB instead of seqNum when doing SENTRY-1613
-      PathsUpdate pathsUpdate = new PathsUpdate(seqNum, true);
+      PathsUpdate pathsUpdate = new PathsUpdate(curSeqNum, true);
       for (Map.Entry<String, Set<String>> pathEnt : pathImage.entrySet()) {
         TPathChanges pathChange = pathsUpdate.newPathChange(pathEnt.getKey());
 
@@ -66,7 +72,15 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
 
       SentryHdfsMetricsUtil.getPathChangesHistogram.update(pathsUpdate
             .getPathChanges().size());
+
+      // Translate PathsUpdate that contains a full image to TPathsDump for
+      // consumer (NN) to be able to quickly construct UpdateableAuthzPaths
+      // from TPathsDump.
+      UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(root);
+      authzPaths.updatePartial(Lists.newArrayList(pathsUpdate),
+          new ReentrantReadWriteLock());
+      pathsUpdate.toThrift().setPathsDump(authzPaths.getPathsDump().createPathsDump());
       return pathsUpdate;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermDeltaRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermDeltaRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermDeltaRetriever.java
new file mode 100644
index 0000000..9649b02
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermDeltaRetriever.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import org.apache.sentry.provider.db.service.model.MSentryPermChange;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * PermDeltaRetriever retrieves delta updates of Sentry permission from a persistent
+ * storage and translates it into a collection of {@code PermissionsUpdate} that the
+ * consumers, such as HDFS NameNode, can understand.
+ * <p>
+ * It is a thread safe class, as all the underlying database operation is thread safe.
+ */
+@ThreadSafe
+public class PermDeltaRetriever implements DeltaRetriever<PermissionsUpdate> {
+
+  private final SentryStore sentryStore;
+
+  PermDeltaRetriever(SentryStore sentryStore) {
+    this.sentryStore = sentryStore;
+  }
+
+  @Override
+  public Collection<PermissionsUpdate> retrieveDelta(long seqNum) throws Exception {
+    Collection<MSentryPermChange> mSentryPermChanges =
+            sentryStore.getMSentryPermChanges(seqNum);
+    if (mSentryPermChanges.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    Collection<PermissionsUpdate> updates = new ArrayList<>(mSentryPermChanges.size());
+    for (MSentryPermChange mSentryPermChange : mSentryPermChanges) {
+      // Get the changeID from the persisted MSentryPermChange
+      long changeID = mSentryPermChange.getChangeID();
+      // Create a corresponding PermissionsUpdate and deserialize the
+      // persisted delta update in JSON format to TPermissionsUpdate with
+      // associated changeID.
+      PermissionsUpdate permsUpdate = new PermissionsUpdate();
+      permsUpdate.JSONDeserialize(mSentryPermChange.getPermChange());
+      permsUpdate.setSeqNum(changeID);
+      updates.add(permsUpdate);
+    }
+    return updates;
+  }
+
+  @Override
+  public boolean isDeltaAvailable(long seqNum) throws Exception {
+    return sentryStore.permChangeExists(seqNum);
+  }
+
+  @Override
+  public long getLatestDeltaID() throws Exception {
+    return sentryStore.getLastProcessedPermChangeID();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
index 3017c9e..5964f17 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java
@@ -24,6 +24,7 @@ import org.apache.sentry.hdfs.service.thrift.TRoleChanges;
 import org.apache.sentry.provider.db.service.persistent.PermissionsImage;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
 
+import javax.annotation.concurrent.ThreadSafe;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -31,9 +32,12 @@ import java.util.Map;
 
 /**
  * PermImageRetriever obtains a complete snapshot of Sentry permission from a persistent
- * storage and translate it into {@code PermissionsUpdate} that the consumers, such as
- * HDFS NameNod, can understand.
+ * storage and translates it into {@code PermissionsUpdate} that the consumers, such as
+ * HDFS NameNode, can understand.
+ * <p>
+ * It is a thread safe class, as all the underlying database operation is thread safe.
  */
+@ThreadSafe
 public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
 
   private final SentryStore sentryStore;
@@ -43,7 +47,7 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
   }
 
   @Override
-  public PermissionsUpdate retrieveFullImage(long seqNum) throws Exception {
+  public PermissionsUpdate retrieveFullImage() throws Exception {
     try(Timer.Context timerContext =
         SentryHdfsMetricsUtil.getRetrievePermFullImageTimer.time()) {
 
@@ -80,8 +84,6 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
       }
 
       PermissionsUpdate permissionsUpdate = new PermissionsUpdate(tPermUpdate);
-      // TODO: use curSeqNum from DB instead of seqNum when doing SENTRY-1567
-      permissionsUpdate.setSeqNum(seqNum);
       SentryHdfsMetricsUtil.getPrivilegeChangesHistogram.update(
           tPermUpdate.getPrivilegeChangesSize());
       SentryHdfsMetricsUtil.getRoleChangesHistogram.update(
@@ -90,4 +92,4 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> {
     }
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
index e4f3f58..395618a 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
@@ -42,10 +42,6 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
     retVal.setAuthzPathUpdate(new LinkedList<TPathsUpdate>());
     retVal.setAuthzPermUpdate(new LinkedList<TPermissionsUpdate>());
     if (SentryPlugin.instance != null) {
-      if (SentryPlugin.instance.isOutOfSync()) {
-        throw new TException(
-            "This Sentry server is not communicating with other nodes and out of sync ");
-      }
       final Timer.Context timerContext =
           SentryHdfsMetricsUtil.getAllAuthzUpdatesTimer.time();
       try {
@@ -99,33 +95,12 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
 
   @Override
   public void handle_hms_notification(TPathsUpdate update) throws TException {
-    final Timer.Context timerContext =
-        SentryHdfsMetricsUtil.getHandleHmsNotificationTimer.time();
-    try {
-      PathsUpdate hmsUpdate = new PathsUpdate(update);
-      if (SentryPlugin.instance != null) {
-        SentryPlugin.instance.handlePathUpdateNotification(hmsUpdate);
-        LOGGER.debug("Authz Paths update [" + hmsUpdate.getSeqNum() + "]..");
-      } else {
-        LOGGER.error("SentryPlugin not initialized yet !!");
-      }
-    } catch (Exception e) {
-      LOGGER.error("Error handling notification from HMS", e);
-      SentryHdfsMetricsUtil.getFailedHandleHmsNotificationCounter.inc();
-      throw new TException(e);
-    } finally {
-      timerContext.stop();
-      SentryHdfsMetricsUtil.getHandleHmsPathChangeHistogram.update(
-          update.getPathChangesSize());
-      if (update.isHasFullImage()) {
-        SentryHdfsMetricsUtil.getHandleHmsHasFullImageCounter.inc();
-      }
-    }
+    throw new UnsupportedOperationException("handle_hms_notification");
   }
 
   @Override
   public long check_hms_seq_num(long pathSeqNum) throws TException {
-    return SentryPlugin.instance.getLastSeenHMSPathSeqNum();
+    throw new UnsupportedOperationException("check_hms_seq_num");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
index be14569..28bf20e 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java
@@ -83,25 +83,6 @@ public class SentryHdfsMetricsUtil {
       MetricRegistry.name(PathImageRetriever.class, "retrieve-path-full-image",
           "path-changes-size"));
 
-
-  // Metrics for notifySentry HMS update in MetaStorePlugin
-  // The timer used for each notifySentry
-  public static final Timer getNotifyHMSUpdateTimer = sentryMetrics.getTimer(
-      MetricRegistry.name(MetastorePlugin.class, "notify-sentry-HMS-update"));
-  // The number of failed notifySentry
-  public static final Counter getFailedNotifyHMSUpdateCounter = sentryMetrics.getCounter(
-      MetricRegistry.name(MetastorePlugin.class, "notify-sentry-HMS-update",
-          "failed-num"));
-
-  // Metrics for applyLocal update in MetastorePlugin
-  // The time used for each applyLocal
-  public static final Timer getApplyLocalUpdateTimer = sentryMetrics.getTimer(
-      MetricRegistry.name(MetastorePlugin.class, "apply-local-update"));
-  // The size of path changes for each applyLocal
-  public static final Histogram getApplyLocalUpdateHistogram = sentryMetrics.getHistogram(
-      MetricRegistry.name(MetastorePlugin.class, "apply-local-update",
-          "path-change-size"));
-
   private SentryHdfsMetricsUtil() {
     // Make constructor private to avoid instantiation
   }

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
index 029f9d5..0bd0833 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
@@ -46,22 +46,22 @@ import org.slf4j.LoggerFactory;
 import static org.apache.sentry.hdfs.Updateable.Update;
 
   /**
-   * SentryPlugin facilitates HDFS synchronization between HMS and NameNode.
+   * SentryPlugin listens to all sentry permission update events, persists permission
+   * changes into database. It also facilitates HDFS synchronization between HMS and NameNode.
    * <p>
-   * Normally, synchronization happens via partial (incremental) updates:
+   * Synchronization happens via a complete snapshot or partial (incremental) updates.
+   * Normally, it is the latter:
    * <ol>
    * <li>
-   * Whenever updates happen on HMS, they are immediately pushed to Sentry.
-   * Commonly, it's a single update per remote call.
+   * Whenever updates happen on HMS, a corresponding notification log is generated,
+   * and {@link HMSFollower} will process the notification event and persist it in database.
    * <li>
    * The NameNode periodically asks Sentry for updates. Sentry may return zero
-   * or more updates previously received from HMS.
+   * or more updates previously received via HMS notification log.
    * </ol>
    * <p>
-   * Each individual update is assigned a corresponding sequence number. Those
-   * numbers serve to detect the out-of-sync situations between HMS and Sentry and
-   * between Sentry and NameNode. Detecting out-of-sync situation triggers full
-   * update between the components that are out-of-sync.
+   * Each individual update is assigned a corresponding sequence number to synchronize
+   * updates between Sentry and NameNode.
    * <p>
    * SentryPlugin also implements signal-triggered mechanism of full path
    * updates from HMS to Sentry and from Sentry to NameNode, to address
@@ -69,39 +69,18 @@ import static org.apache.sentry.hdfs.Updateable.Update;
    * Those out-of-sync situations may not be detectable via the exsiting sequence
    * numbers mechanism (most likely due to the implementation bugs).
    * <p>
-   * To facilitate signal-triggered full update from HMS to Sentry and from Sentry
-   * to the NameNode, the following 3 boolean variables are defined:
-   * fullUpdateHMS, fullUpdateHMSWait, and fullUpdateNN.
-   * <ol>
-   * <li>
-   * The purpose of fullUpdateHMS is to ensure that Sentry asks HMS for full
-   * update, and does so only once per signal.
-   * <li>
-   * The purpose of fullUpdateNN is to ensure that Sentry sends full update
-   * to NameNode, and does so only once per signal.
-   * <li>
-   * The purpose of fullUpdateHMSWait is to ensure that NN update only happens
-   * after HMS update.
+   * To facilitate signal-triggered full update from Sentry to NameNode,
+   * the boolean variables 'fullUpdateNN' is used to ensure that Sentry sends full
+   * update to NameNode, and does so only once per signal.
    * </ol>
    * The details:
    * <ol>
    * <li>
-   * Upon receiving a signal, fullUpdateHMS, fullUpdateHMSWait, and fullUpdateNN
-   * are all set to true.
-   * <li>
-   * On the next call to getLastSeenHMSPathSeqNum() from HMS, Sentry checks if
-   * fullUpdateHMS == true. If yes, it returns invalid (zero) sequence number
-   * to HMS, so HMS would push full update by calling handlePathUpdateNotification()
-   * next time. fullUpdateHMS is immediately reset to false, to only trigger one
-   * full update request to HMS per signal.
-   * <li>
-   * When HMS calls handlePathUpdateNotification(), Sentry checks if the update
-   * is a full image. If it is, fullUpdateHMSWait is set to false.
+   * Upon receiving a signal, fullUpdateNN is set to true.
    * <li>
    * When NameNode calls getAllPathsUpdatesFrom() asking for partial update,
-   * Sentry checks if both fullUpdateNN == true and fullUpdateHMSWait == false.
-   * If yes, it sends full update back to NameNode and immediately resets
-   * fullUpdateNN to false.
+   * Sentry checks if both fullUpdateNN == true. If yes, it sends full update back
+   * to NameNode and immediately resets fullUpdateNN to false.
    * </ol>
    */
 
@@ -109,18 +88,12 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SentryPlugin.class);
 
-  private final AtomicBoolean fullUpdateHMSWait = new AtomicBoolean(false);
-  private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false);
   private final AtomicBoolean fullUpdateNN = new AtomicBoolean(false);
-
   public static volatile SentryPlugin instance;
 
-  private UpdateForwarder<PathsUpdate> pathsUpdater;
-  private UpdateForwarder<PermissionsUpdate> permsUpdater;
-  // TODO: Each perm change sequence number should be generated during persistence at sentry store.
-  private final AtomicLong permSeqNum = new AtomicLong(5);
-  private PermImageRetriever permImageRetriever;
-  private boolean outOfSync = false;
+  private DBUpdateForwarder<PathsUpdate> pathsUpdater;
+  private DBUpdateForwarder<PermissionsUpdate> permsUpdater;
+
   /*
    * This number is smaller than starting sequence numbers used by NN and HMS
    * so in both cases its effect is to create appearance of out-of-sync
@@ -130,33 +103,15 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
    */
   private static final long NO_LAST_SEEN_HMS_PATH_SEQ_NUM = 0L;
 
-  /*
-   * Call from HMS to get the last known update sequence #.
-   */
-  long getLastSeenHMSPathSeqNum() {
-    if (!fullUpdateHMS.getAndSet(false)) {
-      return pathsUpdater.getLastSeen();
-    } else {
-      LOGGER.info("SIGNAL HANDLING: asking for full update from HMS");
-      return NO_LAST_SEEN_HMS_PATH_SEQ_NUM;
-    }
-  }
-
   @Override
   public void initialize(Configuration conf, SentryStore sentryStore) throws SentryPluginException {
-    final String[] pathPrefixes = conf
-        .getStrings(ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES,
-            ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT);
-    final int initUpdateRetryDelayMs =
-        conf.getInt(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
-            ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT);
-    permImageRetriever = new PermImageRetriever(sentryStore);
-
-    pathsUpdater = UpdateForwarder.create(conf, new UpdateableAuthzPaths(
-        pathPrefixes), new PathsUpdate(0, false), null, 100, initUpdateRetryDelayMs, false);
-    permsUpdater = UpdateForwarder.create(conf,
-        new UpdateablePermissions(permImageRetriever), new PermissionsUpdate(0, false),
-        permImageRetriever, 100, initUpdateRetryDelayMs, true);
+    PermImageRetriever permImageRetriever = new PermImageRetriever(sentryStore);
+    PathImageRetriever pathImageRetriever = new PathImageRetriever(sentryStore);
+    PermDeltaRetriever permDeltaRetriever = new PermDeltaRetriever(sentryStore);
+    PathDeltaRetriever pathDeltaRetriever = new PathDeltaRetriever(sentryStore);
+    pathsUpdater = new DBUpdateForwarder<>(pathImageRetriever, pathDeltaRetriever);
+    permsUpdater = new DBUpdateForwarder<>(permImageRetriever, permDeltaRetriever);
+
     LOGGER.info("Sentry HDFS plugin initialized !!");
     instance = this;
 
@@ -182,7 +137,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
     if (!fullUpdateNN.get()) {
       // Most common case - Sentry is NOT handling a full update.
       return pathsUpdater.getAllUpdatesFrom(pathSeqNum);
-    } else if (!fullUpdateHMSWait.get()) {
+    } else {
       /*
        * Sentry is in the middle of signal-triggered full update.
        * It already got a full update from HMS
@@ -216,10 +171,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
         LOGGER.warn("SIGNAL HANDLING: returned NULL instead of full update to NameNode (???)");
       }
       return updates;
-    } else {
-      // Sentry is handling a full update, but not yet received full update from HMS
-      LOGGER.warn("SIGNAL HANDLING: sending partial update to NameNode: still waiting for full update from HMS");
-      return pathsUpdater.getAllUpdatesFrom(pathSeqNum);
     }
   }
 
@@ -227,32 +178,17 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
     return permsUpdater.getAllUpdatesFrom(permSeqNum);
   }
 
-  /*
-   * Handle partial (most common) or full update from HMS
-   */
-  public void handlePathUpdateNotification(PathsUpdate update)
-      throws SentryPluginException {
-    pathsUpdater.handleUpdateNotification(update);
-    if (!update.hasFullImage()) { // most common case of partial update
-      LOGGER.debug("Recieved Authz Path update [" + update.getSeqNum() + "]..");
-    } else { // rare case of full update
-      LOGGER.warn("Recieved Authz Path FULL update [" + update.getSeqNum() + "]..");
-      // indicate that we're ready to send full update to NameNode
-      fullUpdateHMSWait.set(false);
-    }
-  }
-
   @Override
   public Update onAlterSentryRoleAddGroups(
       TAlterSentryRoleAddGroupsRequest request) throws SentryPluginException {
-    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    PermissionsUpdate update = new PermissionsUpdate();
     TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName());
     for (TSentryGroup group : request.getGroups()) {
       rUpdate.addToAddGroups(group.getGroupName());
     }
 
-    permsUpdater.handleUpdateNotification(update);
-    LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
+    LOGGER.debug(String.format("onAlterSentryRoleAddGroups, Authz Perm preUpdate[ %s ]",
+                  request.getRoleName()));
     return update;
   }
 
@@ -260,14 +196,14 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
   public Update onAlterSentryRoleDeleteGroups(
       TAlterSentryRoleDeleteGroupsRequest request)
           throws SentryPluginException {
-    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    PermissionsUpdate update = new PermissionsUpdate();
     TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName());
     for (TSentryGroup group : request.getGroups()) {
       rUpdate.addToDelGroups(group.getGroupName());
     }
 
-    permsUpdater.handleUpdateNotification(update);
-    LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
+    LOGGER.debug(String.format("onAlterSentryRoleDeleteGroups, Authz Perm preUpdate [ %s ]",
+                  request.getRoleName()));
     return update;
   }
 
@@ -296,12 +232,12 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
       return null;
     }
 
-    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    PermissionsUpdate update = new PermissionsUpdate();
     update.addPrivilegeUpdate(authzObj).putToAddPrivileges(
         roleName, privilege.getAction().toUpperCase());
 
-    permsUpdater.handleUpdateNotification(update);
-    LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + "]..");
+    LOGGER.debug(String.format("onAlterSentryRoleGrantPrivilegeCore, Authz Perm preUpdate [ %s ]",
+                  authzObj));
     return update;
   }
 
@@ -310,13 +246,13 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
       throws SentryPluginException {
     String oldAuthz = HMSFollower.getAuthzObj(request.getOldAuthorizable());
     String newAuthz = HMSFollower.getAuthzObj(request.getNewAuthorizable());
-    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    PermissionsUpdate update = new PermissionsUpdate();
     TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
     privUpdate.putToAddPrivileges(newAuthz, newAuthz);
     privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
 
-    permsUpdater.handleUpdateNotification(update);
-    LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + newAuthz + ", " + oldAuthz + "]..");
+    LOGGER.debug(String.format("onRenameSentryPrivilege, Authz Perm preUpdate [ %s ]",
+                  oldAuthz));
     return update;
   }
 
@@ -339,14 +275,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
     }
   }
 
-  public boolean isOutOfSync() {
-    return outOfSync;
-  }
-
-  public void setOutOfSync(boolean outOfSync) {
-    this.outOfSync = outOfSync;
-  }
-
   private PermissionsUpdate onAlterSentryRoleRevokePrivilegeCore(String roleName, TSentryPrivilege privilege)
       throws SentryPluginException {
     String authzObj = getAuthzObj(privilege);
@@ -354,46 +282,44 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen
       return null;
     }
 
-    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    PermissionsUpdate update = new PermissionsUpdate();
     update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
         roleName, privilege.getAction().toUpperCase());
 
-    permsUpdater.handleUpdateNotification(update);
-    LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "]..");
+    LOGGER.debug(String.format("onAlterSentryRoleRevokePrivilegeCore, Authz Perm preUpdate [ %s ]",
+                  authzObj));
     return update;
   }
 
   @Override
   public Update onDropSentryRole(TDropSentryRoleRequest request)
       throws SentryPluginException {
-    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    PermissionsUpdate update = new PermissionsUpdate();
     update.addPrivilegeUpdate(PermissionsUpdate.ALL_AUTHZ_OBJ).putToDelPrivileges(
         request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ);
     update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS);
 
-    permsUpdater.handleUpdateNotification(update);
-    LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
+    LOGGER.debug(String.format("onDropSentryRole, Authz Perm preUpdate [ %s ]",
+                  request.getRoleName()));
     return update;
   }
 
   @Override
   public Update onDropSentryPrivilege(TDropPrivilegesRequest request)
       throws SentryPluginException {
-    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    PermissionsUpdate update = new PermissionsUpdate();
     String authzObj = HMSFollower.getAuthzObj(request.getAuthorizable());
     update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
         PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
 
-    permsUpdater.handleUpdateNotification(update);
-    LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "]..");
+    LOGGER.debug(String.format("onDropSentryPrivilege, Authz Perm preUpdate [ %s ]",
+                  authzObj));
     return update;
   }
 
   @Override
   public void onSignal(final String sigName) {
     LOGGER.info("SIGNAL HANDLING: Received signal " + sigName + ", triggering full update");
-    fullUpdateHMS.set(true);
-    fullUpdateHMSWait.set(true);
     fullUpdateNN.set(true);
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
deleted file mode 100644
index 22c5769..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class UpdateForwarder<K extends Updateable.Update> implements
-    Updateable<K>, Closeable {
-
-  private final AtomicLong lastSeenSeqNum = new AtomicLong(0);
-  protected final AtomicLong lastCommittedSeqNum = new AtomicLong(0);
-  // Updates should be handled in order
-  private final Executor updateHandler = Executors.newSingleThreadExecutor();
-
-  // Update log is used when propagate updates to a downstream cache.
-  // The preUpdate log stores all commits that were applied to this cache.
-  // When the update log is filled to capacity (getMaxUpdateLogSize()), all
-  // entries are cleared and a compact image if the state of the cache is
-  // appended to the log.
-  // The first entry in an update log (consequently the first preUpdate a
-  // downstream cache sees) will be a full image. All subsequent entries are
-  // partial edits
-  protected final LinkedList<K> updateLog = new LinkedList<K>();
-  // UpdateLog is disabled when getMaxUpdateLogSize() = 0;
-  private final int maxUpdateLogSize;
-
-  private final ImageRetriever<K> imageRetreiver;
-
-  private volatile Updateable<K> updateable;
-
-  private final ReadWriteLock lock = new ReentrantReadWriteLock();
-  protected static final long INIT_SEQ_NUM = -2;
-  protected static final int INIT_UPDATE_RETRY_DELAY = 5000;
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarder.class);
-  private static final String UPDATABLE_TYPE_NAME = "update_forwarder";
-
-  public UpdateForwarder(Configuration conf, Updateable<K> updateable,
-      ImageRetriever<K> imageRetreiver, int maxUpdateLogSize, boolean shouldInit) {
-    this(conf, updateable, imageRetreiver, maxUpdateLogSize, INIT_UPDATE_RETRY_DELAY, shouldInit);
-  }
-
-  protected UpdateForwarder(Configuration conf, Updateable<K> updateable, //NOPMD
-      ImageRetriever<K> imageRetreiver, int maxUpdateLogSize,
-      int initUpdateRetryDelay, boolean shouldInit) {
-    this.maxUpdateLogSize = maxUpdateLogSize;
-    this.imageRetreiver = imageRetreiver;
-    if (shouldInit) {
-      spawnInitialUpdater(updateable, initUpdateRetryDelay);
-    } else {
-      this.updateable = updateable;
-    }
-  }
-
-  public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf,
-      Updateable<K> updateable, K update, ImageRetriever<K> imageRetreiver,
-      int maxUpdateLogSize, boolean shouldInit) throws SentryPluginException {
-    return create(conf, updateable, update, imageRetreiver, maxUpdateLogSize,
-        INIT_UPDATE_RETRY_DELAY, shouldInit);
-  }
-
-  public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf,
-      Updateable<K> updateable, K update, ImageRetriever<K> imageRetreiver,
-      int maxUpdateLogSize, int initUpdateRetryDelay, boolean shouldInit) throws SentryPluginException {
-    return new UpdateForwarder<K>(conf, updateable, imageRetreiver,
-        maxUpdateLogSize, initUpdateRetryDelay, shouldInit);
-  }
-
-  private void spawnInitialUpdater(final Updateable<K> updateable,
-      final int initUpdateRetryDelay) {
-    K firstFullImage = null;
-    try {
-      firstFullImage = imageRetreiver.retrieveFullImage(INIT_SEQ_NUM);
-    } catch (Exception e) {
-      LOGGER.warn("InitialUpdater encountered exception !! ", e);
-      firstFullImage = null;
-      Thread initUpdater = new Thread() {
-        @Override
-        public void run() {
-          while (UpdateForwarder.this.updateable == null) {
-            try {
-              Thread.sleep(initUpdateRetryDelay);
-            } catch (InterruptedException e) {
-              LOGGER.warn("Thread interrupted !! ", e);
-              break;
-            }
-            K fullImage = null;
-            try {
-              fullImage =
-                  UpdateForwarder.this.imageRetreiver
-                  .retrieveFullImage(INIT_SEQ_NUM);
-              appendToUpdateLog(fullImage);
-            } catch (Exception e) {
-              LOGGER.warn("InitialUpdater encountered exception !! ", e);
-            }
-            if (fullImage != null) {
-              UpdateForwarder.this.updateable = updateable.updateFull(fullImage);
-            }
-          }
-        }
-      };
-      initUpdater.start();
-    }
-    if (firstFullImage != null) {
-      try {
-        appendToUpdateLog(firstFullImage);
-      } catch (Exception e) {
-        LOGGER.warn("failed to update append log: ", e);
-      }
-      this.updateable = updateable.updateFull(firstFullImage);
-    }
-  }
-  /**
-   * Handle notifications from HMS plug-in or upstream Cache
-   * @param update
-   */
-  public void handleUpdateNotification(final K update) throws SentryPluginException {
-    // Correct the seqNums on the first update
-    if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) {
-      K firstUpdate = getUpdateLog().peek();
-      long firstSeqNum = update.getSeqNum() - 1;
-      if (firstUpdate != null) {
-        firstUpdate.setSeqNum(firstSeqNum);
-      }
-      lastCommittedSeqNum.set(firstSeqNum);
-      lastSeenSeqNum.set(firstSeqNum);
-    }
-    final boolean editNotMissed =
-        lastSeenSeqNum.incrementAndGet() == update.getSeqNum();
-    if (!editNotMissed) {
-      lastSeenSeqNum.set(update.getSeqNum());
-    }
-    Runnable task = new Runnable() {
-      @Override
-      public void run() {
-        K toUpdate = update;
-        if (update.hasFullImage()) {
-          updateable = updateable.updateFull(update);
-        } else {
-          if (editNotMissed) {
-            // apply partial preUpdate
-            updateable.updatePartial(Collections.singletonList(update), lock);
-          } else {
-            // Retrieve full update from External Source and
-            if (imageRetreiver != null) {
-              try {
-                toUpdate = imageRetreiver
-                    .retrieveFullImage(update.getSeqNum());
-              } catch (Exception e) {
-                LOGGER.warn("failed to retrieve full image: ", e);
-              }
-              updateable = updateable.updateFull(toUpdate);
-            }
-          }
-        }
-        try {
-          appendToUpdateLog(toUpdate);
-        } catch (Exception e) {
-          LOGGER.warn("failed to append to update log", e);
-        }
-      }
-    };
-    updateHandler.execute(task);
-  }
-
-  protected void appendToUpdateLog(K update) throws Exception {
-    synchronized (getUpdateLog()) {
-      boolean logCompacted = false;
-      if (getMaxUpdateLogSize() > 0) {
-        if (update.hasFullImage() || getUpdateLog().size() == getMaxUpdateLogSize()) {
-          // Essentially a log compaction
-          getUpdateLog().clear();
-          getUpdateLog().add(update.hasFullImage() ? update
-              : createFullImageUpdate(update.getSeqNum()));
-          logCompacted = true;
-        } else {
-          getUpdateLog().add(update);
-        }
-      }
-      lastCommittedSeqNum.set(update.getSeqNum());
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("#### Appending to Update Log ["
-            + "type=" + update.getClass() + ", "
-            + "lastCommit=" + lastCommittedSeqNum.get() + ", "
-            + "lastSeen=" + lastSeenSeqNum.get() + ", "
-            + "logCompacted=" + logCompacted + "]");
-      }
-    }
-  }
-
-  /**
-   * Return all updates from requested seqNum (inclusive)
-   * @param seqNum
-   * @return
-   */
-  public List<K> getAllUpdatesFrom(long seqNum) throws Exception {
-    List<K> retVal = new LinkedList<K>();
-    synchronized (getUpdateLog()) {
-      long currSeqNum = lastCommittedSeqNum.get();
-      if (LOGGER.isDebugEnabled() && updateable != null) {
-        LOGGER.debug("#### GetAllUpdatesFrom ["
-            + "type=" + updateable.getClass() + ", "
-            + "reqSeqNum=" + seqNum + ", "
-            + "lastCommit=" + lastCommittedSeqNum.get() + ", "
-            + "lastSeen=" + lastSeenSeqNum.get() + ", "
-            + "getMaxUpdateLogSize()=" + getUpdateLog().size() + "]");
-      }
-      if (getMaxUpdateLogSize() == 0) {
-        // no updatelog configured..
-        return retVal;
-      }
-      K head = getUpdateLog().peek();
-      if (head == null) {
-        return retVal;
-      }
-      if (seqNum > currSeqNum + 1) {
-        // This process has probably restarted since downstream
-        // recieved last update
-        retVal.addAll(getUpdateLog());
-        return retVal;
-      }
-      if (head.getSeqNum() > seqNum) {
-        // Caller has diverged greatly..
-        if (head.hasFullImage()) {
-          // head is a refresh(full) image
-          // Send full image along with partial updates
-          for (K u : getUpdateLog()) {
-            retVal.add(u);
-          }
-        } else {
-          // Create a full image
-          // clear updateLog
-          // add fullImage to head of Log
-          // NOTE : This should ideally never happen
-          K fullImage = createFullImageUpdate(currSeqNum);
-          getUpdateLog().clear();
-          getUpdateLog().add(fullImage);
-          retVal.add(fullImage);
-        }
-      } else {
-        // increment iterator to requested seqNum
-        Iterator<K> iter = getUpdateLog().iterator();
-        while (iter.hasNext()) {
-          K elem = iter.next();
-          if (elem.getSeqNum() >= seqNum) {
-            retVal.add(elem);
-          }
-        }
-      }
-    }
-    return retVal;
-  }
-
-  public boolean areAllUpdatesCommited() {
-    return lastCommittedSeqNum.get() == lastSeenSeqNum.get();
-  }
-
-  public long getLastCommitted() {
-    return lastCommittedSeqNum.get();
-  }
-
-  public long getLastSeen() {
-    return lastSeenSeqNum.get();
-  }
-
-  @Override
-  public Updateable<K> updateFull(K update) {
-    return (updateable != null) ? updateable.updateFull(update) : null;
-  }
-
-  @Override
-  public void updatePartial(Iterable<K> updates, ReadWriteLock lock) {
-    if (updateable != null) {
-      updateable.updatePartial(updates, lock);
-    }
-  }
-
-  @Override
-  public long getLastUpdatedSeqNum() {
-    return (updateable != null) ? updateable.getLastUpdatedSeqNum() : INIT_SEQ_NUM;
-  }
-
-  @Override
-  public K createFullImageUpdate(long currSeqNum) throws Exception {
-    return (updateable != null) ? updateable.createFullImageUpdate(currSeqNum) : null;
-  }
-
-  @Override
-  public String getUpdateableTypeName() {
-    // TODO Auto-generated method stub
-    return UPDATABLE_TYPE_NAME;
-  }
-
-  protected LinkedList<K> getUpdateLog() {
-    return updateLog;
-  }
-
-  protected int getMaxUpdateLogSize() {
-    return maxUpdateLogSize;
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
deleted file mode 100644
index 03c67d6..0000000
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-
-public class UpdateablePermissions implements Updateable<PermissionsUpdate>{
-  private static final String UPDATABLE_TYPE_NAME = "perm_update";
-
-  private AtomicLong seqNum = new AtomicLong();
-  private final ImageRetriever<PermissionsUpdate> imageRetreiver;
-
-  public UpdateablePermissions(
-      ImageRetriever<PermissionsUpdate> imageRetreiver) {
-    this.imageRetreiver = imageRetreiver;
-  }
-
-  @Override
-  public PermissionsUpdate createFullImageUpdate(long currSeqNum) throws Exception {
-    return imageRetreiver.retrieveFullImage(currSeqNum);
-  }
-
-  @Override
-  public long getLastUpdatedSeqNum() {
-    return seqNum.get();
-  }
-
-  @Override
-  public void updatePartial(Iterable<PermissionsUpdate> update,
-      ReadWriteLock lock) {
-    for (PermissionsUpdate permsUpdate : update) {
-      seqNum.set(permsUpdate.getSeqNum());
-    }
-  }
-
-  @Override
-  public Updateable<PermissionsUpdate> updateFull(PermissionsUpdate update) {
-    UpdateablePermissions other = new UpdateablePermissions(imageRetreiver);
-    other.seqNum.set(update.getSeqNum());
-    return other;
-  }
-
-  @Override
-  public String getUpdateableTypeName() {
-    return UPDATABLE_TYPE_NAME;
-  }
-}