Posted to commits@sentry.apache.org by pr...@apache.org on 2015/03/31 19:32:52 UTC

incubator-sentry git commit: SENTRY-676: Address Sentry HA issues in a secure cluster. (Prasad Mujumdar, reviewed by Colin Ma)

Repository: incubator-sentry
Updated Branches:
  refs/heads/master 51f9d262f -> a9a6c6f4a


SENTRY-676: Address Sentry HA issues in a secure cluster. (Prasad Mujumdar, reviewed by Colin Ma)


Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/a9a6c6f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/a9a6c6f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/a9a6c6f4

Branch: refs/heads/master
Commit: a9a6c6f4a1d88a8aca5a541218deff2ce47fcd5f
Parents: 51f9d26
Author: Prasad Mujumdar <pr...@apache.org>
Authored: Tue Mar 31 10:32:52 2015 -0700
Committer: Prasad Mujumdar <pr...@apache.org>
Committed: Tue Mar 31 10:32:52 2015 -0700

----------------------------------------------------------------------
 .../SentryMetastorePostEventListener.java       |   6 +-
 sentry-dist/src/main/assembly/bin.xml           |  10 +
 sentry-hdfs/sentry-hdfs-common/pom.xml          |   7 +-
 .../apache/sentry/hdfs/SentryAuthzUpdate.java   |  41 ++++
 .../sentry/hdfs/SentryHDFSServiceClient.java    | 217 +------------------
 .../SentryHDFSServiceClientDefaultImpl.java     | 213 ++++++++++++++++++
 .../hdfs/SentryHDFSServiceClientFactory.java    |  43 ++++
 .../sentry/hdfs/SentryHdfsServiceException.java |  33 +++
 .../apache/sentry/hdfs/ServiceConstants.java    |   3 +-
 .../hdfs/ha/HdfsHAClientInvocationHandler.java  | 144 ++++++++++++
 .../hdfs/SentryHdfsServiceIntegrationBase.java  |   4 +-
 .../sentry/hdfs/SentryAuthorizationInfo.java    |   2 +-
 .../org/apache/sentry/hdfs/SentryUpdater.java   |   4 +-
 .../org/apache/sentry/hdfs/MetastorePlugin.java |  27 ++-
 .../sentry/hdfs/MetastorePluginWithHA.java      |  22 +-
 .../apache/sentry/hdfs/PluginCacheSyncUtil.java |  15 +-
 .../apache/sentry/hdfs/TestUpdateForwarder.java |   8 +
 .../db/service/persistent/HAContext.java        |  51 +++--
 .../thrift/SentryPolicyStoreProcessor.java      |   6 +-
 19 files changed, 604 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
----------------------------------------------------------------------
diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
index b6a9a47..4924669 100644
--- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
+++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java
@@ -56,7 +56,7 @@ public class SentryMetastorePostEventListener extends MetaStoreEventListener {
   private final HiveAuthzConf authzConf;
   private final Server server;
 
-  private List<SentryMetastoreListenerPlugin> sentryPlugins = new ArrayList<SentryMetastoreListenerPlugin>(); 
+  private List<SentryMetastoreListenerPlugin> sentryPlugins = new ArrayList<SentryMetastoreListenerPlugin>();
 
   public SentryMetastorePostEventListener(Configuration config) {
     super(config);
@@ -66,6 +66,7 @@ public class SentryMetastorePostEventListener extends MetaStoreEventListener {
     Iterable<String> pluginClasses = ConfUtilties.CLASS_SPLITTER
         .split(config.get(ServerConfig.SENTRY_METASTORE_PLUGINS,
             ServerConfig.SENTRY_METASTORE_PLUGINS_DEFAULT).trim());
+
     try {
       for (String pluginClassStr : pluginClasses) {
         Class<?> clazz = config.getClassByName(pluginClassStr);
@@ -75,7 +76,8 @@ public class SentryMetastorePostEventListener extends MetaStoreEventListener {
               + SentryMetastoreListenerPlugin.class.getName());
         }
         SentryMetastoreListenerPlugin plugin = (SentryMetastoreListenerPlugin) clazz
-            .getConstructor(Configuration.class).newInstance(config);
+            .getConstructor(Configuration.class, Configuration.class)
+            .newInstance(config, authzConf);
         sentryPlugins.add(plugin);
       }
     } catch (Exception e) {

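For reference, the listener above now resolves each plugin through a two-argument (Configuration, Configuration) constructor: the HMS configuration first, the Sentry configuration (authzConf) second. Below is a minimal, self-contained sketch of that contract; ConstructorContractDemo and PluginA are hypothetical names used only for illustration.

    import java.lang.reflect.Constructor;

    import org.apache.hadoop.conf.Configuration;

    public class ConstructorContractDemo {

      // Hypothetical plugin: the two-argument constructor is the contract the
      // listener now requires (HMS config first, Sentry config second).
      public static class PluginA {
        public PluginA(Configuration hmsConf, Configuration sentryConf) {
        }
      }

      public static void main(String[] args) throws Exception {
        // Plugins that still declare only the old single-Configuration
        // constructor fail here with NoSuchMethodException.
        Constructor<?> ctor =
            PluginA.class.getConstructor(Configuration.class, Configuration.class);
        Object plugin = ctor.newInstance(new Configuration(), new Configuration());
        System.out.println("instantiated " + plugin.getClass().getSimpleName());
      }
    }
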
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-dist/src/main/assembly/bin.xml
----------------------------------------------------------------------
diff --git a/sentry-dist/src/main/assembly/bin.xml b/sentry-dist/src/main/assembly/bin.xml
index beaa348..5727fc9 100644
--- a/sentry-dist/src/main/assembly/bin.xml
+++ b/sentry-dist/src/main/assembly/bin.xml
@@ -71,6 +71,16 @@
         <include>org.apache.derby:derby</include>
       </includes>
     </dependencySet>
+    <dependencySet>
+      <outputDirectory>lib/plugins</outputDirectory>
+      <unpack>false</unpack>
+      <useProjectArtifact>false</useProjectArtifact>
+      <useStrictFiltering>true</useStrictFiltering>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+      <includes>
+        <include>org.apache.curator:curator-x-discovery</include>
+      </includes>
+    </dependencySet>
   </dependencySets>
 
   <fileSets>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-common/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/pom.xml b/sentry-hdfs/sentry-hdfs-common/pom.xml
index dfbfc86..a547593 100644
--- a/sentry-hdfs/sentry-hdfs-common/pom.xml
+++ b/sentry-hdfs/sentry-hdfs-common/pom.xml
@@ -55,6 +55,11 @@ limitations under the License.
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-x-discovery</artifactId>
+      <version>${curator.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minikdc</artifactId>
       <scope>test</scope>
@@ -62,7 +67,7 @@ limitations under the License.
     <dependency>
       <groupId>org.apache.sentry</groupId>
       <artifactId>sentry-provider-db</artifactId>
-      <scope>test</scope>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.sentry</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryAuthzUpdate.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryAuthzUpdate.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryAuthzUpdate.java
new file mode 100644
index 0000000..4cf439b
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryAuthzUpdate.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.List;
+
+public class SentryAuthzUpdate {
+
+  private final List<PermissionsUpdate> permUpdates;
+  private final List<PathsUpdate> pathUpdates;
+
+  public SentryAuthzUpdate(List<PermissionsUpdate> permUpdates,
+      List<PathsUpdate> pathUpdates) {
+    this.permUpdates = permUpdates;
+    this.pathUpdates = pathUpdates;
+  }
+
+  public List<PermissionsUpdate> getPermUpdates() {
+    return permUpdates;
+  }
+
+  public List<PathsUpdate> getPathUpdates() {
+    return pathUpdates;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
index 726d88c..956b855 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
@@ -18,219 +18,18 @@
 package org.apache.sentry.hdfs;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import javax.security.auth.callback.CallbackHandler;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
-import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client;
-import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
-import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
-import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate;
-import org.apache.sentry.hdfs.ServiceConstants.ClientConfig;
-import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
-import org.apache.thrift.protocol.TBinaryProtocol;
-//import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TMultiplexedProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSaslClientTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-public class SentryHDFSServiceClient {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClient.class);
 
+public interface SentryHDFSServiceClient {
   public static final String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService";
 
-  public static class SentryAuthzUpdate {
-
-    private final List<PermissionsUpdate> permUpdates;
-    private final List<PathsUpdate> pathUpdates;
-
-    public SentryAuthzUpdate(List<PermissionsUpdate> permUpdates, List<PathsUpdate> pathUpdates) {
-      this.permUpdates = permUpdates;
-      this.pathUpdates = pathUpdates;
-    }
-
-    public List<PermissionsUpdate> getPermUpdates() {
-      return permUpdates;
-    }
-
-    public List<PathsUpdate> getPathUpdates() {
-      return pathUpdates;
-    }
-  }
-  
-  /**
-   * This transport wraps the Sasl transports to set up the right UGI context for open().
-   */
-  public static class UgiSaslClientTransport extends TSaslClientTransport {
-    protected UserGroupInformation ugi = null;
-
-    public UgiSaslClientTransport(String mechanism, String authorizationId,
-        String protocol, String serverName, Map<String, String> props,
-        CallbackHandler cbh, TTransport transport, boolean wrapUgi)
-        throws IOException {
-      super(mechanism, authorizationId, protocol, serverName, props, cbh,
-          transport);
-      if (wrapUgi) {
-        ugi = UserGroupInformation.getLoginUser();
-      }
-    }
+  public void notifyHMSUpdate(PathsUpdate update)
+      throws SentryHdfsServiceException;
 
-    // open the SASL transport with using the current UserGroupInformation
-    // This is needed to get the current login context stored
-    @Override
-    public void open() throws TTransportException {
-      if (ugi == null) {
-        baseOpen();
-      } else {
-        try {
-          // ensure that the ticket is valid before connecting to service. Note that
-          // checkTGTAndReloginFromKeytab() renew the ticket only when more than 80%
-          // of ticket lifetime has passed. 
-          if (ugi.isFromKeytab()) {
-            ugi.checkTGTAndReloginFromKeytab();
-          }
+  public long getLastSeenHMSPathSeqNum() throws SentryHdfsServiceException;
 
-          ugi.doAs(new PrivilegedExceptionAction<Void>() {
-            public Void run() throws TTransportException {
-              baseOpen();
-              return null;
-            }
-          });
-        } catch (IOException e) {
-          throw new TTransportException("Failed to open SASL transport", e);
-        } catch (InterruptedException e) {
-          throw new TTransportException(
-              "Interrupted while opening underlying transport", e);
-        }
-      }
-    }
+  public SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
+      throws SentryHdfsServiceException;
 
-    private void baseOpen() throws TTransportException {
-      super.open();
-    }
-  }
-
-  private final Configuration conf;
-  private final InetSocketAddress serverAddress;
-  private final int connectionTimeout;
-  private boolean kerberos;
-  private TTransport transport;
-
-  private String[] serverPrincipalParts;
-  private Client client;
-  
-  public SentryHDFSServiceClient(Configuration conf) throws IOException {
-    this.conf = conf;
-    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
-    this.serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull(
-                           conf.get(ClientConfig.SERVER_RPC_ADDRESS), "Config key "
-                           + ClientConfig.SERVER_RPC_ADDRESS + " is required"), conf.getInt(
-                           ClientConfig.SERVER_RPC_PORT, ClientConfig.SERVER_RPC_PORT_DEFAULT));
-    this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT,
-                                         ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT);
-    kerberos = ClientConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
-        conf.get(ClientConfig.SECURITY_MODE, ClientConfig.SECURITY_MODE_KERBEROS).trim());
-    transport = new TSocket(serverAddress.getHostName(),
-        serverAddress.getPort(), connectionTimeout);
-    if (kerberos) {
-      String serverPrincipal = Preconditions.checkNotNull(
-          conf.get(ClientConfig.PRINCIPAL), ClientConfig.PRINCIPAL + " is required");
-
-      // Resolve server host in the same way as we are doing on server side
-      serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
-      LOGGER.info("Using server kerberos principal: " + serverPrincipal);
-
-      serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
-      Preconditions.checkArgument(serverPrincipalParts.length == 3,
-           "Kerberos principal should have 3 parts: " + serverPrincipal);
-      boolean wrapUgi = "true".equalsIgnoreCase(conf
-          .get(ClientConfig.SECURITY_USE_UGI_TRANSPORT, "true"));
-      transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(),
-          null, serverPrincipalParts[0], serverPrincipalParts[1],
-          ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi);
-    } else {
-      serverPrincipalParts = null;
-    }
-    try {
-      transport.open();
-    } catch (TTransportException e) {
-      throw new IOException("Transport exception while opening transport: " + e.getMessage(), e);
-    }
-    LOGGER.info("Successfully opened transport: " + transport + " to " + serverAddress);
-    TProtocol tProtocol = new TBinaryProtocol(transport);
-//    if (conf.getBoolean(ClientConfig.USE_COMPACT_TRANSPORT,
-//        ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
-//      tProtocol = new TCompactProtocol(transport);
-//    } else {
-//      tProtocol = new TBinaryProtocol(transport);
-//    }
-    TMultiplexedProtocol protocol = new TMultiplexedProtocol(
-      tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME);
-    client = new SentryHDFSService.Client(protocol);
-    LOGGER.info("Successfully created client");
-  }
-
-  public synchronized void notifyHMSUpdate(PathsUpdate update)
-      throws IOException {
-    try {
-      client.handle_hms_notification(update.toThrift());
-    } catch (Exception e) {
-      throw new IOException("Thrift Exception occurred !!", e);
-    }
-  }
-
-  public synchronized long getLastSeenHMSPathSeqNum()
-      throws IOException {
-    try {
-      return client.check_hms_seq_num(-1);
-    } catch (Exception e) {
-      throw new IOException("Thrift Exception occurred !!", e);
-    }
-  }
-
-  public synchronized SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
-      throws IOException {
-    SentryAuthzUpdate retVal = new SentryAuthzUpdate(new LinkedList<PermissionsUpdate>(), new LinkedList<PathsUpdate>());
-    try {
-      TAuthzUpdateResponse sentryUpdates = client.get_all_authz_updates_from(permSeqNum, pathSeqNum);
-      if (sentryUpdates.getAuthzPathUpdate() != null) {
-        for (TPathsUpdate pathsUpdate : sentryUpdates.getAuthzPathUpdate()) {
-          retVal.getPathUpdates().add(new PathsUpdate(pathsUpdate));
-        }
-      }
-      if (sentryUpdates.getAuthzPermUpdate() != null) {
-        for (TPermissionsUpdate permsUpdate : sentryUpdates.getAuthzPermUpdate()) {
-          retVal.getPermUpdates().add(new PermissionsUpdate(permsUpdate));
-        }
-      }
-    } catch (Exception e) {
-      throw new IOException("Thrift Exception occurred !!", e);
-    }
-    return retVal;
-  }
-
-  public void close() {
-    if (transport != null) {
-      transport.close();
-    }
-  }
+  public void close();
 }
+

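For context, the client API is extracted into an interface here because java.lang.reflect.Proxy, which the HA client factory below relies on, can only implement interfaces. A minimal standalone sketch of that mechanism (Greeter and ProxyDemo are illustrative names, not part of this patch):

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Proxy;

    interface Greeter {
      String greet(String name);
    }

    public class ProxyDemo {
      public static void main(String[] args) {
        // Every call on the proxy is routed to this handler, which is how
        // HdfsHAClientInvocationHandler can interpose failover logic.
        InvocationHandler handler = (proxy, method, methodArgs) ->
            "hello, " + methodArgs[0];
        Greeter g = (Greeter) Proxy.newProxyInstance(
            Greeter.class.getClassLoader(),
            new Class<?>[] { Greeter.class },
            handler);
        System.out.println(g.greet("sentry")); // prints: hello, sentry
      }
    }
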
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
new file mode 100644
index 0000000..c727403
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientDefaultImpl.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.LinkedList;
+import java.util.Map;
+
+import javax.security.auth.callback.CallbackHandler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
+import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client;
+import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
+import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
+import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate;
+import org.apache.sentry.hdfs.ServiceConstants.ClientConfig;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMultiplexedProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SentryHDFSServiceClientDefaultImpl implements SentryHDFSServiceClient {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClientDefaultImpl.class);
+
+  /**
+   * This transport wraps the Sasl transports to set up the right UGI context for open().
+   */
+  public static class UgiSaslClientTransport extends TSaslClientTransport {
+    protected UserGroupInformation ugi = null;
+
+    public UgiSaslClientTransport(String mechanism, String authorizationId,
+        String protocol, String serverName, Map<String, String> props,
+        CallbackHandler cbh, TTransport transport, boolean wrapUgi)
+        throws IOException {
+      super(mechanism, authorizationId, protocol, serverName, props, cbh,
+          transport);
+      if (wrapUgi) {
+        ugi = UserGroupInformation.getLoginUser();
+      }
+    }
+
+    // open the SASL transport using the current UserGroupInformation
+    // This is needed to get the current login context stored
+    @Override
+    public void open() throws TTransportException {
+      if (ugi == null) {
+        baseOpen();
+      } else {
+        try {
+          // ensure that the ticket is valid before connecting to the service. Note that
+          // checkTGTAndReloginFromKeytab() renews the ticket only when more than 80%
+          // of the ticket lifetime has passed.
+          if (ugi.isFromKeytab()) {
+            ugi.checkTGTAndReloginFromKeytab();
+          }
+
+          ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            public Void run() throws TTransportException {
+              baseOpen();
+              return null;
+            }
+          });
+        } catch (IOException e) {
+          throw new TTransportException("Failed to open SASL transport", e);
+        } catch (InterruptedException e) {
+          throw new TTransportException(
+              "Interrupted while opening underlying transport", e);
+        }
+      }
+    }
+
+    private void baseOpen() throws TTransportException {
+      super.open();
+    }
+  }
+
+  private final Configuration conf;
+  private final InetSocketAddress serverAddress;
+  private final int connectionTimeout;
+  private boolean kerberos;
+  private TTransport transport;
+
+  private String[] serverPrincipalParts;
+  private Client client;
+
+  public SentryHDFSServiceClientDefaultImpl(Configuration conf) throws IOException {
+    this.conf = conf;
+    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
+    this.serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull(
+                           conf.get(ClientConfig.SERVER_RPC_ADDRESS), "Config key "
+                           + ClientConfig.SERVER_RPC_ADDRESS + " is required"), conf.getInt(
+                           ClientConfig.SERVER_RPC_PORT, ClientConfig.SERVER_RPC_PORT_DEFAULT));
+    this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT,
+                                         ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT);
+    kerberos = ClientConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
+        conf.get(ClientConfig.SECURITY_MODE, ClientConfig.SECURITY_MODE_KERBEROS).trim());
+    transport = new TSocket(serverAddress.getHostName(),
+        serverAddress.getPort(), connectionTimeout);
+    if (kerberos) {
+      String serverPrincipal = Preconditions.checkNotNull(
+          conf.get(ClientConfig.PRINCIPAL), ClientConfig.PRINCIPAL + " is required");
+
+      // Resolve server host in the same way as we are doing on server side
+      serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
+      LOGGER.info("Using server kerberos principal: " + serverPrincipal);
+
+      serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
+      Preconditions.checkArgument(serverPrincipalParts.length == 3,
+           "Kerberos principal should have 3 parts: " + serverPrincipal);
+      boolean wrapUgi = "true".equalsIgnoreCase(conf
+          .get(ClientConfig.SECURITY_USE_UGI_TRANSPORT, "true"));
+      transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(),
+          null, serverPrincipalParts[0], serverPrincipalParts[1],
+          ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi);
+    } else {
+      serverPrincipalParts = null;
+    }
+    try {
+      transport.open();
+    } catch (TTransportException e) {
+      throw new IOException("Transport exception while opening transport: " + e.getMessage(), e);
+    }
+    LOGGER.info("Successfully opened transport: " + transport + " to " + serverAddress);
+    TProtocol tProtocol = null;
+    if (conf.getBoolean(ClientConfig.USE_COMPACT_TRANSPORT,
+        ClientConfig.USE_COMPACT_TRANSPORT_DEFAULT)) {
+      tProtocol = new TCompactProtocol(transport);
+    } else {
+      tProtocol = new TBinaryProtocol(transport);
+    }
+    TMultiplexedProtocol protocol = new TMultiplexedProtocol(
+      tProtocol, SentryHDFSServiceClient.SENTRY_HDFS_SERVICE_NAME);
+    client = new SentryHDFSService.Client(protocol);
+    LOGGER.info("Successfully created client");
+  }
+
+  public synchronized void notifyHMSUpdate(PathsUpdate update)
+      throws SentryHdfsServiceException {
+    try {
+      client.handle_hms_notification(update.toThrift());
+    } catch (Exception e) {
+      throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
+    }
+  }
+
+  public synchronized long getLastSeenHMSPathSeqNum()
+      throws SentryHdfsServiceException {
+    try {
+      return client.check_hms_seq_num(-1);
+    } catch (Exception e) {
+      throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
+    }
+  }
+
+  public synchronized SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
+      throws SentryHdfsServiceException {
+    SentryAuthzUpdate retVal = new SentryAuthzUpdate(new LinkedList<PermissionsUpdate>(), new LinkedList<PathsUpdate>());
+    try {
+      TAuthzUpdateResponse sentryUpdates = client.get_all_authz_updates_from(permSeqNum, pathSeqNum);
+      if (sentryUpdates.getAuthzPathUpdate() != null) {
+        for (TPathsUpdate pathsUpdate : sentryUpdates.getAuthzPathUpdate()) {
+          retVal.getPathUpdates().add(new PathsUpdate(pathsUpdate));
+        }
+      }
+      if (sentryUpdates.getAuthzPermUpdate() != null) {
+        for (TPermissionsUpdate permsUpdate : sentryUpdates.getAuthzPermUpdate()) {
+          retVal.getPermUpdates().add(new PermissionsUpdate(permsUpdate));
+        }
+      }
+    } catch (Exception e) {
+      throw new SentryHdfsServiceException("Thrift Exception occurred !!", e);
+    }
+    return retVal;
+  }
+
+  public void close() {
+    if (transport != null) {
+      transport.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
new file mode 100644
index 0000000..58aa10d
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClientFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.lang.reflect.Proxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.hdfs.ha.HdfsHAClientInvocationHandler;
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+
+/**
+ * Client factory to create normal client or proxy with HA invocation handler
+ */
+public class SentryHDFSServiceClientFactory {
+  public static SentryHDFSServiceClient create(Configuration conf)
+      throws Exception {
+    boolean haEnabled = conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED, false);
+    if (haEnabled) {
+      return (SentryHDFSServiceClient) Proxy.newProxyInstance(
+          SentryHDFSServiceClientDefaultImpl.class.getClassLoader(),
+          SentryHDFSServiceClientDefaultImpl.class.getInterfaces(),
+          new HdfsHAClientInvocationHandler(conf));
+    } else {
+      return new SentryHDFSServiceClientDefaultImpl(conf);
+    }
+  }
+
+}

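A hedged usage sketch of the factory: depending on ServerConfig.SENTRY_HA_ENABLED, the caller gets either the default client or a reflective proxy backed by HdfsHAClientInvocationHandler. It assumes conf already carries the client settings from ClientConfig (RPC address/port, security mode) and, for HA, the ZooKeeper settings; ClientFactoryDemo is an illustrative name.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sentry.hdfs.SentryHDFSServiceClient;
    import org.apache.sentry.hdfs.SentryHDFSServiceClientFactory;

    public class ClientFactoryDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The factory hides whether this is a direct connection or an HA proxy;
        // the proxy connects lazily on the first method invocation.
        SentryHDFSServiceClient client = SentryHDFSServiceClientFactory.create(conf);
        try {
          long seqNum = client.getLastSeenHMSPathSeqNum();
          System.out.println("last seen HMS path seq num: " + seqNum);
        } finally {
          client.close();
        }
      }
    }
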
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java
new file mode 100644
index 0000000..307d8c3
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/SentryHdfsServiceException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.hdfs;
+
+public class SentryHdfsServiceException extends RuntimeException {
+  private static final long serialVersionUID = 1511645864949767378L;
+
+  public SentryHdfsServiceException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public SentryHdfsServiceException(String message) {
+    super(message);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index 516f773..489d165 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -49,8 +49,9 @@ public class ServiceConstants {
     public static final int SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT = 1000;
     public static final String SENTRY_HDFS_HA_ZOOKEEPER_NAMESPACE = "sentry.hdfs.ha.zookeeper.namespace";
     public static final String SENTRY_HDFS_HA_ZOOKEEPER_NAMESPACE_DEFAULT = "/sentry_hdfs";
-    public static final String SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE = "sentry.hdfs.ha.zookeeper.namespace";
+    public static final String SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE = "sentry.metastore.ha.zookeeper.namespace";
     public static final String SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE_DEFAULT = "/sentry_metastore";
+
   }
 
   public static class ClientConfig {

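Note the fix above: SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE previously held the same property string as the HDFS constant ("sentry.hdfs.ha.zookeeper.namespace"), so the metastore HA namespace could not be configured independently. An illustrative snippet setting the two now-distinct properties to their defaults (NamespaceConfigDemo is a hypothetical name):

    import org.apache.hadoop.conf.Configuration;

    public class NamespaceConfigDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Two independent ZK namespaces; values shown are the defaults above.
        conf.set("sentry.hdfs.ha.zookeeper.namespace", "/sentry_hdfs");
        conf.set("sentry.metastore.ha.zookeeper.namespace", "/sentry_metastore");
        System.out.println(conf.get("sentry.metastore.ha.zookeeper.namespace"));
      }
    }
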
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ha/HdfsHAClientInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ha/HdfsHAClientInvocationHandler.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ha/HdfsHAClientInvocationHandler.java
new file mode 100644
index 0000000..ec66b2d
--- /dev/null
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ha/HdfsHAClientInvocationHandler.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs.ha;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.sentry.hdfs.SentryHDFSServiceClientDefaultImpl;
+import org.apache.sentry.hdfs.SentryHdfsServiceException;
+import org.apache.sentry.hdfs.ServiceConstants;
+import org.apache.sentry.provider.db.service.persistent.HAContext;
+import org.apache.sentry.provider.db.service.persistent.ServiceManager;
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class HdfsHAClientInvocationHandler implements InvocationHandler {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HdfsHAClientInvocationHandler.class);
+
+  private final Configuration conf;
+  private ServiceManager manager;
+  private ServiceInstance<Void> currentServiceInstance;
+  private SentryHDFSServiceClientDefaultImpl client = null;
+
+  private static final String THRIFT_EXCEPTION_MESSAGE = "Thrift exception occurred ";
+  public static final String SENTRY_HA_ERROR_MESSAGE = "No Sentry server available. "
+      + "Please ensure that at least one Sentry server is online";
+
+  public HdfsHAClientInvocationHandler(Configuration conf) throws Exception {
+    this.conf = conf;
+    checkClientConf();
+  }
+
+  @Override
+  public Object invoke(Object proxy, Method method, Object[] args) throws
+      SentryHdfsServiceException {
+    Object result = null;
+    while (true) {
+      try {
+        if (!method.isAccessible()) {
+          method.setAccessible(true);
+        }
+        // The client is initialized on the first call instead of in the constructor.
+        // This way we can propagate the connection exception to the caller cleanly.
+        if (client == null) {
+          renewSentryClient();
+        }
+        result = method.invoke(client, args);
+      } catch (IllegalAccessException e) {
+        throw new SentryHdfsServiceException(e.getMessage(), e);
+      } catch (InvocationTargetException e) {
+        if (!(e.getTargetException() instanceof SentryHdfsServiceException)) {
+          throw new SentryHdfsServiceException("Error in Sentry HDFS client",
+              e.getTargetException());
+        } else {
+          LOGGER.warn(THRIFT_EXCEPTION_MESSAGE + ": Error connecting to the" +
+              " current service; will retry another service.", e);
+          if (client != null) {
+            client.close();
+            client = null;
+          }
+          throw (SentryHdfsServiceException) e.getTargetException();
+        }
+      } catch (IOException e1) {
+        // close() doesn't throw an exception; we suppress it in case of
+        // connection loss. Changing SentryHDFSServiceClient#close() to throw an
+        // exception would be a backward incompatible change for Sentry clients.
+        if ("close".equals(method.getName())) {
+          return null;
+        }
+        throw new SentryHdfsServiceException(
+            "Error connecting to sentry service " + e1.getMessage(), e1);
+      }
+      return result;
+    }
+  }
+
+  // Retrieve the new connection endpoint from ZK and connect to the new server
+  private void renewSentryClient() throws IOException {
+    try {
+      manager = new ServiceManager(HAContext.getHAContext(conf));
+    } catch (Exception e1) {
+      throw new IOException("Failed to extract Sentry node info from zookeeper", e1);
+    }
+
+    try {
+      while (true) {
+        currentServiceInstance = manager.getServiceInstance();
+        if (currentServiceInstance == null) {
+          throw new IOException(SENTRY_HA_ERROR_MESSAGE);
+        }
+        InetSocketAddress serverAddress =
+            ServiceManager.convertServiceInstance(currentServiceInstance);
+        conf.set(ServiceConstants.ClientConfig.SERVER_RPC_ADDRESS, serverAddress.getHostName());
+        conf.setInt(ServiceConstants.ClientConfig.SERVER_RPC_PORT, serverAddress.getPort());
+        try {
+          client = new SentryHDFSServiceClientDefaultImpl(conf);
+          LOGGER.info("Sentry Client using server " + serverAddress.getHostName() +
+              ":" + serverAddress.getPort());
+          break;
+        } catch (IOException e) {
+          manager.reportError(currentServiceInstance);
+          LOGGER.info("Transport exception while opening transport:", e, e.getMessage());
+        }
+      }
+    } finally {
+      manager.close();
+    }
+  }
+
+  private void checkClientConf() {
+    if (conf.getBoolean(ServerConfig.SENTRY_HA_ZOOKEEPER_SECURITY,
+        ServerConfig.SENTRY_HA_ZOOKEEPER_SECURITY_DEFAULT)) {
+      String serverPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL),
+          ServerConfig.PRINCIPAL + " is required");
+      Preconditions.checkArgument(serverPrincipal.contains(SecurityUtil.HOSTNAME_PATTERN),
+          ServerConfig.PRINCIPAL + " : " + serverPrincipal + " should contain " + SecurityUtil.HOSTNAME_PATTERN);
+    }
+  }
+}

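From the caller's side, a failed call through the proxy leaves the handler with client == null, so retrying the same call reconnects via renewSentryClient() to another server registered in ZooKeeper. A caller-side sketch of that retry pattern; FailoverRetryDemo, fetchWithRetry, and maxAttempts are hypothetical names, and maxAttempts is assumed to be at least 1.

    import org.apache.sentry.hdfs.SentryAuthzUpdate;
    import org.apache.sentry.hdfs.SentryHDFSServiceClient;
    import org.apache.sentry.hdfs.SentryHdfsServiceException;

    public class FailoverRetryDemo {
      static SentryAuthzUpdate fetchWithRetry(SentryHDFSServiceClient proxy,
          long permSeqNum, long pathSeqNum, int maxAttempts) {
        SentryHdfsServiceException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
          try {
            return proxy.getAllUpdatesFrom(permSeqNum, pathSeqNum);
          } catch (SentryHdfsServiceException e) {
            // The handler has already closed its broken client, so the next
            // call through the proxy reconnects to another Sentry server.
            last = e;
          }
        }
        throw last;
      }
    }
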
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
index f8f7eba..7c75be9 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/SentryHdfsServiceIntegrationBase.java
@@ -70,11 +70,11 @@ public class SentryHdfsServiceIntegrationBase extends
           new PrivilegedExceptionAction<SentryHDFSServiceClient>() {
             @Override
             public SentryHDFSServiceClient run() throws Exception {
-              return new SentryHDFSServiceClient(conf);
+              return SentryHDFSServiceClientFactory.create(conf);
             }
           });
     } else {
-      hdfsClient = new SentryHDFSServiceClient(conf);
+      hdfsClient = SentryHDFSServiceClientFactory.create(conf);
     }
     hdfsClient.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java
index f9a1f65..d178c3e 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.sentry.hdfs.SentryHDFSServiceClient.SentryAuthzUpdate;
+import org.apache.sentry.hdfs.SentryAuthzUpdate;
 import org.apache.sentry.hdfs.Updateable.Update;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
index 9540397..422554e 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
@@ -18,7 +18,7 @@
 package org.apache.sentry.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.hdfs.SentryHDFSServiceClient.SentryAuthzUpdate;
+import org.apache.sentry.hdfs.SentryAuthzUpdate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +38,7 @@ public class SentryUpdater {
   public SentryAuthzUpdate getUpdates() {
     if (sentryClient == null) {
       try {
-        sentryClient = new SentryHDFSServiceClient(conf);
+        sentryClient = SentryHDFSServiceClientFactory.create(conf);
       } catch (Exception e) {
         LOG.error("Error connecting to Sentry ['{}'] !!",
             e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
index 5277eef..7106e74 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
@@ -89,12 +89,13 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
   private Lock notificiationLock;
 
   // Initialized to some value > 1.
-  private static final AtomicLong seqNum = new AtomicLong(5);
+  protected static final AtomicLong seqNum = new AtomicLong(5);
 
   // Has to match the value of seqNum
-  private static volatile long lastSentSeqNum = seqNum.get();
+  protected static volatile long lastSentSeqNum = seqNum.get();
   private volatile boolean syncSent = false;
   private final ExecutorService threadPool;
+  private final Configuration sentryConf;
 
   static class ProxyHMSHandler extends HMSHandler {
     public ProxyHMSHandler(String name, HiveConf conf) throws MetaException {
@@ -102,9 +103,10 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
     }
   }
 
-  public MetastorePlugin(Configuration conf) {
+  public MetastorePlugin(Configuration conf, Configuration sentryConf) {
     this.notificiationLock = new ReentrantLock();
     this.conf = new HiveConf((HiveConf)conf);
+    this.sentryConf = new Configuration(sentryConf);
     this.conf.unset(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname);
     this.conf.unset(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname);
     this.conf.unset(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname);
@@ -116,7 +118,7 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
       throw new RuntimeException(e1);
     }
     try {
-      sentryClient = new SentryHDFSServiceClient(conf);
+      sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
     } catch (Exception e) {
       sentryClient = null;
       LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
@@ -242,8 +244,8 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
   private SentryHDFSServiceClient getClient() {
     if (sentryClient == null) {
       try {
-        sentryClient = new SentryHDFSServiceClient(conf);
-      } catch (IOException e) {
+        sentryClient = SentryHDFSServiceClientFactory.create(sentryConf);
+      } catch (Exception e) {
         sentryClient = null;
         LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
       }
@@ -265,13 +267,12 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
     }
   }
 
-  protected void notifySentryAndApplyLocal(PathsUpdate update) {
+  protected void notifySentry(PathsUpdate update) {
     notificiationLock.lock();
     if (!syncSent) {
       new SyncTask().run();
     }
     try {
-      authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock());
       notifySentryNoLock(update);
     } finally {
       lastSentSeqNum = update.getSeqNum();
@@ -279,4 +280,14 @@ public class MetastorePlugin extends SentryMetastoreListenerPlugin {
       LOGGER.debug("#### HMS Path Last update sent : ["+ lastSentSeqNum + "]");
     }
   }
+
+  protected void applyLocal(PathsUpdate update) {
+    authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock());
+  }
+
+  protected void notifySentryAndApplyLocal(PathsUpdate update) {
+    applyLocal(update);
+    notifySentry(update);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
index 271e121..ee5e0f9 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePluginWithHA.java
@@ -18,14 +18,11 @@
 package org.apache.sentry.hdfs;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
 import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
-import org.apache.sentry.provider.db.service.persistent.HAContext;
 import org.apache.sentry.binding.metastore.MetastoreAuthzBinding;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,25 +66,34 @@ public class MetastorePluginWithHA extends MetastorePlugin {
   private String zkPath;
   private PluginCacheSyncUtil pluginCacheSync;
 
-  public MetastorePluginWithHA(Configuration conf) throws Exception {
-    super(conf);
-    zkPath = conf.get(ServerConfig.SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE,
+  public MetastorePluginWithHA(Configuration conf, Configuration sentryConfig) throws Exception {
+    super(conf, sentryConfig);
+    zkPath = sentryConfig.get(ServerConfig.SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE,
         ServerConfig.SENTRY_METASTORE_HA_ZOOKEEPER_NAMESPACE_DEFAULT);
 
-    pluginCacheSync = new PluginCacheSyncUtil(zkPath, conf,
+    pluginCacheSync = new PluginCacheSyncUtil(zkPath, sentryConfig,
         new SentryMetastoreHACacheListener(this));
+    // start seq# from the last global seq
+    seqNum.set(pluginCacheSync.getUpdateCounter());
+    MetastorePlugin.lastSentSeqNum = seqNum.get();
   }
 
   @Override
   protected void notifySentryAndApplyLocal(PathsUpdate update) {
     try {
+      // push to ZK in order to keep the metastore local cache in sync
       pluginCacheSync.handleCacheUpdate(update);
+
+      // notify Sentry. Note that Sentry service already has a cache
+      // sync mechanism to replicate this update to all other Sentry servers
+      notifySentry(update);
     } catch (SentryPluginException e) {
       LOGGER.error("Error pushing update to cache", e);
     }
   }
 
+  // apply the update to local cache
   private void processCacheNotification(PathsUpdate update) {
-    super.notifySentryAndApplyLocal(update);
+    super.applyLocal(update);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
index 94c9895..e297286 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PluginCacheSyncUtil.java
@@ -33,6 +33,7 @@ import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
 import org.apache.sentry.hdfs.Updateable.Update;
 import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException;
 import org.apache.sentry.provider.db.service.persistent.HAContext;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,9 +132,15 @@ public class PluginCacheSyncUtil {
     }
 
     try {
-      // increment the global sequence counter
       try {
-        update.setSeqNum(updateCounter.increment().postValue());
+        // increment the global sequence counter if this is not a full update
+        if (!update.hasFullImage()) {
+          update.setSeqNum(updateCounter.increment().postValue());
+        } else {
+          if (updateCounter.get().preValue() < update.getSeqNum()) {
+            updateCounter.add(update.getSeqNum() - updateCounter.get().preValue());
+          }
+        }
       } catch (Exception e1) {
         throw new SentryPluginException(
             "Error setting ZK counter for update cache syncup" + e1, e1);
@@ -204,6 +211,10 @@ public class PluginCacheSyncUtil {
             haContext.getCuratorFramework().delete().forPath(pathToDelete);
             gcCounter.increment();
             LOGGER.debug("Deleted znode " + pathToDelete);
+          } catch (NoNodeException eN) {
+            // We might end up with holes in the node counter due to network/ZK errors.
+            // Ignore the delete error if the node doesn't exist and move on
+            gcCounter.increment();
           } catch (Exception e) {
             LOGGER.info("Error cleaning up node " + pathToDelete, e);
             break;

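The counter logic above assigns each partial update the next global sequence number, while a full image keeps its own number and only advances the counter when it lags behind. A simplified single-process model of that rule, using AtomicLong as a stand-in for Curator's DistributedAtomicLong (the real counter is distributed and its operations can fail, which this sketch ignores):

    import java.util.concurrent.atomic.AtomicLong;

    public class SeqNumModel {
      private final AtomicLong counter = new AtomicLong();

      // Returns the sequence number the update should carry.
      long assignSeqNum(boolean fullImage, long updateSeqNum) {
        if (!fullImage) {
          // Partial updates take the next global sequence number.
          return counter.incrementAndGet();
        }
        // A full image keeps its own seq num; the counter only catches up so
        // later partial updates are not numbered below the full image.
        counter.updateAndGet(current -> Math.max(current, updateSeqNum));
        return updateSeqNum;
      }
    }
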
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
index ee9a7a3..54a83b0 100644
--- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
+++ b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
@@ -262,6 +262,14 @@ public class TestUpdateForwarder {
 
   @Test
   public void testGetUpdatesAfterExternalEntityReset() throws Exception {
+    /*
+     * Disabled for Sentry HA. Since the sequence numbers are tracked in ZK,
+     * lower-sequence updates are ignored, which causes this test to fail in
+     * HA mode.
+     */
+    Assume.assumeTrue(!testConf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
+        false));
+
     DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
     imageRetreiver.setState("a,b,c");
     updateForwarder = UpdateForwarder.create(

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
index c3aa23c..71935b1 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HAContext.java
@@ -19,6 +19,7 @@
 package org.apache.sentry.provider.db.service.persistent;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -43,6 +44,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 
 /**
  * Stores the HA related context
@@ -51,6 +54,7 @@ public class HAContext {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HAContext.class);
   private static HAContext serverHAContext = null;
+  private static boolean aclChecked = false;
 
   public final static String SENTRY_SERVICE_REGISTER_NAMESPACE = "sentry-service";
   private final String zookeeperQuorum;
@@ -64,7 +68,7 @@ public class HAContext {
   private final CuratorFramework curatorFramework;
   private final RetryPolicy retryPolicy;
 
-  private HAContext(Configuration conf) throws Exception {
+  protected HAContext(Configuration conf) throws Exception {
     this.zookeeperQuorum = conf.get(ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM,
         ServerConfig.SENTRY_HA_ZOOKEEPER_QUORUM_DEFAULT);
     this.retriesMaxCount = conf.getInt(ServerConfig.SENTRY_HA_ZOOKEEPER_RETRIES_MAX_COUNT,
@@ -81,8 +85,20 @@ public class HAContext {
       LOGGER.info("Connecting to ZooKeeper with SASL/Kerberos and using 'sasl' ACLs");
       setJaasConfiguration(conf);
       System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client");
-      saslACL = Collections.singletonList(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal(conf))));
+      saslACL = Lists.newArrayList();
+      saslACL.add(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal(conf,
+          ServerConfig.PRINCIPAL))));
+      saslACL.add(new ACL(Perms.ALL, new Id("sasl", getServicePrincipal(conf,
+              ServerConfig.SERVER_HA_ZOOKEEPER_CLIENT_PRINCIPAL))));
       aclProvider = new SASLOwnerACLProvider();
+      String allowConnect = conf.get(ServerConfig.ALLOW_CONNECT);
+
+      if (!Strings.isNullOrEmpty(allowConnect)) {
+        for (String principal : Arrays.asList(allowConnect.split("\\s*,\\s*"))) {
+          LOGGER.info("Adding acls for " + principal);
+          saslACL.add(new ACL(Perms.ALL, new Id("sasl", principal)));
+        }
+      }
     } else {
       LOGGER.info("Connecting to ZooKeeper without authentication");
       aclProvider = new DefaultACLProvider();
@@ -95,7 +111,7 @@ public class HAContext {
         .retryPolicy(retryPolicy)
         .aclProvider(aclProvider)
         .build();
-    checkAndSetACLs();
+    startCuratorFramework();
   }
 
   /**
@@ -123,6 +139,13 @@ public class HAContext {
     return serverHAContext;
   }
 
+  // HA context for server which verifies the ZK ACLs on namespace
+  public static HAContext getHAServerContext(Configuration conf) throws Exception {
+    HAContext serverContext = getHAContext(conf);
+    serverContext.checkAndSetACLs();
+    return serverContext;
+  }
+
   @VisibleForTesting
   public static synchronized void clearServerContext() {
     if (serverHAContext != null) {
@@ -162,40 +185,42 @@ public class HAContext {
     Preconditions.checkNotNull(namespace, "Zookeeper namespace should not be null.");
   }
 
-  private String getServicePrincipal(Configuration conf) throws IOException {
-    String principal = conf.get(ServerConfig.PRINCIPAL);
+  protected String getServicePrincipal(Configuration conf, String confProperty)
+      throws IOException {
+    String principal = conf.get(confProperty);
     Preconditions.checkNotNull(principal);
     Preconditions.checkArgument(principal.length() != 0, "Server principal is not right.");
     return principal.split("[/@]")[0];
   }
 
   private void checkAndSetACLs() throws Exception {
-    if (zkSecure) {
+    if (zkSecure && !aclChecked) {
       // If znodes were previously created without security enabled, and now it is, we need to go through all existing znodes
-      // and set the ACLs for them
+      // and set the ACLs for them. This is done just once at startup.
       // We can't get the namespace znode through curator; have to go through zk client
-      if (curatorFramework.getState() != CuratorFrameworkState.STARTED) {
-        curatorFramework.start();
-      }
+      startCuratorFramework();
       String namespace = "/" + curatorFramework.getNamespace();
       if (curatorFramework.getZookeeperClient().getZooKeeper().exists(namespace, null) != null) {
         List<ACL> acls = curatorFramework.getZookeeperClient().getZooKeeper().getACL(namespace, new Stat());
-        if (!acls.get(0).getId().getScheme().equals("sasl")) {
+        if (acls.isEmpty() || !acls.get(0).getId().getScheme().equals("sasl")) {
           LOGGER.info("'sasl' ACLs not set; setting...");
           List<String> children = curatorFramework.getZookeeperClient().getZooKeeper().getChildren(namespace, null);
           for (String child : children) {
-              checkAndSetACLs(namespace + "/" + child);
+            checkAndSetACLs("/" + child);
           }
           curatorFramework.getZookeeperClient().getZooKeeper().setACL(namespace, saslACL, -1);
         }
       }
+      aclChecked = true;
+
     }
   }
 
   private void checkAndSetACLs(String path) throws Exception {
+      LOGGER.info("Setting acls on " + path);
       List<String> children = curatorFramework.getChildren().forPath(path);
       for (String child : children) {
-          checkAndSetACLs(path + "/" + child);
+        checkAndSetACLs(path + "/" + child);
       }
       curatorFramework.setACL().withACL(saslACL).forPath(path);
   }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/a9a6c6f4/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index b4c49da..30792f3 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -101,9 +101,9 @@ public class SentryPolicyStoreProcessor implements SentryPolicyService.Iface {
     this.notificationHandlerInvoker = new NotificationHandlerInvoker(conf,
         createHandlers(conf));
     isReady = false;
-    if(conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
-        ServerConfig.SENTRY_HA_ENABLED_DEFAULT)){
-      haContext = HAContext.getHAContext(conf);
+    if (conf.getBoolean(ServerConfig.SENTRY_HA_ENABLED,
+        ServerConfig.SENTRY_HA_ENABLED_DEFAULT)) {
+      haContext = HAContext.getHAServerContext(conf);
       sentryStore = new SentryStore(conf);
       ServiceRegister reg = new ServiceRegister(haContext);
       reg.regService(conf.get(ServerConfig.RPC_ADDRESS),