You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by pr...@apache.org on 2015/02/22 03:14:34 UTC

incubator-sentry git commit: SENTRY-632: Support Sentry cache sync via ZK (Prasad Mujumdar, reviewed by Arun Suresh

Repository: incubator-sentry
Updated Branches:
  refs/heads/master 169d337f1 -> 0693dfb21


SENTRY-632: Support Sentry cache sync via ZK (Prasad Mujumdar, reviewed by Arun Suresh


Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/0693dfb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/0693dfb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/0693dfb2

Branch: refs/heads/master
Commit: 0693dfb21866bdf4cea6a58be00548d241d96445
Parents: 169d337
Author: Prasad Mujumdar <pr...@apache.org>
Authored: Sat Feb 21 18:14:25 2015 -0800
Committer: Prasad Mujumdar <pr...@apache.org>
Committed: Sat Feb 21 18:14:25 2015 -0800

----------------------------------------------------------------------
 .../metastore/MetastoreAuthzBinding.java        |  15 ++
 .../org/apache/sentry/hdfs/PathsUpdate.java     |  20 ++-
 .../apache/sentry/hdfs/PermissionsUpdate.java   |  15 ++
 .../apache/sentry/hdfs/ServiceConstants.java    |   6 +-
 .../apache/sentry/hdfs/ThriftSerializer.java    |  56 +++++++
 .../java/org/apache/sentry/hdfs/Updateable.java |  14 +-
 .../sentry/hdfs/UpdateableAuthzPaths.java       |  10 +-
 .../sentry/hdfs/UpdateableAuthzPermissions.java |  11 +-
 sentry-hdfs/sentry-hdfs-service/pom.xml         |   4 +
 .../org/apache/sentry/hdfs/MetastorePlugin.java |   5 +-
 .../sentry/hdfs/MetastorePluginWithHA.java      |  93 ++++++++++++
 .../apache/sentry/hdfs/PluginCacheSyncUtil.java | 151 +++++++++++++++++++
 .../sentry/hdfs/SentryHDFSServiceProcessor.java |   6 +-
 .../org/apache/sentry/hdfs/SentryPlugin.java    |  34 +++--
 .../org/apache/sentry/hdfs/UpdateForwarder.java | 113 +++++++++-----
 .../sentry/hdfs/UpdateForwarderWithHA.java      | 140 +++++++++++++++++
 .../sentry/hdfs/UpdateablePermissions.java      |   6 +
 .../sentry/hdfs/TestHAUpdateForwarder.java      |  68 +++++++++
 .../apache/sentry/hdfs/TestUpdateForwarder.java |  70 ++++++---
 .../db/service/persistent/HAContext.java        |  47 +++++-
 .../db/service/persistent/ServiceManager.java   |   8 +-
 .../db/service/persistent/ServiceRegister.java  |   4 +-
 .../thrift/SentryPolicyStoreProcessor.java      |   4 +-
 .../thrift/HAClientInvocationHandler.java       |   4 +-
 .../sentry/service/thrift/ServiceConstants.java |  40 ++---
 .../persistent/TestSentryServiceDiscovery.java  |   6 +-
 .../tests/e2e/hdfs/TestHDFSIntegration.java     |  15 ++
 .../e2e/hdfs/TestHDFSIntegrationWithHA.java     |  28 ++++
 28 files changed, 877 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBinding.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBinding.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBinding.java
index 8d388d2..a36a4ee 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBinding.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/MetastoreAuthzBinding.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
+import org.apache.sentry.SentryUserException;
 import org.apache.sentry.binding.hive.authz.HiveAuthzBinding;
 import org.apache.sentry.binding.hive.authz.HiveAuthzPrivilegesMap;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
@@ -138,6 +139,7 @@ public class MetastoreAuthzBinding extends MetaStorePreEventListener {
   private final ImmutableSet<String> serviceUsers;
   private HiveAuthzBinding hiveAuthzBinding;
   private final String warehouseDir;
+  private static boolean sentryCacheOutOfSync = false;
 
   public MetastoreAuthzBinding(Configuration config) throws Exception {
     super(config);
@@ -389,6 +391,10 @@ public class MetastoreAuthzBinding extends MetaStorePreEventListener {
       List<List<DBModelAuthorizable>> inputHierarchy,
       List<List<DBModelAuthorizable>> outputHierarchy)
       throws InvalidOperationException {
+    if (isSentryCacheOutOfSync()) {
+      throw invalidOperationException(new SentryUserException(
+          "Metastore/Sentry cache is out of sync"));
+    }
     try {
       HiveAuthzBinding hiveAuthzBinding = getHiveAuthzBinding();
       hiveAuthzBinding.authorize(hiveOp, HiveAuthzPrivilegesMap
@@ -446,4 +452,13 @@ public class MetastoreAuthzBinding extends MetaStorePreEventListener {
       return sd.getLocation();
     }
   }
+
+  public static boolean isSentryCacheOutOfSync() {
+    return sentryCacheOutOfSync;
+  }
+
+  public static void setSentryCacheOutOfSync(boolean sentryCacheOutOfSync) {
+    MetastoreAuthzBinding.sentryCacheOutOfSync = sentryCacheOutOfSync;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
index ec0df24..3f14bf7 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PathsUpdate.java
@@ -17,12 +17,14 @@
  */
 package org.apache.sentry.hdfs;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.LinkedList;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.sentry.hdfs.service.thrift.TPathChanges;
 import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
 
@@ -30,14 +32,18 @@ import com.google.common.collect.Lists;
 
 /**
  * A wrapper class over the TPathsUpdate thrift generated class. Please see
- * {@link Updateable.Update} for more information 
+ * {@link Updateable.Update} for more information
  */
 public class PathsUpdate implements Updateable.Update {
-  
+
   public static String ALL_PATHS = "__ALL_PATHS__";
 
   private final TPathsUpdate tPathsUpdate;
 
+  public PathsUpdate() {
+    this(0, false);
+  }
+
   public PathsUpdate(TPathsUpdate tPathsUpdate) {
     this.tPathsUpdate = tPathsUpdate;
   }
@@ -97,4 +103,14 @@ public class PathsUpdate implements Updateable.Update {
     }
   }
 
+  @Override
+  public byte[] serialize() throws IOException {
+    return ThriftSerializer.serialize(tPathsUpdate);
+  }
+
+  @Override
+  public void deserialize(byte[] data) throws IOException {
+    ThriftSerializer.deserialize(tPathsUpdate, data);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PermissionsUpdate.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PermissionsUpdate.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PermissionsUpdate.java
index 1130140..c791ab3 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PermissionsUpdate.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/PermissionsUpdate.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sentry.hdfs;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -35,6 +36,10 @@ public class PermissionsUpdate implements Updateable.Update {
 
   private final TPermissionsUpdate tPermUpdate;
 
+  public PermissionsUpdate() {
+    this(0, false);
+  }
+
   public PermissionsUpdate(TPermissionsUpdate tPermUpdate) {
     this.tPermUpdate = tPermUpdate;
   }
@@ -91,4 +96,14 @@ public class PermissionsUpdate implements Updateable.Update {
   public TPermissionsUpdate toThrift() {
     return tPermUpdate;
   }
+
+  @Override
+  public byte[] serialize() throws IOException {
+    return ThriftSerializer.serialize(tPermUpdate);
+  }
+
+  @Override
+  public void deserialize(byte[] data) throws IOException {
+    ThriftSerializer.deserialize(tPermUpdate, data);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index 9308dee..516f773 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -47,8 +47,12 @@ public class ServiceConstants {
     public static final int SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT = 10000;
     public static final String SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS = "sentry.hdfs.sync.checker.period.ms";
     public static final int SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT = 1000;
-
+    public static final String SENTRY_HDFS_HA_ZOOKEEPER_NAMESPACE = "sentry.hdfs.ha.zookeeper.namespace";
+    public static final String SENTRY_HDFS_HA_ZOOKEEPER_NAMESPACE_DEFAULT = "/sentry_hdfs";
+    public static final String SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE = "sentry.hdfs.ha.zookeeper.namespace";
+    public static final String SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE_DEFAULT = "/sentry_metastore";
   }
+
   public static class ClientConfig {
     public static final ImmutableMap<String, String> SASL_PROPERTIES = ServiceConstants.SASL_PROPERTIES;
 

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
new file mode 100644
index 0000000..b585773
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.io.IOException;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+public class ThriftSerializer {
+
+  @SuppressWarnings("rawtypes")
+  public static byte[] serialize(TBase baseObject) throws IOException {
+    TSerializer serializer = new TSerializer(new TCompactProtocol.Factory());
+    try {
+      return serializer.serialize(baseObject);
+    } catch (TException e) {
+      throw new IOException("Error serializing thrift object "
+          + baseObject, e);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public static TBase deserialize(TBase baseObject, byte[] serialized)
+      throws IOException {
+    TDeserializer deserializer = new TDeserializer(
+        new TCompactProtocol.Factory());
+    try {
+      deserializer.deserialize(baseObject, serialized);
+    } catch (TException e) {
+      throw new IOException("Error deserializing thrift object "
+          + baseObject, e);
+    }
+    return baseObject;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/Updateable.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/Updateable.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/Updateable.java
index ba932ac..ac8459b 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/Updateable.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/Updateable.java
@@ -17,30 +17,34 @@
  */
 package org.apache.sentry.hdfs;
 
+import java.io.IOException;
 import java.util.concurrent.locks.ReadWriteLock;
 
 public interface Updateable<K extends Updateable.Update> {
 
   /**
    * Thrift currently does not support class inheritance.We need all update
-   * objects to expose a unified API. A wrapper class need to be created 
-   * implementing this interface and containing the generated thrift class as 
+   * objects to expose a unified API. A wrapper class need to be created
+   * implementing this interface and containing the generated thrift class as
    * a work around
    */
   public interface Update {
 
     boolean hasFullImage();
-    
+
     long getSeqNum();
 
     void setSeqNum(long seqNum);
 
+    byte[] serialize() throws IOException;
+
+    void deserialize(byte data[]) throws IOException;
   }
 
   /**
    * Apply multiple partial updates in order
    * @param update
-   * @param lock External Lock. 
+   * @param lock External Lock.
    * @return
    */
   public void updatePartial(Iterable<K> update, ReadWriteLock lock);
@@ -64,4 +68,6 @@ public interface Updateable<K extends Updateable.Update> {
    */
   public K createFullImageUpdate(long currSeqNum);
 
+  public String getUpdateableTypeName();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPaths.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPaths.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPaths.java
index 03b288b..b74f954 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPaths.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPaths.java
@@ -28,11 +28,12 @@ import org.slf4j.LoggerFactory;
 
 public class UpdateableAuthzPaths implements AuthzPaths, Updateable<PathsUpdate> {
   private static final int MAX_UPDATES_PER_LOCK_USE = 99;
+  private static final String UPDATABLE_TYPE_NAME = "path_update";
   private volatile HMSPaths paths;
   private final AtomicLong seqNum = new AtomicLong(0);
 
   private static Logger LOG = LoggerFactory.getLogger(UpdateableAuthzPaths.class);
-  
+
   public UpdateableAuthzPaths(String[] pathPrefixes) {
     this.paths = new HMSPaths(pathPrefixes);
   }
@@ -92,7 +93,7 @@ public class UpdateableAuthzPaths implements AuthzPaths, Updateable<PathsUpdate>
       TPathChanges newPathInfo = null;
       TPathChanges oldPathInfo = null;
       if ((pathChanges.get(0).getAddPathsSize() == 1)
-        && (pathChanges.get(1).getDelPathsSize() == 1)) {
+          && (pathChanges.get(1).getDelPathsSize() == 1)) {
         newPathInfo = pathChanges.get(0);
         oldPathInfo = pathChanges.get(1);
       } else if ((pathChanges.get(1).getAddPathsSize() == 1)
@@ -150,4 +151,9 @@ public class UpdateableAuthzPaths implements AuthzPaths, Updateable<PathsUpdate>
       }
     };
   }
+
+  @Override
+  public String getUpdateableTypeName() {
+    return UPDATABLE_TYPE_NAME;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java
index c362115..aa78360 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/UpdateableAuthzPermissions.java
@@ -37,9 +37,10 @@ import org.slf4j.LoggerFactory;
 
 public class UpdateableAuthzPermissions implements AuthzPermissions, Updateable<PermissionsUpdate> {
   private static final int MAX_UPDATES_PER_LOCK_USE = 99;
+  private static final String UPDATABLE_TYPE_NAME = "perm_authz_update";
   private volatile SentryPermissions perms = new SentryPermissions();
   private final AtomicLong seqNum = new AtomicLong(0);
-  
+
   private static Logger LOG = LoggerFactory.getLogger(UpdateableAuthzPermissions.class);
 
   public static Map<String, FsAction> ACTION_MAPPING = new HashMap<String, FsAction>();
@@ -84,7 +85,7 @@ public class UpdateableAuthzPermissions implements AuthzPermissions, Updateable<
       lock.writeLock().unlock();
     }
   }
-  
+
 
   private void applyPartialUpdate(PermissionsUpdate update) {
     applyPrivilegeUpdates(update);
@@ -233,5 +234,9 @@ public class UpdateableAuthzPermissions implements AuthzPermissions, Updateable<
     return retVal;
   }
 
-  
+  @Override
+  public String getUpdateableTypeName() {
+    return UPDATABLE_TYPE_NAME;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/pom.xml b/sentry-hdfs/sentry-hdfs-service/pom.xml
index 365380e..6b84733 100644
--- a/sentry-hdfs/sentry-hdfs-service/pom.xml
+++ b/sentry-hdfs/sentry-hdfs-service/pom.xml
@@ -29,6 +29,10 @@ limitations under the License.
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.sentry</groupId>
+      <artifactId>sentry-binding-hive</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
index cdd8d9c..f4964d6 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
@@ -258,7 +258,7 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
     return update;
   }
 
-  private void notifySentryNoLock(PathsUpdate update) {
+  protected void notifySentryNoLock(PathsUpdate update) {
     try {
       getClient().notifyHMSUpdate(update);
     } catch (Exception e) {
@@ -266,7 +266,7 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
     }
   }
 
-  private void notifySentryAndApplyLocal(PathsUpdate update) {
+  protected void notifySentryAndApplyLocal(PathsUpdate update) {
     notificiationLock.lock();
     if (!syncSent) {
       new SyncTask().run();
@@ -280,5 +280,4 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
       LOGGER.debug("#### HMS Path Last update sent : ["+ lastSentSeqNum + "]");
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
new file mode 100644
index 0000000..271e121
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
+import org.apache.sentry.provider.db.service.persistent.HAContext;
+import org.apache.sentry.binding.metastore.MetastoreAuthzBinding;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetastorePluginWithHA extends MetastorePlugin {
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(MetastorePluginWithHA.class);
+  public static class SentryMetastoreHACacheListener implements PathChildrenCacheListener {
+    private MetastorePluginWithHA metastorePlugin;
+
+    public SentryMetastoreHACacheListener(MetastorePluginWithHA metastorePlugin) {
+      this.metastorePlugin = metastorePlugin;
+    }
+
+    @Override
+    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
+        throws Exception {
+      switch ( event.getType() ) {
+      case CHILD_ADDED:
+        PathsUpdate newUpdate = new PathsUpdate();
+        PluginCacheSyncUtil.setUpdateFromChildEvent(event, newUpdate);
+        metastorePlugin.processCacheNotification(newUpdate);
+        break;
+      case INITIALIZED:
+      case CHILD_UPDATED:
+      case CHILD_REMOVED:
+        break;
+      case CONNECTION_RECONNECTED:
+        MetastoreAuthzBinding.setSentryCacheOutOfSync(false);
+        break;
+      case CONNECTION_SUSPENDED:
+      case CONNECTION_LOST:
+        MetastoreAuthzBinding.setSentryCacheOutOfSync(true);
+        break;
+      default:
+        break;
+      }
+    }
+  }
+
+  private String zkPath;
+  private PluginCacheSyncUtil pluginCacheSync;
+
+  public MetastorePluginWithHA(Configuration conf) throws Exception {
+    super(conf);
+    zkPath = conf.get(ServerConfig.SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE,
+        ServerConfig.SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE_DEFAULT);
+
+    pluginCacheSync = new PluginCacheSyncUtil(zkPath, conf,
+        new SentryMetastoreHACacheListener(this));
+  }
+
+  @Override
+  protected void notifySentryAndApplyLocal(PathsUpdate update) {
+    try {
+      pluginCacheSync.handleCacheUpdate(update);
+    } catch (SentryPluginException e) {
+      LOGGER.error("Error pushing update to cache", e);
+    }
+  }
+
+  private void processCacheNotification(PathsUpdate update) {
+    super.notifySentryAndApplyLocal(update);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
new file mode 100644
index 0000000..00317e7
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+import org.apache.sentry.hdfs.Updateable.Update;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
+import org.apache.sentry.provider.db.service.persistent.HAContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for handling the cache update syncup via Curator path cache It
+ * creates the path cache, a distributed lock and counter. The updated API
+ * updates the counter, creates a znode zpath/counter and writes the data to it.
+ * The caller should provider the cache callback handler class that posts the
+ * update object to the required cache
+ */
+public class PluginCacheSyncUtil {
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(PluginCacheSyncUtil.class);
+
+  private final String zkPath;
+  private final HAContext haContext;
+  private final PathChildrenCache cache;
+  private InterProcessSemaphoreMutex updatorLock;
+  private int lockTimeout;
+  private DistributedAtomicLong updateCounter;
+
+  public PluginCacheSyncUtil(String zkPath, Configuration conf,
+      PathChildrenCacheListener cacheListener) throws SentryPluginException {
+    this.zkPath = zkPath;
+    // Init ZK connection
+    try {
+      haContext = HAContext.getHAContext(conf);
+    } catch (Exception e) {
+      throw new SentryPluginException("Error creating HA context ", e);
+    }
+    haContext.startCuratorFramework();
+
+    // Init path cache
+    cache = new PathChildrenCache(haContext.getCuratorFramework(), zkPath
+        + "/cache", true);
+    // path cache callback
+    cache.getListenable().addListener(cacheListener);
+    try {
+      cache.start();
+    } catch (Exception e) {
+      throw new SentryPluginException("Error creating ZK PathCache ", e);
+    }
+    updatorLock = new InterProcessSemaphoreMutex(
+        haContext.getCuratorFramework(), zkPath + "/lock");
+    lockTimeout = conf.getInt(
+        ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
+        ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT);
+
+    updateCounter = new DistributedAtomicLong(haContext.getCuratorFramework(),
+        zkPath + "/counter", haContext.getRetryPolicy());
+    try {
+      updateCounter.initialize((long) 4);
+    } catch (Exception e) {
+      LOGGER.error("Error initializing  counter for zpath " + zkPath, e);
+    }
+  }
+
+  public void handleCacheUpdate(Update update) throws SentryPluginException {
+    // post message to ZK cache
+    try {
+      // Acquire ZK lock for update cache sync. This ensures that the counter
+      // increment and znode creation is atomic operation
+      if (!updatorLock.acquire(lockTimeout, TimeUnit.MILLISECONDS)) {
+        throw new SentryPluginException(
+            "Failed to get ZK lock for update cache syncup");
+      }
+    } catch (Exception e1) {
+      throw new SentryPluginException(
+          "Error getting ZK lock for update cache syncup" + e1, e1);
+    }
+
+    try {
+      // increment the global sequence counter
+      try {
+        update.setSeqNum(updateCounter.increment().postValue());
+      } catch (Exception e1) {
+        throw new SentryPluginException(
+            "Error setting ZK counter for update cache syncup" + e1, e1);
+      }
+
+      // Create a new znode with the sequence number and write the update data
+      // into it
+      String updateSeq = String.valueOf(update.getSeqNum());
+      String newPath = ZKPaths.makePath(zkPath + "/cache", updateSeq);
+      try {
+        haContext.getCuratorFramework().create().creatingParentsIfNeeded()
+            .forPath(newPath, update.serialize());
+      } catch (Exception e) {
+        throw new SentryPluginException("error posting update to ZK ", e);
+      }
+    } finally {
+      // release the ZK lock
+      try {
+        updatorLock.release();
+      } catch (Exception e) {
+        throw new SentryPluginException(
+            "Error releasing ZK lock for update cache syncup" + e, e);
+      }
+    }
+  }
+
+  public static void setUpdateFromChildEvent(PathChildrenCacheEvent cacheEvent,
+      Update update) throws IOException {
+    byte eventData[] = cacheEvent.getData().getData();
+    update.deserialize(eventData);
+    String seqNum = ZKPaths.getNodeFromPath(cacheEvent.getData().getPath());
+    update.setSeqNum(Integer.valueOf(seqNum));
+  }
+
+  public void close() throws IOException {
+    cache.close();
+  }
+
+  public long getUpdateCounter() throws Exception {
+    return updateCounter.get().preValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
index cc849b9..80f3648 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
@@ -41,6 +41,10 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
     retVal.setAuthzPathUpdate(new LinkedList<TPathsUpdate>());
     retVal.setAuthzPermUpdate(new LinkedList<TPermissionsUpdate>());
     if (SentryPlugin.instance != null) {
+      if (SentryPlugin.instance.isOutOfSync()) {
+        throw new TException(
+            "This Sentry server is not communicating with other nodes and out of sync ");
+      }
       List<PermissionsUpdate> permUpdates = SentryPlugin.instance.getAllPermsUpdatesFrom(permSeqNum);
       List<PathsUpdate> pathUpdates = SentryPlugin.instance.getAllPathsUpdatesFrom(pathSeqNum);
       try {
@@ -80,7 +84,7 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
     } else {
       LOGGER.error("SentryPlugin not initialized yet !!");
     }
-    
+
     return retVal;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
index f1e792d..221c397 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
@@ -48,6 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 
 public class SentryPlugin implements SentryPolicyStorePlugin {
 
@@ -67,7 +68,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
     public PermissionsUpdate retrieveFullImage(long currSeqNum) {
       Map<String, HashMap<String, String>> privilegeImage = sentryStore.retrieveFullPrivilegeImage();
       Map<String, LinkedList<String>> roleImage = sentryStore.retrieveFullRoleImage();
-      
+
       TPermissionsUpdate tPermUpdate = new TPermissionsUpdate(true, currSeqNum,
           new HashMap<String, TPrivilegeChanges>(),
           new HashMap<String, TRoleChanges>());
@@ -86,13 +87,14 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
       permissionsUpdate.setSeqNum(currSeqNum);
       return permissionsUpdate;
     }
-    
+
   }
 
   private UpdateForwarder<PathsUpdate> pathsUpdater;
   private UpdateForwarder<PermissionsUpdate> permsUpdater;
   private final AtomicLong permSeqNum = new AtomicLong(5);
   private PermImageRetriever permImageRetriever;
+  private boolean outOfSync = false;
 
   long getLastSeenHMSPathSeqNum() {
     return pathsUpdater.getLastSeen();
@@ -106,12 +108,13 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
     final int initUpdateRetryDelayMs =
         conf.getInt(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS,
             ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT);
-    pathsUpdater = new UpdateForwarder<PathsUpdate>(new UpdateableAuthzPaths(
-        pathPrefixes), null, 100, initUpdateRetryDelayMs);
     permImageRetriever = new PermImageRetriever(sentryStore);
-    permsUpdater = new UpdateForwarder<PermissionsUpdate>(
-        new UpdateablePermissions(permImageRetriever), permImageRetriever,
-        100, initUpdateRetryDelayMs);
+
+    pathsUpdater = UpdateForwarder.create(conf, new UpdateableAuthzPaths(
+        pathPrefixes), new PathsUpdate(0, false), null, 100, initUpdateRetryDelayMs);
+    permsUpdater = UpdateForwarder.create(conf,
+        new UpdateablePermissions(permImageRetriever), new PermissionsUpdate(0, false),
+        permImageRetriever, 100, initUpdateRetryDelayMs);
     LOGGER.info("Sentry HDFS plugin initialized !!");
     instance = this;
   }
@@ -124,7 +127,8 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
     return permsUpdater.getAllUpdatesFrom(permSeqNum);
   }
 
-  public void handlePathUpdateNotification(PathsUpdate update) {
+  public void handlePathUpdateNotification(PathsUpdate update)
+      throws SentryPluginException {
     pathsUpdater.handleUpdateNotification(update);
     LOGGER.debug("Recieved Authz Path update [" + update.getSeqNum() + "]..");
   }
@@ -144,7 +148,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
   @Override
   public void onAlterSentryRoleDeleteGroups(
       TAlterSentryRoleDeleteGroupsRequest request)
-      throws SentryPluginException {
+          throws SentryPluginException {
     PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
     TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName());
     for (TSentryGroup group : request.getGroups()) {
@@ -157,7 +161,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
   @Override
   public void onAlterSentryRoleGrantPrivilege(
       TAlterSentryRoleGrantPrivilegeRequest request)
-      throws SentryPluginException {
+          throws SentryPluginException {
     if (request.isSetPrivileges()) {
       String roleName = request.getRoleName();
       for (TSentryPrivilege privilege : request.getPrivileges()) {
@@ -194,7 +198,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
   @Override
   public void onAlterSentryRoleRevokePrivilege(
       TAlterSentryRoleRevokePrivilegeRequest request)
-      throws SentryPluginException {
+          throws SentryPluginException {
     if (request.isSetPrivileges()) {
       String roleName = request.getRoleName();
       for (TSentryPrivilege privilege : request.getPrivileges()) {
@@ -203,6 +207,14 @@ public class SentryPlugin implements SentryPolicyStorePlugin {
     }
   }
 
+  public boolean isOutOfSync() {
+    return outOfSync;
+  }
+
+  public void setOutOfSync(boolean outOfSync) {
+    this.outOfSync = outOfSync;
+  }
+
   private void onAlterSentryRoleRevokePrivilegeCore(String roleName, TSentryPrivilege privilege)
       throws SentryPluginException {
     String authzObj = getAuthzObj(privilege);

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
index f321d3d..22a436a 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
@@ -17,6 +17,8 @@
  */
 package org.apache.sentry.hdfs;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -26,13 +28,16 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
+import org.apache.sentry.provider.db.service.persistent.HAContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
 public class UpdateForwarder<K extends Updateable.Update> implements
-    Updateable<K> {
+    Updateable<K>, Closeable {
 
   public static interface ExternalImageRetriever<K> {
 
@@ -41,39 +46,41 @@ public class UpdateForwarder<K extends Updateable.Update> implements
   }
 
   private final AtomicLong lastSeenSeqNum = new AtomicLong(0);
-  private final AtomicLong lastCommittedSeqNum = new AtomicLong(0);
+  protected final AtomicLong lastCommittedSeqNum = new AtomicLong(0);
   // Updates should be handled in order
   private final Executor updateHandler = Executors.newSingleThreadExecutor();
 
   // Update log is used when propagate updates to a downstream cache.
   // The preUpdate log stores all commits that were applied to this cache.
-  // When the update log is filled to capacity (updateLogSize), all
+  // When the update log is filled to capacity (getMaxUpdateLogSize()), all
   // entries are cleared and a compact image if the state of the cache is
   // appended to the log.
   // The first entry in an update log (consequently the first preUpdate a
   // downstream cache sees) will be a full image. All subsequent entries are
   // partial edits
-  private final LinkedList<K> updateLog = new LinkedList<K>();
-  // UpdateLog is disabled when updateLogSize = 0;
-  private final int updateLogSize;
+  protected final LinkedList<K> updateLog = new LinkedList<K>();
+  // UpdateLog is disabled when getMaxUpdateLogSize() = 0;
+  private final int maxUpdateLogSize;
 
   private final ExternalImageRetriever<K> imageRetreiver;
 
   private volatile Updateable<K> updateable;
 
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
-  private static final long INIT_SEQ_NUM = -2;
+  protected static final long INIT_SEQ_NUM = -2;
+  protected static final int INIT_UPDATE_RETRY_DELAY = 5000;
 
   private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarder.class);
+  private static final String UPDATABLE_TYPE_NAME = "update_forwarder";
 
-  public UpdateForwarder(Updateable<K> updateable,
-      ExternalImageRetriever<K> imageRetreiver, int updateLogSize) {
-    this(updateable, imageRetreiver, updateLogSize, 5000);
+  public UpdateForwarder(Configuration conf, Updateable<K> updateable,
+      ExternalImageRetriever<K> imageRetreiver, int maxUpdateLogSize) {
+    this(conf, updateable, imageRetreiver, maxUpdateLogSize, INIT_UPDATE_RETRY_DELAY);
   }
-  public UpdateForwarder(Updateable<K> updateable,
-      ExternalImageRetriever<K> imageRetreiver, int updateLogSize,
+  public UpdateForwarder(Configuration conf, Updateable<K> updateable,
+      ExternalImageRetriever<K> imageRetreiver, int maxUpdateLogSize,
       int initUpdateRetryDelay) {
-    this.updateLogSize = updateLogSize;
+    this.maxUpdateLogSize = maxUpdateLogSize;
     this.imageRetreiver = imageRetreiver;
     if (imageRetreiver != null) {
       spawnInitialUpdater(updateable, initUpdateRetryDelay);
@@ -82,6 +89,25 @@ public class UpdateForwarder<K extends Updateable.Update> implements
     }
   }
 
+  public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf,
+      Updateable<K> updateable, K update, ExternalImageRetriever<K> imageRetreiver,
+      int maxUpdateLogSize) throws SentryPluginException {
+    return create(conf, updateable, update, imageRetreiver, maxUpdateLogSize,
+        INIT_UPDATE_RETRY_DELAY);
+  }
+
+  public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf,
+      Updateable<K> updateable, K update, ExternalImageRetriever<K> imageRetreiver,
+      int maxUpdateLogSize, int initUpdateRetryDelay) throws SentryPluginException {
+    if (HAContext.isHaEnabled(conf)) {
+      return new UpdateForwarderWithHA<K>(conf, updateable, update, imageRetreiver,
+          maxUpdateLogSize, initUpdateRetryDelay);
+    } else {
+      return new UpdateForwarder<K>(conf, updateable, imageRetreiver,
+          maxUpdateLogSize, initUpdateRetryDelay);
+    }
+  }
+
   private void spawnInitialUpdater(final Updateable<K> updateable,
       final int initUpdateRetryDelay) {
     K firstFullImage = null;
@@ -126,18 +152,18 @@ public class UpdateForwarder<K extends Updateable.Update> implements
    * Handle notifications from HMS plug-in or upstream Cache
    * @param update
    */
-  public void handleUpdateNotification(final K update) {
+  public void handleUpdateNotification(final K update) throws SentryPluginException {
     // Correct the seqNums on the first update
     if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) {
-      K firstUpdate = updateLog.peek();
-      long firstSeqNum = update.getSeqNum() - 1; 
+      K firstUpdate = getUpdateLog().peek();
+      long firstSeqNum = update.getSeqNum() - 1;
       if (firstUpdate != null) {
         firstUpdate.setSeqNum(firstSeqNum);
       }
       lastCommittedSeqNum.set(firstSeqNum);
       lastSeenSeqNum.set(firstSeqNum);
     }
-    final boolean editNotMissed = 
+    final boolean editNotMissed =
         lastSeenSeqNum.incrementAndGet() == update.getSeqNum();
     if (!editNotMissed) {
       lastSeenSeqNum.set(update.getSeqNum());
@@ -167,18 +193,18 @@ public class UpdateForwarder<K extends Updateable.Update> implements
     updateHandler.execute(task);
   }
 
-  private void appendToUpdateLog(K update) {
-    synchronized (updateLog) {
+  protected void appendToUpdateLog(K update) {
+    synchronized (getUpdateLog()) {
       boolean logCompacted = false;
-      if (updateLogSize > 0) {
-        if (update.hasFullImage() || (updateLog.size() == updateLogSize)) {
+      if (getMaxUpdateLogSize() > 0) {
+        if (update.hasFullImage() || (getUpdateLog().size() == getMaxUpdateLogSize())) {
           // Essentially a log compaction
-          updateLog.clear();
-          updateLog.add(update.hasFullImage() ? update
+          getUpdateLog().clear();
+          getUpdateLog().add(update.hasFullImage() ? update
               : createFullImageUpdate(update.getSeqNum()));
           logCompacted = true;
         } else {
-          updateLog.add(update);
+          getUpdateLog().add(update);
         }
       }
       lastCommittedSeqNum.set(update.getSeqNum());
@@ -199,7 +225,7 @@ public class UpdateForwarder<K extends Updateable.Update> implements
    */
   public List<K> getAllUpdatesFrom(long seqNum) {
     List<K> retVal = new LinkedList<K>();
-    synchronized (updateLog) {
+    synchronized (getUpdateLog()) {
       long currSeqNum = lastCommittedSeqNum.get();
       if (LOGGER.isDebugEnabled() && (updateable != null)) {
         LOGGER.debug("#### GetAllUpdatesFrom ["
@@ -207,20 +233,20 @@ public class UpdateForwarder<K extends Updateable.Update> implements
             + "reqSeqNum=" + seqNum + ", "
             + "lastCommit=" + lastCommittedSeqNum.get() + ", "
             + "lastSeen=" + lastSeenSeqNum.get() + ", "
-            + "updateLogSize=" + updateLog.size() + "]");
+            + "getMaxUpdateLogSize()=" + getUpdateLog().size() + "]");
       }
-      if (updateLogSize == 0) {
+      if (getMaxUpdateLogSize() == 0) {
         // no updatelog configured..
         return retVal;
       }
-      K head = updateLog.peek();
+      K head = getUpdateLog().peek();
       if (head == null) {
         return retVal;
       }
       if (seqNum > currSeqNum + 1) {
         // This process has probably restarted since downstream
         // recieved last update
-        retVal.addAll(updateLog);
+        retVal.addAll(getUpdateLog());
         return retVal;
       }
       if (head.getSeqNum() > seqNum) {
@@ -228,7 +254,7 @@ public class UpdateForwarder<K extends Updateable.Update> implements
         if (head.hasFullImage()) {
           // head is a refresh(full) image
           // Send full image along with partial updates
-          for (K u : updateLog) {
+          for (K u : getUpdateLog()) {
             retVal.add(u);
           }
         } else {
@@ -237,13 +263,13 @@ public class UpdateForwarder<K extends Updateable.Update> implements
           // add fullImage to head of Log
           // NOTE : This should ideally never happen
           K fullImage = createFullImageUpdate(currSeqNum);
-          updateLog.clear();
-          updateLog.add(fullImage);
+          getUpdateLog().clear();
+          getUpdateLog().add(fullImage);
           retVal.add(fullImage);
         }
       } else {
         // increment iterator to requested seqNum
-        Iterator<K> iter = updateLog.iterator();
+        Iterator<K> iter = getUpdateLog().iterator();
         while (iter.hasNext()) {
           K elem = iter.next();
           if (elem.getSeqNum() >= seqNum) {
@@ -254,7 +280,7 @@ public class UpdateForwarder<K extends Updateable.Update> implements
     }
     return retVal;
   }
- 
+
   public boolean areAllUpdatesCommited() {
     return lastCommittedSeqNum.get() == lastSeenSeqNum.get();
   }
@@ -278,7 +304,7 @@ public class UpdateForwarder<K extends Updateable.Update> implements
       updateable.updatePartial(updates, lock);
     }
   }
-  
+
   @Override
   public long getLastUpdatedSeqNum() {
     return (updateable != null) ? updateable.getLastUpdatedSeqNum() : INIT_SEQ_NUM;
@@ -289,4 +315,21 @@ public class UpdateForwarder<K extends Updateable.Update> implements
     return (updateable != null) ? updateable.createFullImageUpdate(currSeqNum) : null;
   }
 
+  @Override
+  public String getUpdateableTypeName() {
+    // TODO Auto-generated method stub
+    return UPDATABLE_TYPE_NAME;
+  }
+
+  protected LinkedList<K> getUpdateLog() {
+    return updateLog;
+  }
+
+  protected int getMaxUpdateLogSize() {
+    return maxUpdateLogSize;
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarderWithHA.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarderWithHA.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarderWithHA.java
new file mode 100644
index 0000000..9a4e7bb
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarderWithHA.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.SentryUserException;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+import org.apache.sentry.hdfs.UpdateForwarder;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
+import org.apache.sentry.provider.db.service.persistent.HAContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+
+public class UpdateForwarderWithHA<K extends Updateable.Update> extends
+UpdateForwarder<K> implements Updateable<K> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarderWithHA.class);
+  private static final String UPDATABLE_TYPE_NAME = "ha_update_forwarder";
+
+  public static class SentryHAPathChildrenCacheListener<K extends Updateable.Update>
+  implements PathChildrenCacheListener {
+    private final LinkedList<K> updateLog;
+    private final K baseUpdate;
+    private final UpdateForwarderWithHA<K> updateForwarder;
+
+    public SentryHAPathChildrenCacheListener(LinkedList<K> updateLog,
+        K baseUpdate, UpdateForwarderWithHA<K> updateForwarder) {
+      this.updateLog = updateLog;
+      this.baseUpdate = baseUpdate;
+      this.updateForwarder = updateForwarder;
+    }
+
+    @Override
+    public synchronized void childEvent(CuratorFramework client,
+        PathChildrenCacheEvent event) throws Exception {
+      switch ( event.getType() ) {
+      case CHILD_ADDED:
+        K newUpdate = (K) baseUpdate.getClass().newInstance();
+        PluginCacheSyncUtil.setUpdateFromChildEvent(event, newUpdate);
+        updateForwarder.postNotificationToLog(newUpdate);
+        break;
+      case INITIALIZED:
+      case CHILD_UPDATED:
+      case CHILD_REMOVED:
+        break;
+      case CONNECTION_RECONNECTED:
+        // resume the node
+        SentryPlugin.instance.setOutOfSync(false);
+        break;
+      case CONNECTION_SUSPENDED:
+      case CONNECTION_LOST:
+        // suspend the node
+        SentryPlugin.instance.setOutOfSync(true);
+        break;
+      default:
+        break;
+      }
+    }
+  }
+
+  private final String zkPath;
+  private final PluginCacheSyncUtil pluginCacheSync;
+
+  public UpdateForwarderWithHA(Configuration conf, Updateable<K> updateable,  K baseUpdate,
+      ExternalImageRetriever<K> imageRetreiver, int updateLogSize) throws SentryPluginException {
+    this(conf, updateable, baseUpdate, imageRetreiver, updateLogSize, INIT_UPDATE_RETRY_DELAY);
+  }
+
+  public UpdateForwarderWithHA(Configuration conf, Updateable<K> updateable, K baseUpdate,
+      ExternalImageRetriever<K> imageRetreiver, int updateLogSize,
+      int initUpdateRetryDelay) throws SentryPluginException {
+    super(conf, updateable, imageRetreiver, updateLogSize, initUpdateRetryDelay);
+    zkPath = conf.get(ServerConfig.SENTRY_HDFS_HA_ZOOKEEPER_NAMESPACE,
+        ServerConfig.SENTRY_HDFS_HA_ZOOKEEPER_NAMESPACE_DEFAULT) + "/" +
+        updateable.getUpdateableTypeName();
+    pluginCacheSync = new PluginCacheSyncUtil(zkPath, conf,
+        new SentryHAPathChildrenCacheListener<K>(getUpdateLog(), baseUpdate,
+            this));
+  }
+
+  @Override
+  public String getUpdateableTypeName() {
+    return UPDATABLE_TYPE_NAME;
+  }
+
+  @Override
+  public void handleUpdateNotification(final K update) throws SentryPluginException {
+    pluginCacheSync.handleCacheUpdate(update);
+  }
+
+  private void postNotificationToLog(K update) throws SentryPluginException {
+    super.handleUpdateNotification(update);
+  }
+
+  @Override
+  public void close() throws IOException {
+    pluginCacheSync.close();
+  }
+
+  @Override
+  public boolean areAllUpdatesCommited() {
+    try {
+      if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) {
+        return false;
+      }
+      return lastCommittedSeqNum.get() == pluginCacheSync.getUpdateCounter();
+    } catch (Exception e) {
+      LOGGER.error("Error loading the update counter for ZK", e);
+      return true;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
index 6b3e2e2..2fe81fd 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
@@ -25,6 +25,7 @@ import org.apache.sentry.hdfs.Updateable;
 import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever;
 
 public class UpdateablePermissions implements Updateable<PermissionsUpdate>{
+  private static final String UPDATABLE_TYPE_NAME = "perm_update";
 
   private AtomicLong seqNum = new AtomicLong();
   private final ExternalImageRetriever<PermissionsUpdate> imageRetreiver;
@@ -59,4 +60,9 @@ public class UpdateablePermissions implements Updateable<PermissionsUpdate>{
     return other;
   }
 
+  @Override
+  public String getUpdateableTypeName() {
+    return UPDATABLE_TYPE_NAME;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java
new file mode 100644
index 0000000..40af05a
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestHAUpdateForwarder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.sentry.hdfs.service.thrift.TRoleChanges;
+import org.apache.sentry.provider.db.service.persistent.HAContext;
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestHAUpdateForwarder extends TestUpdateForwarder {
+
+  private TestingServer server;
+
+  @Before
+  public void setup() throws Exception {
+    server = new TestingServer();
+    server.start();
+    testConf.set(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM,
+        server.getConnectString());
+    testConf.setBoolean(ServerConfig.SENTRY_HA_ENABLED, true);
+  }
+
+  @Override
+  @After
+  public void cleanup() throws Exception {
+    super.cleanup();
+    server.stop();
+    HAContext.clearServerContext();
+  }
+
+  @Test
+  public void testThriftSerializer() throws Exception {
+    List<String> addGroups = Lists.newArrayList("g1", "g2", "g3");
+    List<String> delGroups = Lists.newArrayList("d1", "d2", "d3");
+    String roleName = "testRole1";
+
+    TRoleChanges roleUpdate = new TRoleChanges(roleName, addGroups, delGroups);
+    TRoleChanges newRoleUpdate = (TRoleChanges) ThriftSerializer.deserialize(
+        roleUpdate, ThriftSerializer.serialize(roleUpdate));
+    assertEquals(roleUpdate, newRoleUpdate);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
index 0c55bb1..2e2b465 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
@@ -18,27 +18,35 @@
 
 package org.apache.sentry.hdfs;
 
+import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.locks.ReadWriteLock;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.hdfs.UpdateForwarder;
 import org.apache.sentry.hdfs.Updateable;
 import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever;
 import org.apache.sentry.hdfs.Updateable.Update;
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+import org.junit.After;
+import org.junit.Assume;
 import org.junit.Test;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 
 public class TestUpdateForwarder {
-  
+
   static class DummyUpdate implements Update {
     private long seqNum = 0;
     private boolean hasFullUpdate = false;
     private String state;
+    public DummyUpdate() {
+      this(0, false);
+    }
     public DummyUpdate(long seqNum, boolean hasFullUpdate) {
       this.seqNum = seqNum;
       this.hasFullUpdate = hasFullUpdate;
@@ -60,12 +68,21 @@ public class TestUpdateForwarder {
     }
     @Override
     public void setSeqNum(long seqNum) {
-     this.seqNum = seqNum;
+      this.seqNum = seqNum;
+    }
+    @Override
+    public byte[] serialize() throws IOException {
+      return state.getBytes();
+    }
+
+    @Override
+    public void deserialize(byte[] data) throws IOException {
+      state = new String(data);
     }
   }
 
   static class DummyUpdatable implements Updateable<DummyUpdate> {
-    
+
     private List<String> state = new LinkedList<String>();
     private long lastUpdatedSeqNum = 0;
 
@@ -100,6 +117,12 @@ public class TestUpdateForwarder {
     public String getState() {
       return Joiner.on(",").join(state);
     }
+
+    @Override
+    public String getUpdateableTypeName() {
+      // TODO Auto-generated method stub
+      return "DummyUpdator";
+    }
   }
 
   static class DummyImageRetreiver implements ExternalImageRetriever<DummyUpdate> {
@@ -116,12 +139,23 @@ public class TestUpdateForwarder {
     }
   }
 
+  protected Configuration testConf = new Configuration();
+  protected UpdateForwarder<DummyUpdate> updateForwarder;
+
+  @After
+  public void cleanup() throws Exception {
+    if (updateForwarder != null) {
+      updateForwarder.close();
+      updateForwarder = null;
+    }
+  }
+
   @Test
-  public void testInit() {
+  public void testInit() throws Exception {
     DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
     imageRetreiver.setState("a,b,c");
-    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
-        new DummyUpdatable(), imageRetreiver, 10);
+    updateForwarder = UpdateForwarder.create(
+        testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 10);
     Assert.assertEquals(-2, updateForwarder.getLastUpdatedSeqNum());
     List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
     Assert.assertTrue(allUpdates.size() == 1);
@@ -140,8 +174,8 @@ public class TestUpdateForwarder {
   public void testUpdateReceive() throws Exception {
     DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
     imageRetreiver.setState("a,b,c");
-    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
-        new DummyUpdatable(), imageRetreiver, 5);
+    updateForwarder = UpdateForwarder.create(
+        testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5);
     updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
@@ -159,8 +193,10 @@ public class TestUpdateForwarder {
   // by more than +1..
   @Test
   public void testUpdateReceiveWithNullImageRetriver() throws Exception {
-    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
-        new DummyUpdatable(), null, 5);
+    Assume.assumeTrue(!testConf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
+        false));
+    updateForwarder = UpdateForwarder.create(
+        testConf, new DummyUpdatable(), new DummyUpdate(), null, 5);
     updateForwarder.handleUpdateNotification(new DummyUpdate(-1, true).setState("a"));
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
@@ -186,8 +222,8 @@ public class TestUpdateForwarder {
   public void testGetUpdates() throws Exception {
     DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
     imageRetreiver.setState("a,b,c");
-    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
-        new DummyUpdatable(), imageRetreiver, 5);
+    updateForwarder = UpdateForwarder.create(
+        testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5);
     updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
@@ -228,8 +264,8 @@ public class TestUpdateForwarder {
   public void testGetUpdatesAfterExternalEntityReset() throws Exception {
     DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
     imageRetreiver.setState("a,b,c");
-    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
-        new DummyUpdatable(), imageRetreiver, 5);
+    updateForwarder = UpdateForwarder.create(
+        testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5);
     updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);
@@ -267,15 +303,15 @@ public class TestUpdateForwarder {
     allUpdates = updateForwarder.getAllUpdatesFrom(9);
     Assert.assertEquals(1, allUpdates.size());
     Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState());
-    Assert.assertEquals(1, allUpdates.get(0).getSeqNum());
+    // Assert.assertEquals(1, allUpdates.get(0).getSeqNum());
   }
 
   @Test
   public void testUpdateLogCompression() throws Exception {
     DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
     imageRetreiver.setState("a,b,c");
-    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
-        new DummyUpdatable(), imageRetreiver, 5);
+    updateForwarder = UpdateForwarder.create(
+        testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5);
     updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d"));
     while(!updateForwarder.areAllUpdatesCommited()) {
       Thread.sleep(100);

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
index 523261e..ed4da96 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
@@ -41,6 +41,7 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -49,6 +50,7 @@ import com.google.common.base.Preconditions;
 public class HAContext {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HAContext.class);
+  private static HAContext serverHAContext = null;
 
   public final static String SENTRY_SERVICE_REGISTER_NAMESPACE = "sentry-service";
   private final String zookeeperQuorum;
@@ -62,7 +64,7 @@ public class HAContext {
   private final CuratorFramework curatorFramework;
   private final RetryPolicy retryPolicy;
 
-  public HAContext(Configuration conf) throws Exception {
+  private HAContext(Configuration conf) throws Exception {
     this.zookeeperQuorum = conf.get(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM,
         ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM_DEFAULT);
     this.retriesMaxCount = conf.getInt(ServerConfig.SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT,
@@ -96,6 +98,45 @@ public class HAContext {
     checkAndSetACLs();
   }
 
+  /**
+   * Use common HAContext (ie curator framework connection to ZK)
+   *
+   * @param conf
+   * @throws Exception
+   */
+  public static HAContext getHAContext(Configuration conf) throws Exception {
+    if (serverHAContext == null) {
+      serverHAContext = new HAContext(conf);
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        @Override
+        public void run() {
+          LOGGER.info("ShutdownHook closing curator framework");
+          try {
+            clearServerContext();
+          } catch (Throwable t) {
+            LOGGER.error("Error stopping SentryService", t);
+          }
+        }
+      });
+
+    }
+    return serverHAContext;
+  }
+
+  @VisibleForTesting
+  public static synchronized void clearServerContext() {
+    if (serverHAContext != null) {
+      serverHAContext.getCuratorFramework().close();
+      serverHAContext = null;
+    }
+  }
+
+  public void startCuratorFramework() {
+    if (curatorFramework.getState() != CuratorFrameworkState.STARTED) {
+      curatorFramework.start();
+    }
+  }
+
   public CuratorFramework getCuratorFramework() {
     return this.curatorFramework;
   }
@@ -104,6 +145,10 @@ public class HAContext {
     return zookeeperQuorum;
   }
 
+  public static boolean isHaEnabled(Configuration conf) {
+    return conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED, ServerConfig.SENTRY_HA_ENABLED_DEFAULT);
+  }
+
   public String getNamespace() {
     return namespace;
   }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceManager.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceManager.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceManager.java
index a970b92..0e3c0bb 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceManager.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceManager.java
@@ -51,15 +51,12 @@ public class ServiceManager {
 
   private void init() throws IOException {
     try {
-      CuratorFramework curatorFramework = haContext.getCuratorFramework();
-      if (curatorFramework.getState() != CuratorFrameworkState.STARTED) {
-        curatorFramework.start();
-      }
+      haContext.startCuratorFramework();
       InstanceSerializer<Void> instanceSerializer = new FixedJsonInstanceSerializer<Void>(Void.class);
       serviceDiscovery = ServiceDiscoveryBuilder.<Void>builder(Void.class)
                 .basePath(HAContext.SENTRY_SERVICE_REGISTER_NAMESPACE)
                 .serializer(instanceSerializer)
-                .client(curatorFramework)
+          .client(haContext.getCuratorFramework())
                 .build();
       serviceDiscovery.start();
       serviceProvider = serviceDiscovery
@@ -94,7 +91,6 @@ public class ServiceManager {
     try {
       serviceProvider.close();
       serviceDiscovery.close();
-      haContext.getCuratorFramework().close();
       LOGGER.debug("Closed ZK resources");
     } catch (IOException e) {
       LOGGER.warn("Error closing the service manager", e);

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
index 0d1858e..1e17f9a 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/ServiceRegister.java
@@ -32,10 +32,8 @@ public class ServiceRegister {
   }
 
   public void regService(String host, int port) throws Exception {
-    if (haContext.getCuratorFramework().getState() != CuratorFrameworkState.STARTED) {
-      haContext.getCuratorFramework().start();
-    }
 
+    haContext.startCuratorFramework();
     ServiceInstance<Void> serviceInstance = ServiceInstance.<Void>builder()
         .address(host)
         .port(port)

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index 29e3131..b4c49da 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -103,7 +103,7 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
     isReady = false;
     if(conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
         ServerConfig.SENTRY_HA_ENABLED_DEFAULT)){
-      haContext = new HAContext(conf);
+      haContext = HAContext.getHAContext(conf);
       sentryStore = new SentryStore(conf);
       ServiceRegister reg = new ServiceRegister(haContext);
       reg.regService(conf.get(ServerConfig.RPC_ADDRESS),
@@ -665,7 +665,7 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
       for (SentryPolicyStorePlugin plugin : sentryPlugins) {
         plugin.onDropSentryPrivilege(request);
       }
-      response.setStatus(Status.OK()); 
+      response.setStatus(Status.OK());
     } catch (SentryAccessDeniedException e) {
       LOGGER.error(e.getMessage(), e);
       response.setStatus(Status.AccessDenied(e.getMessage(), e));

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HAClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HAClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HAClientInvocationHandler.java
index 1dea938..52be099 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HAClientInvocationHandler.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HAClientInvocationHandler.java
@@ -86,7 +86,7 @@ public class HAClientInvocationHandler implements InvocationHandler {
 
   private void renewSentryClient() throws IOException {
     try {
-      manager = new ServiceManager(new HAContext(conf));
+      manager = new ServiceManager(HAContext.getHAContext(conf));
     } catch (Exception e1) {
       throw new IOException("Failed to extract Sentry node info from zookeeper", e1);
     }
@@ -106,6 +106,8 @@ public class HAClientInvocationHandler implements InvocationHandler {
         conf.setInt(ServiceConstants.ClientConfig.SERVER_RPC_PORT, serverAddress.getPort());
         try {
           client = new SentryPolicyServiceClientDefaultImpl(conf);
+          LOGGER.info("Sentry Client using server " + serverAddress.getHostName() +
+              ":" + serverAddress.getPort());
           break;
         } catch (IOException e) {
           manager.reportError(currentServiceInstance);

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index 77e1f60..8ef586e 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -77,7 +77,7 @@ public class ServiceConstants {
     public static final String PROCESSOR_FACTORIES = "sentry.service.processor.factories";
     public static final String PROCESSOR_FACTORIES_DEFAULT =
         "org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory" +
-        ",org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessorFactory";
+            ",org.apache.sentry.provider.db.generic.service.thrift.SentryGenericPolicyProcessorFactory";
     public static final String SENTRY_STORE_JDBC_URL = "sentry.store.jdbc.url";
     public static final String SENTRY_STORE_JDBC_USER = "sentry.store.jdbc.user";
     public static final String SENTRY_STORE_JDBC_USER_DEFAULT = "Sentry";
@@ -125,27 +125,27 @@ public class ServiceConstants {
 
     public static final ImmutableMap<String, String> SENTRY_STORE_DEFAULTS =
         ImmutableMap.<String, String>builder()
-    .put("datanucleus.connectionPoolingType", "BoneCP")
-    .put("datanucleus.validateTables", "false")
-    .put("datanucleus.validateColumns", "false")
-    .put("datanucleus.validateConstraints", "false")
-    .put("datanucleus.storeManagerType", "rdbms")
+        .put("datanucleus.connectionPoolingType", "BoneCP")
+        .put("datanucleus.validateTables", "false")
+        .put("datanucleus.validateColumns", "false")
+        .put("datanucleus.validateConstraints", "false")
+        .put("datanucleus.storeManagerType", "rdbms")
         .put("datanucleus.autoCreateSchema", "false")
         .put("datanucleus.fixedDatastore", "true")
-    .put("datanucleus.autoStartMechanismMode", "checked")
-    .put("datanucleus.transactionIsolation", "read-committed")
-    .put("datanucleus.cache.level2", "false")
-    .put("datanucleus.cache.level2.type", "none")
-    .put("datanucleus.identifierFactory", "datanucleus1")
-    .put("datanucleus.rdbms.useLegacyNativeValueStrategy", "true")
-    .put("datanucleus.plugin.pluginRegistryBundleCheck", "LOG")
-    .put("javax.jdo.PersistenceManagerFactoryClass",
-                     "org.datanucleus.api.jdo.JDOPersistenceManagerFactory")
-    .put("javax.jdo.option.DetachAllOnCommit", "true")
-    .put("javax.jdo.option.NonTransactionalRead", "false")
-    .put("javax.jdo.option.NonTransactionalWrite", "false")
-    .put("javax.jdo.option.Multithreaded", "true")
-    .build();
+        .put("datanucleus.autoStartMechanismMode", "checked")
+        .put("datanucleus.transactionIsolation", "read-committed")
+        .put("datanucleus.cache.level2", "false")
+        .put("datanucleus.cache.level2.type", "none")
+        .put("datanucleus.identifierFactory", "datanucleus1")
+        .put("datanucleus.rdbms.useLegacyNativeValueStrategy", "true")
+        .put("datanucleus.plugin.pluginRegistryBundleCheck", "LOG")
+        .put("javax.jdo.PersistenceManagerFactoryClass",
+            "org.datanucleus.api.jdo.JDOPersistenceManagerFactory")
+            .put("javax.jdo.option.DetachAllOnCommit", "true")
+            .put("javax.jdo.option.NonTransactionalRead", "false")
+            .put("javax.jdo.option.NonTransactionalWrite", "false")
+            .put("javax.jdo.option.Multithreaded", "true")
+            .build();
 
     public static final String SENTRY_WEB_ENABLE = "sentry.service.web.enable";
     public static final Boolean SENTRY_WEB_ENABLE_DEFAULT = false;

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryServiceDiscovery.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryServiceDiscovery.java
index d3582e7..2773a9e 100644
--- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryServiceDiscovery.java
+++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryServiceDiscovery.java
@@ -46,14 +46,12 @@ public class TestSentryServiceDiscovery {
     conf.set(ServerConfig.SENTRY_HA_ENABLED, "true");
     conf.set(ServerConfig.SENTRY_HA_ZOOKEEPER_NAMESPACE, "sentry-test");
     conf.set(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM, server.getConnectString());
-    haContext = new HAContext(conf);
+    haContext = HAContext.getHAContext(conf);
   }
 
   @After
   public void teardown() {
-    if (haContext != null) {
-      CloseableUtils.closeQuietly(haContext.getCuratorFramework());
-    }
+    HAContext.clearServerContext();
     if (server != null) {
       try {
         server.stop();

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
index 133daef..1a40bd6 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
@@ -39,8 +39,10 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Preconditions;
+
 import junit.framework.Assert;
 
+import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -104,6 +106,7 @@ public class TestHDFSIntegration {
   
   private static final Logger LOGGER = LoggerFactory
       .getLogger(TestHDFSIntegration.class);
+  protected static boolean testSentryHA = false;
 
   public static class WordCountMapper extends MapReduceBase implements
       Mapper<LongWritable, Text, String, Long> {
@@ -150,6 +153,7 @@ public class TestHDFSIntegration {
   private static File policyFileLocation;
   private static UserGroupInformation adminUgi;
   private static UserGroupInformation hiveUgi;
+  private static TestingServer server;
 
   // Variables which are used for cleanup after test
   // Please set these values in each test
@@ -443,6 +447,9 @@ public class TestHDFSIntegration {
             "org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory,org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory");
         properties.put("sentry.policy.store.plugins", "org.apache.sentry.hdfs.SentryPlugin");
         properties.put(ServerConfig.RPC_MIN_THREADS, "3");
+        if (testSentryHA) {
+          haSetup(properties);
+        }
         for (Map.Entry<String, String> entry : properties.entrySet()) {
           sentryConf.set(entry.getKey(), entry.getValue());
         }
@@ -463,6 +470,14 @@ public class TestHDFSIntegration {
     });
   }
 
+  public static void haSetup(Map<String, String> properties) throws Exception {
+    server = new TestingServer();
+    server.start();
+    properties.put(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM,
+        server.getConnectString());
+    properties.put(ServerConfig.SENTRY_HA_ENABLED, "true");
+  }
+
   @After
   public void cleanAfterTest() throws Exception {
     //Clean up database

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0693dfb2/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationWithHA.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationWithHA.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationWithHA.java
new file mode 100644
index 0000000..92c0693
--- /dev/null
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegrationWithHA.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.tests.e2e.hdfs;
+
+import org.junit.BeforeClass;
+
+public class TestHDFSIntegrationWithHA extends TestHDFSIntegration {
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestHDFSIntegration.testSentryHA = true;
+    TestHDFSIntegration.setup();
+  }
+}