You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by ak...@apache.org on 2017/07/11 20:20:36 UTC

[2/2] sentry git commit: SENTRY-1630: out of sequence error in HMSFollower (Alex Kolbasov, reviewed by Vamsee Yarlagadda and Na Li)

SENTRY-1630: out of sequence error in HMSFollower (Alex Kolbasov, reviewed by Vamsee Yarlagadda and Na Li)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/747c2260
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/747c2260
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/747c2260

Branch: refs/heads/sentry-ha-redesign
Commit: 747c226013f31d02e623b82902f2bc62a87fc4e9
Parents: 5c1d559
Author: Alexander Kolbasov <ak...@cloudera.com>
Authored: Tue Jul 11 22:20:09 2017 +0200
Committer: Alexander Kolbasov <ak...@cloudera.com>
Committed: Tue Jul 11 22:20:09 2017 +0200

----------------------------------------------------------------------
 .../sentry/hdfs/FullUpdateInitializer.java      | 454 -----------------
 .../sentry/hdfs/TestFullUpdateInitializer.java  | 320 ------------
 .../service/thrift/FullUpdateInitializer.java   | 492 +++++++++++++++++++
 .../apache/sentry/service/thrift/HMSClient.java |  56 +++
 .../sentry/service/thrift/HMSFollower.java      | 168 ++-----
 .../service/thrift/HiveConnectionFactory.java   |  35 ++
 .../thrift/HiveSimpleConnectionFactory.java     | 129 +++++
 .../service/thrift/SentryKerberosContext.java   |   6 +-
 .../sentry/service/thrift/SentryService.java    |  21 +-
 .../thrift/TestFullUpdateInitializer.java       | 346 +++++++++++++
 10 files changed, 1115 insertions(+), 912 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
deleted file mode 100644
index cf9774c..0000000
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/FullUpdateInitializer.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Manage fetching full snapshot from HMS.
- * Snapshot is represented as a map from the hive object name to
- * the set of paths for this object.
- * The hive object name is either the Hive database name or
- * Hive database name joined with Hive table name as {@code dbName.tableName}.
- * All table partitions are stored under the table object.
- * <p>
- * Once {@link FullUpdateInitializer}, the {@link FullUpdateInitializer#getFullHMSSnapshot()}
- * method should be called to get the initial update.
- * <p>
- * It is important to close the {@link FullUpdateInitializer} object to prevent resource
- * leaks.
- * <p>
- * The usual way of using {@link FullUpdateInitializer} is
- * <pre>
- * {@code
- * try (FullUpdateInitializer updateInitializer =
- *      new FullUpdateInitializer(client, authzConf)) {
- *         Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
- *      return pathsUpdate;
- * }
- */
-public final class FullUpdateInitializer implements AutoCloseable {
-
-  /*
-   * Implementation note.
-   *
-   * The snapshot is obtained using an executor. We follow the map/reduce model.
-   * Each executor thread (mapper) obtains and returns a partial snapshot which are then
-   * reduced to a single combined snapshot by getFullHMSSnapshot().
-   *
-   * Synchronization between the getFullHMSSnapshot() and executors is done using the
-   * 'results' queue. The queue holds the futures for each scheduled task.
-   * It is initially populated by getFullHMSSnapshot and each task may add new future
-   * results to it. Only getFullHMSSnapshot() removes entries from the results queue.
-   * This guarantees that once the results queue is empty there are no pending jobs.
-   *
-   * Since there are no other data sharing, the implementation is safe without
-   * any other synchronization. It is not thread-safe for concurrent calls
-   * to getFullHMSSnapshot().
-   *
-   */
-
-  private final ExecutorService threadPool;
-  private final HiveMetaStoreClient client;
-  private final int maxPartitionsPerCall;
-  private final int maxTablesPerCall;
-  private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>();
-  private final int maxRetries;
-  private final int waitDurationMillis;
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class);
-
-  private static final ObjectMapping emptyObjectMapping =
-          new ObjectMapping(Collections.<String, Set<String>>emptyMap());
-
-  /**
-   * Extract path (not starting with "/") from the full URI
-   * @param uri - resource URI (usually with scheme)
-   * @return path if uri is valid or null
-   */
-  private static String pathFromURI(String uri) {
-    try {
-      return PathsUpdate.parsePath(uri);
-    } catch (SentryMalformedPathException e) {
-      LOGGER.warn(String.format("Ignoring invalid uri %s: %s",
-              uri, e.getReason()));
-      return null;
-    }
-  }
-
-  /**
-   * Mapping of object to set of paths.
-   * Used to represent partial results from executor threads. Multiple
-   * ObjectMapping objects are combined in a single mapping
-   * to get the final result.
-   */
-  private static final class ObjectMapping {
-    private final Map<String, Set<String>> objects;
-
-    ObjectMapping(Map<String, Set<String>> objects) {
-      this.objects = objects;
-    }
-
-    ObjectMapping(String authObject, String path) {
-      Set<String> values = Collections.singleton(safeIntern(path));
-      objects = ImmutableMap.of(authObject, values);
-    }
-
-    ObjectMapping(String authObject, Collection<String> paths) {
-      Set<String> values = new HashSet<>(paths);
-      objects = ImmutableMap.of(authObject, values);
-    }
-
-    Map<String, Set<String>> getObjects() {
-      return objects;
-    }
-  }
-
-  private static final class CallResult {
-    private final Exception failure;
-    private final boolean successStatus;
-    private final ObjectMapping objectMapping;
-
-    CallResult(Exception ex) {
-      failure = ex;
-      successStatus = false;
-      objectMapping = emptyObjectMapping;
-    }
-
-    CallResult(ObjectMapping objectMapping) {
-      failure = null;
-      successStatus = true;
-      this.objectMapping = objectMapping;
-    }
-
-    boolean success() {
-      return successStatus;
-    }
-
-    ObjectMapping getObjectMapping() {
-      return objectMapping;
-    }
-
-    public Exception getFailure() {
-      return failure;
-    }
-  }
-
-  private abstract class BaseTask implements Callable<CallResult> {
-
-    /**
-     *  Class represents retry strategy for BaseTask.
-     */
-    private final class RetryStrategy {
-      private int retryStrategyMaxRetries = 0;
-      private final int retryStrategyWaitDurationMillis;
-
-      private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) {
-        this.retryStrategyMaxRetries = retryStrategyMaxRetries;
-
-        // Assign default wait duration if negative value is provided.
-        this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ?
-                retryStrategyWaitDurationMillis : 1000;
-      }
-
-      @SuppressWarnings({"squid:S1141", "squid:S2142"})
-      public CallResult exec()  {
-        // Retry logic is happening inside callable/task to avoid
-        // synchronous waiting on getting the result.
-        // Retry the failure task until reach the max retry number.
-        // Wait configurable duration for next retry.
-        //
-        // Only thrift exceptions are retried.
-        // Other exceptions are propagated up the stack.
-        Exception exception = null;
-        try {
-          // We catch all exceptions except Thrift exceptions which are retried
-          for (int i = 0; i < retryStrategyMaxRetries; i++) {
-            //noinspection NestedTryStatement
-            try {
-              return new CallResult(doTask());
-            } catch (TException ex) {
-              LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." +
-                      " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " +
-                      ex.toString(), ex);
-              exception = ex;
-
-              try {
-                Thread.sleep(retryStrategyWaitDurationMillis);
-              } catch (InterruptedException ignored) {
-                // Skip the rest retries if get InterruptedException.
-                // And set the corresponding retries number.
-                LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1));
-                break;
-              }
-            }
-          }
-        } catch (Exception ex) {
-          exception = ex;
-        }
-        LOGGER.error("Failed to execute task", exception);
-        // We will fail in the end, so we are shutting down the pool to prevent
-        // new tasks from being scheduled.
-        threadPool.shutdown();
-        return new CallResult(exception);
-      }
-    }
-
-    private final RetryStrategy retryStrategy;
-
-    BaseTask() {
-      retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis);
-    }
-
-    @Override
-    public CallResult call() throws Exception {
-      return retryStrategy.exec();
-    }
-
-    abstract ObjectMapping doTask() throws TException;
-  }
-
-  private class PartitionTask extends BaseTask {
-    private final String dbName;
-    private final String tblName;
-    private final String authName;
-    private final List<String> partNames;
-
-    PartitionTask(String dbName, String tblName, String authName,
-                  List<String> partNames) {
-      this.dbName = safeIntern(dbName);
-      this.tblName = safeIntern(tblName);
-      this.authName = safeIntern(authName);
-      this.partNames = partNames;
-    }
-
-    @Override
-    ObjectMapping doTask() throws TException {
-      List<Partition> tblParts = client.getPartitionsByNames(dbName, tblName, partNames);
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("#### Fetching partitions " +
-                "[" + dbName + "." + tblName + "]" + "[" + partNames + "]");
-      }
-      Collection<String> partitionNames = new ArrayList<>(tblParts.size());
-      for (Partition part : tblParts) {
-        String partPath = pathFromURI(part.getSd().getLocation());
-        if (partPath != null) {
-          partitionNames.add(partPath.intern());
-        }
-      }
-      return new ObjectMapping(authName, partitionNames);
-    }
-  }
-
-  private class TableTask extends BaseTask {
-    private final String dbName;
-    private final List<String> tableNames;
-
-    TableTask(Database db, List<String> tableNames) {
-      dbName = safeIntern(db.getName());
-      this.tableNames = tableNames;
-    }
-
-    @Override
-    @SuppressWarnings({"squid:S2629", "squid:S135"})
-    ObjectMapping doTask() throws TException {
-      List<Table> tables = client.getTableObjectsByName(dbName, tableNames);
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("#### Fetching tables [" + dbName + "][" +
-        tableNames + "]");
-      }
-      Map<String, Set<String>> objectMapping = new HashMap<>(tables.size());
-      for (Table tbl : tables) {
-        // Table names are case insensitive
-        if (!tbl.getDbName().equalsIgnoreCase(dbName)) {
-          // Inconsistency in HMS data
-          LOGGER.warn(String.format("DB name %s for table %s does not match %s",
-                  tbl.getDbName(), tbl.getTableName(), dbName));
-          continue;
-        }
-
-        String tableName = safeIntern(tbl.getTableName().toLowerCase());
-        String authzObject = (dbName + "." + tableName).intern();
-        List<String> tblPartNames = client.listPartitionNames(dbName, tableName, (short) -1);
-        for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) {
-          List<String> partsToFetch = tblPartNames.subList(i,
-                  Math.min(i + maxPartitionsPerCall, tblPartNames.size()));
-          Callable<CallResult> partTask = new PartitionTask(dbName,
-                  tableName, authzObject, partsToFetch);
-          results.add(threadPool.submit(partTask));
-        }
-        String tblPath = safeIntern(pathFromURI(tbl.getSd().getLocation()));
-        if (tblPath == null) {
-          continue;
-        }
-        Set<String> paths = objectMapping.get(authzObject);
-        if (paths == null) {
-          paths = new HashSet<>(1);
-          objectMapping.put(authzObject, paths);
-        }
-        paths.add(tblPath);
-      }
-      return new ObjectMapping(Collections.unmodifiableMap(objectMapping));
-    }
-  }
-
-  private class DbTask extends BaseTask {
-
-    private final String dbName;
-
-    DbTask(String dbName) {
-      //Database names are case insensitive
-      this.dbName = safeIntern(dbName.toLowerCase());
-    }
-
-    @Override
-    ObjectMapping doTask() throws TException {
-      Database db = client.getDatabase(dbName);
-      if (!dbName.equalsIgnoreCase(db.getName())) {
-        LOGGER.warn("Database name {} does not match {}", db.getName(), dbName);
-        return emptyObjectMapping;
-      }
-      List<String> allTblStr = client.getAllTables(dbName);
-      for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) {
-        List<String> tablesToFetch = allTblStr.subList(i,
-                Math.min(i + maxTablesPerCall, allTblStr.size()));
-        Callable<CallResult> tableTask = new TableTask(db, tablesToFetch);
-        results.add(threadPool.submit(tableTask));
-      }
-      String dbPath = safeIntern(pathFromURI(db.getLocationUri()));
-      return (dbPath != null) ? new ObjectMapping(dbName, dbPath) :
-              emptyObjectMapping;
-    }
-  }
-
-  public FullUpdateInitializer(HiveMetaStoreClient client, Configuration conf) {
-    this.client = client;
-    maxPartitionsPerCall = conf.getInt(
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT);
-    maxTablesPerCall = conf.getInt(
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT);
-    maxRetries = conf.getInt(
-            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM,
-            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT);
-    waitDurationMillis = conf.getInt(
-            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
-            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
-    threadPool = Executors.newFixedThreadPool(conf.getInt(
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
-        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT));
-  }
-
-  /**
-   * Get Full HMS snapshot.
-   * @return Full snapshot of HMS objects.
-   * @throws TException if Thrift error occured
-   * @throws ExecutionException if there was a scheduling error
-   * @throws InterruptedException if processing was interrupted
-   */
-  @SuppressWarnings("squid:S00112")
-  public Map<String, Set<String>> getFullHMSSnapshot()
-          throws Exception {
-    // Get list of all HMS databases
-    List<String> allDbStr = client.getAllDatabases();
-    // Schedule async task for each database responsible for fetching per-database
-    // objects.
-    for (String dbName : allDbStr) {
-      results.add(threadPool.submit(new DbTask(dbName)));
-    }
-
-    // Resulting full snapshot
-    Map<String, Set<String>> fullSnapshot = new HashMap<>();
-
-    // As async tasks complete, merge their results into full snapshot.
-    while (!results.isEmpty()) {
-      // This is the only thread that takes elements off the results list - all other threads
-      // only add to it. Once the list is empty it can't become non-empty
-      // This means that if we check that results is non-empty we can safely call pop() and
-      // know that the result of poll() is not null.
-      Future<CallResult> result = results.pop();
-      // Wait for the task to complete
-      CallResult callResult = result.get();
-      // Fail if we got errors
-      if (!callResult.success()) {
-        throw callResult.getFailure();
-      }
-      // Merge values into fullUpdate
-      Map<String, Set<String>> objectMapping =
-              callResult.getObjectMapping().getObjects();
-      for (Map.Entry<String, Set<String>> entry: objectMapping.entrySet()) {
-        String key = entry.getKey();
-        Set<String> val = entry.getValue();
-        Set<String> existingSet = fullSnapshot.get(key);
-        if (existingSet == null) {
-          fullSnapshot.put(key, val);
-          continue;
-        }
-        existingSet.addAll(val);
-      }
-    }
-    return fullSnapshot;
-  }
-
-  @Override
-  public void close() {
-    threadPool.shutdownNow();
-    try {
-      threadPool.awaitTermination(1, TimeUnit.SECONDS);
-    } catch (InterruptedException ignored) {
-      LOGGER.warn("Interrupted shutdown");
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  /**
-   * Intern a string but only if it is not null
-   * @param arg String to be interned, may be null
-   * @return interned string or null
-   */
-  static String safeIntern(String arg) {
-    return (arg != null) ? arg.intern() : null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java b/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
deleted file mode 100644
index 389e9b8..0000000
--- a/sentry-hdfs/sentry-hdfs-common/src/test/java/org/apache/sentry/hdfs/TestFullUpdateInitializer.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.hdfs;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.thrift.TException;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class TestFullUpdateInitializer {
-
-  private static Configuration conf = new Configuration();
-
-  static {
-    conf.setInt(ServiceConstants.ServerConfig
-            .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC, 1);
-    conf.setInt(ServiceConstants.ServerConfig
-            .SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC, 1);
-    conf.setInt(ServiceConstants.ServerConfig
-            .SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS, 8);
-  }
-
-  /**
-   * Representation of a Hive table. A table has a name and a list of partitions.
-   */
-  private static class HiveTable {
-    String name;
-    List<String> partitions;
-
-    HiveTable(String name) {
-      this.name = name;
-      this.partitions = new ArrayList<>();
-    }
-
-    HiveTable(String name, List<String> partitions) {
-      this.name = name;
-      this.partitions = partitions;
-      if (this.partitions == null) {
-        this.partitions = new ArrayList<>();
-      }
-    }
-
-    HiveTable add(String partition) {
-      partitions.add(partition);
-      return this;
-    }
-  }
-
-  /**
-   * Representation of a Hive database. A database has a name and a list of tables
-   */
-  private static class HiveDb {
-    String name;
-    Collection<HiveTable> tables;
-
-    HiveDb(String name) {
-      this.name = name;
-      tables = new ArrayList<>();
-    }
-
-    HiveDb(String name, Collection<HiveTable> tables) {
-      this.name = name;
-      this.tables = tables;
-      if (this.tables == null) {
-        this.tables = new ArrayList<>();
-      }
-    }
-
-    void add(HiveTable table) {
-      this.tables.add(table);
-    }
-  }
-
-  /**
-   * Representation of a full Hive snapshot. A snapshot is collection of databases
-   */
-  private static class HiveSnapshot {
-    List<HiveDb> databases = new ArrayList<>();
-
-    HiveSnapshot() {
-    }
-
-    HiveSnapshot(Collection<HiveDb> dblist) {
-      if (dblist != null) {
-        databases.addAll(dblist);
-      }
-    }
-
-    HiveSnapshot add(HiveDb db) {
-      this.databases.add(db);
-      return this;
-    }
-  }
-
-  /**
-   * Convert Hive snapshot to mock client that will return proper values
-   * for the snapshot.
-   */
-  private static class MockClient {
-    HiveMetaStoreClient client;
-
-    MockClient(HiveSnapshot snapshot) throws TException {
-      client = Mockito.mock(HiveMetaStoreClient.class);
-      List<String> dbNames = new ArrayList<>(snapshot.databases.size());
-      // Walk over all databases and mock appropriate objects
-      for (HiveDb mdb: snapshot.databases) {
-        String dbName = mdb.name;
-        dbNames.add(dbName);
-        Database db = makeDb(dbName);
-        Mockito.when(client.getDatabase(dbName)).thenReturn(db);
-        List<String> tableNames = new ArrayList<>(mdb.tables.size());
-        // Walk over all tables for the database and mock appropriate objects
-        for (HiveTable table: mdb.tables) {
-          String tableName = table.name;
-          tableNames.add(tableName);
-          Table mockTable = makeTable(dbName, tableName);
-          Mockito.when(client.getTableObjectsByName(dbName,
-                  Lists.newArrayList(tableName)))
-                  .thenReturn(Lists.newArrayList(mockTable));
-          Mockito.when(client.listPartitionNames(dbName, tableName, (short) -1))
-                  .thenReturn(table.partitions);
-          // Walk across all partitions and mock appropriate objects
-          for (String partName: table.partitions) {
-            Partition p = makePartition(dbName, tableName, partName);
-            Mockito.when(client.getPartitionsByNames(dbName, tableName,
-                    Lists.<String>newArrayList(partName)))
-                    .thenReturn(Lists.<Partition>newArrayList(p));
-          }
-        }
-        Mockito.when(client.getAllTables(dbName)).thenReturn(tableNames);
-      }
-      // Return all database names
-      Mockito.when(client.getAllDatabases()).thenReturn(dbNames);
-    }
-  }
-
-  /**
-   * Create mock database with the given name
-   * @param name Database name
-   * @return Mock database object
-   */
-  private static Database makeDb(String name) {
-    Database db = Mockito.mock(Database.class);
-    Mockito.when(db.getName()).thenReturn(name);
-    Mockito.when(db.getLocationUri()).thenReturn("hdfs:///" + name);
-    return db;
-  }
-
-  /**
-   * Create mock table
-   * @param dbName db for this table
-   * @param tableName name of the table
-   * @return mock table object
-   */
-  private static Table makeTable(String dbName, String tableName) {
-    Table table = Mockito.mock(Table.class);
-    Mockito.when(table.getDbName()).thenReturn(dbName);
-    Mockito.when(table.getTableName()).thenReturn(tableName);
-    StorageDescriptor sd = Mockito.mock(StorageDescriptor.class);
-    Mockito.when(sd.getLocation()).thenReturn(
-            String.format("hdfs:///%s/%s", dbName, tableName));
-    Mockito.when(table.getSd()).thenReturn(sd);
-    return table;
-  }
-
-  /**
-   * Create mock partition
-   * @param dbName database for this partition
-   * @param tableName table for this partition
-   * @param partName partition name
-   * @return mock partition object
-   */
-  private static Partition makePartition(String dbName, String tableName, String partName) {
-    Partition partition = Mockito.mock(Partition.class);
-    StorageDescriptor sd = Mockito.mock(StorageDescriptor.class);
-    Mockito.when(sd.getLocation()).thenReturn(
-            String.format("hdfs:///%s/%s/%s", dbName, tableName, partName));
-    Mockito.when(partition.getSd()).thenReturn(sd);
-    return partition;
-  }
-
-  @Test
-  // Test basic operation with small database
-  public void testSimple() throws Exception {
-    HiveTable tab21 = new HiveTable("tab21");
-    HiveTable tab31 = new HiveTable("tab31").add("part311").add("part312");
-    HiveDb db3 = new HiveDb("db3", Lists.newArrayList(tab31));
-    HiveDb db2 = new HiveDb("db2", Lists.newArrayList(tab21));
-    HiveDb db1 = new HiveDb("db1");
-    HiveSnapshot snap = new HiveSnapshot().add(db1).add(db2).add(db3);
-    MockClient c = new MockClient(snap);
-
-    Map<String, Set<String>> update;
-    try(FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(c.client, conf)) {
-      update = cacheInitializer.getFullHMSSnapshot();
-    }
-    Assert.assertEquals(5, update.size());
-    Assert.assertEquals(Sets.newHashSet("db1"), update.get("db1"));
-    Assert.assertEquals(Sets.newHashSet("db2"), update.get("db2"));
-    Assert.assertEquals(Sets.newHashSet("db3"), update.get("db3"));
-    Assert.assertEquals(Sets.newHashSet("db2/tab21"), update.get("db2.tab21"));
-    Assert.assertEquals(Sets.newHashSet("db3/tab31",
-            "db3/tab31/part311", "db3/tab31/part312"), update.get("db3.tab31"));
-  }
-
-  @Test
-  // Test that invalid paths are handled correctly
-  public void testInvalidPaths() throws Exception {
-    //Set up mocks: db1.tb1, with tb1 returning a wrong dbname (db2)
-    Database db1 = makeDb("db1");
-
-    Table tab1 = Mockito.mock(Table.class);
-    //Return a wrong db name, so that this triggers an exception
-    Mockito.when(tab1.getDbName()).thenReturn("db2");
-    Mockito.when(tab1.getTableName()).thenReturn("tab1");
-
-    HiveMetaStoreClient client = Mockito.mock(HiveMetaStoreClient.class);
-    Mockito.when(client.getAllDatabases()).thenReturn(Lists.newArrayList("db1"));
-    Mockito.when(client.getDatabase("db1")).thenReturn(db1);
-
-    Table tab12 = Mockito.mock(Table.class);
-    Mockito.when(tab12.getDbName()).thenReturn("db1");
-    Mockito.when(tab12.getTableName()).thenReturn("tab21");
-    StorageDescriptor sd21 = Mockito.mock(StorageDescriptor.class);
-    Mockito.when(sd21.getLocation()).thenReturn("hdfs:///db1/tab21");
-    Mockito.when(tab12.getSd()).thenReturn(sd21);
-
-    Mockito.when(client.getTableObjectsByName("db1",
-            Lists.newArrayList("tab1"))).thenReturn(Lists.newArrayList(tab1));
-    Mockito.when(client.getTableObjectsByName("db1",
-            Lists.newArrayList("tab12"))).thenReturn(Lists.newArrayList(tab12));
-    Mockito.when(client.getAllTables("db1")).
-            thenReturn(Lists.newArrayList("tab1", "tab12"));
-
-
-    Map<String, Set<String>> update;
-    try(FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(client, conf)) {
-      update = cacheInitializer.getFullHMSSnapshot();
-    }
-    Assert.assertEquals(2, update.size());
-    Assert.assertEquals(Sets.newHashSet("db1"), update.get("db1"));
-    Assert.assertEquals(Sets.newHashSet("db1/tab21"), update.get("db1.tab21"));
-  }
-
-  @Test
-  // Test handling of a big tables and partitions
-  public void testBig() throws Exception {
-    int ndbs = 3;
-    int ntables = 51;
-    int nparts = 131;
-
-    HiveSnapshot snap = new HiveSnapshot();
-
-    for (int i = 0; i < ndbs; i++) {
-      HiveDb db = new HiveDb("db" + i);
-      for (int j = 0; j < ntables; j++) {
-        HiveTable table = new HiveTable("table" + i + j);
-        for (int k = 0; k < nparts; k++) {
-          table.add("part" + i + j + k);
-        }
-        db.add(table);
-      }
-      snap.add(db);
-    }
-    MockClient c = new MockClient(snap);
-    Map<String, Set<String>> update;
-    try(FullUpdateInitializer cacheInitializer = new FullUpdateInitializer(c.client, conf)) {
-      update = cacheInitializer.getFullHMSSnapshot();
-    }
-    Assert.assertEquals((ntables * ndbs) + ndbs, update.size());
-    for (int i = 0; i < ndbs; i++) {
-      String dbName = "db" + i;
-      Assert.assertEquals(Sets.newHashSet(dbName), update.get(dbName));
-
-      for (int j = 0; j < ntables; j++) {
-        String tableName = "table" + i + j;
-        Set<String> values = new HashSet<>();
-        values.add(String.format("%s/%s", dbName, tableName));
-        for (int k = 0; k < nparts; k++) {
-          String partName = "part" + i + j + k;
-          values.add(String.format("%s/%s/%s", dbName, tableName, partName));
-        }
-        String authz = dbName + "." + tableName;
-        Assert.assertEquals(values, update.get(authz));
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
new file mode 100644
index 0000000..1490581
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.service.thrift;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.sentry.hdfs.PathsUpdate;
+import org.apache.sentry.hdfs.SentryMalformedPathException;
+import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manage fetching full snapshot from HMS.
+ * Snapshot is represented as a map from the hive object name to
+ * the set of paths for this object.
+ * The hive object name is either the Hive database name or
+ * Hive database name joined with Hive table name as {@code dbName.tableName}.
+ * All table partitions are stored under the table object.
+ * <p>
+ * Once a {@link FullUpdateInitializer} is created, its
+ * {@link FullUpdateInitializer#getFullHMSSnapshot()} method should be called to get the initial update.
+ * <p>
+ * It is important to close the {@link FullUpdateInitializer} object to prevent resource
+ * leaks.
+ * <p>
+ * The usual way of using {@link FullUpdateInitializer} is
+ * <pre>{@code
+ * try (FullUpdateInitializer updateInitializer =
+ *          new FullUpdateInitializer(clientFactory, authzConf)) {
+ *   Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
+ *   return pathsUpdate;
+ * }
+ * }</pre>
+ */
+public final class FullUpdateInitializer implements AutoCloseable {
+
+  /*
+   * Implementation note.
+   *
+   * The snapshot is obtained using an executor. We follow the map/reduce model.
+   * Each executor thread (mapper) obtains and returns a partial snapshot; the partial
+   * snapshots are then reduced to a single combined snapshot by getFullHMSSnapshot().
+   *
+   * Synchronization between the getFullHMSSnapshot() and executors is done using the
+   * 'results' queue. The queue holds the futures for each scheduled task.
+   * It is initially populated by getFullHMSSnapshot and each task may add new future
+   * results to it. Only getFullHMSSnapshot() removes entries from the results queue.
+   * This guarantees that once the results queue is empty there are no pending jobs.
+   *
+   * Since there is no other data sharing, the implementation is safe without
+   * any other synchronization. It is not thread-safe for concurrent calls
+   * to getFullHMSSnapshot().
+   *
+   */
+
+  private final ExecutorService threadPool;
+  private final int maxPartitionsPerCall;
+  private final int maxTablesPerCall;
+  private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>();
+  private final int maxRetries;
+  private final int waitDurationMillis;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class);
+
+  private static final ObjectMapping emptyObjectMapping =
+          new ObjectMapping(Collections.<String, Set<String>>emptyMap());
+  private final HiveConnectionFactory clientFactory;
+
+  /**
+   * Extract path (not starting with "/") from the full URI
+   * @param uri - resource URI (usually with scheme)
+   * @return path if uri is valid or null
+   */
+  private static String pathFromURI(String uri) {
+    try {
+      return PathsUpdate.parsePath(uri);
+    } catch (SentryMalformedPathException e) {
+      LOGGER.warn(String.format("Ignoring invalid uri %s: %s",
+              uri, e.getReason()));
+      return null;
+    }
+  }
+
+  /**
+   * Mapping of object to set of paths.
+   * Used to represent partial results from executor threads. Multiple
+   * ObjectMapping objects are combined in a single mapping
+   * to get the final result.
+   */
+  private static final class ObjectMapping {
+    private final Map<String, Set<String>> objects;
+
+    ObjectMapping(Map<String, Set<String>> objects) {
+      this.objects = objects;
+    }
+
+    ObjectMapping(String authObject, String path) {
+      Set<String> values = Collections.singleton(safeIntern(path));
+      objects = ImmutableMap.of(authObject, values);
+    }
+
+    ObjectMapping(String authObject, Collection<String> paths) {
+      Set<String> values = new HashSet<>(paths);
+      objects = ImmutableMap.of(authObject, values);
+    }
+
+    Map<String, Set<String>> getObjects() {
+      return objects;
+    }
+  }
+
+  private static final class CallResult {
+    private final Exception failure;
+    private final boolean successStatus;
+    private final ObjectMapping objectMapping;
+
+    CallResult(Exception ex) {
+      failure = ex;
+      successStatus = false;
+      objectMapping = emptyObjectMapping;
+    }
+
+    CallResult(ObjectMapping objectMapping) {
+      failure = null;
+      successStatus = true;
+      this.objectMapping = objectMapping;
+    }
+
+    boolean success() {
+      return successStatus;
+    }
+
+    ObjectMapping getObjectMapping() {
+      return objectMapping;
+    }
+
+    public Exception getFailure() {
+      return failure;
+    }
+  }
+
+  private abstract class BaseTask implements Callable<CallResult> {
+
+    /**
+     *  Class represents retry strategy for BaseTask.
+     */
+    private final class RetryStrategy {
+      private int retryStrategyMaxRetries = 0;
+      private final int retryStrategyWaitDurationMillis;
+
+      private RetryStrategy(int retryStrategyMaxRetries, int retryStrategyWaitDurationMillis) {
+        this.retryStrategyMaxRetries = retryStrategyMaxRetries;
+
+        // Assign default wait duration if a non-positive value is provided.
+        this.retryStrategyWaitDurationMillis = (retryStrategyWaitDurationMillis > 0) ?
+                retryStrategyWaitDurationMillis : 1000;
+      }
+
+      @SuppressWarnings({"squid:S1141", "squid:S2142"})
+      public CallResult exec()  {
+        // Retry logic is happening inside callable/task to avoid
+        // synchronous waiting on getting the result.
+        // Retry the failed task until the maximum number of retries is reached,
+        // waiting a configurable duration before each retry.
+        //
+        // Only Thrift exceptions are retried. Any other exception stops the
+        // retries and is reported through a failed CallResult.
+        Exception exception = null;
+        try {
+          // We catch all exceptions except Thrift exceptions which are retried
+          for (int i = 0; i < retryStrategyMaxRetries; i++) {
+            //noinspection NestedTryStatement
+            try {
+              return new CallResult(doTask());
+            } catch (TException ex) {
+              LOGGER.debug("Failed to execute task on " + (i + 1) + " attempts." +
+                      " Sleeping for " + retryStrategyWaitDurationMillis + " ms. Exception: " +
+                      ex.toString(), ex);
+              exception = ex;
+
+              try {
+                Thread.sleep(retryStrategyWaitDurationMillis);
+              } catch (InterruptedException ignored) {
+                // Skip the remaining retries if the wait between attempts
+                // is interrupted.
+                LOGGER.warn("Interrupted during update fetch during iteration " + (i + 1));
+                break;
+              }
+            }
+          }
+        } catch (Exception ex) {
+          exception = ex;
+        }
+        LOGGER.error("Failed to execute task", exception);
+        // We will fail in the end, so we are shutting down the pool to prevent
+        // new tasks from being scheduled.
+        threadPool.shutdown();
+        return new CallResult(exception);
+      }
+    }
+
+    private final RetryStrategy retryStrategy;
+
+    BaseTask() {
+      retryStrategy = new RetryStrategy(maxRetries, waitDurationMillis);
+    }
+
+    @Override
+    public CallResult call() throws Exception {
+      return retryStrategy.exec();
+    }
+
+    abstract ObjectMapping doTask() throws Exception;
+  }
+
+  private class PartitionTask extends BaseTask {
+    private final String dbName;
+    private final String tblName;
+    private final String authName;
+    private final List<String> partNames;
+
+    PartitionTask(String dbName, String tblName, String authName,
+                  List<String> partNames) {
+      this.dbName = safeIntern(dbName);
+      this.tblName = safeIntern(tblName);
+      this.authName = safeIntern(authName);
+      this.partNames = partNames;
+    }
+
+    @Override
+    ObjectMapping doTask() throws Exception {
+      List<Partition> tblParts;
+      HMSClient c = null;
+      try (HMSClient client = clientFactory.connect()) {
+        c = client;
+        tblParts = client.getClient().getPartitionsByNames(dbName, tblName, partNames);
+      } catch (Exception e) {
+        if (c != null) {
+          c.invalidate();
+        }
+        throw e;
+      }
+
+      LOGGER.debug("Fetched partitions for db = {}, table = {}",
+              dbName, tblName);
+
+      Collection<String> partitionNames = new ArrayList<>(tblParts.size());
+      for (Partition part : tblParts) {
+        String partPath = pathFromURI(part.getSd().getLocation());
+        if (partPath != null) {
+          partitionNames.add(partPath.intern());
+        }
+      }
+      return new ObjectMapping(authName, partitionNames);
+    }
+  }
+
+  private class TableTask extends BaseTask {
+    private final String dbName;
+    private final List<String> tableNames;
+
+    TableTask(Database db, List<String> tableNames) {
+      dbName = safeIntern(db.getName());
+      this.tableNames = tableNames;
+    }
+
+    @Override
+    @SuppressWarnings({"squid:S2629", "squid:S135"})
+    ObjectMapping doTask() throws Exception {
+      HMSClient c = null;
+      try (HMSClient client = clientFactory.connect()) {
+        c = client;
+        List<Table> tables = client.getClient().getTableObjectsByName(dbName, tableNames);
+
+        LOGGER.debug("Fetching tables for db = {}, tables = {}", dbName, tableNames);
+
+        Map<String, Set<String>> objectMapping = new HashMap<>(tables.size());
+        for (Table tbl : tables) {
+          // Table names are case insensitive
+          if (!tbl.getDbName().equalsIgnoreCase(dbName)) {
+            // Inconsistency in HMS data
+            LOGGER.warn(String.format("DB name %s for table %s does not match %s",
+                    tbl.getDbName(), tbl.getTableName(), dbName));
+            continue;
+          }
+
+          String tableName = safeIntern(tbl.getTableName().toLowerCase());
+          String authzObject = (dbName + "." + tableName).intern();
+          List<String> tblPartNames = client.getClient().listPartitionNames(dbName, tableName, (short) -1);
+          for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) {
+            List<String> partsToFetch = tblPartNames.subList(i,
+                    Math.min(i + maxPartitionsPerCall, tblPartNames.size()));
+            Callable<CallResult> partTask = new PartitionTask(dbName,
+                    tableName, authzObject, partsToFetch);
+            results.add(threadPool.submit(partTask));
+          }
+          String tblPath = safeIntern(pathFromURI(tbl.getSd().getLocation()));
+          if (tblPath == null) {
+            continue;
+          }
+          Set<String> paths = objectMapping.get(authzObject);
+          if (paths == null) {
+            paths = new HashSet<>(1);
+            objectMapping.put(authzObject, paths);
+          }
+          paths.add(tblPath);
+        }
+        return new ObjectMapping(Collections.unmodifiableMap(objectMapping));
+      } catch (Exception e) {
+        if (c != null) {
+          c.invalidate();
+        }
+        throw e;
+      }
+    }
+  }
+
+  private class DbTask extends BaseTask {
+
+    private final String dbName;
+
+    DbTask(String dbName) {
+      //Database names are case insensitive
+      this.dbName = safeIntern(dbName.toLowerCase());
+    }
+
+    @Override
+    ObjectMapping doTask() throws Exception {
+      HMSClient c = null;
+      try (HMSClient client = clientFactory.connect()) {
+        c = client;
+        Database db = client.getClient().getDatabase(dbName);
+        if (!dbName.equalsIgnoreCase(db.getName())) {
+          LOGGER.warn("Database name {} does not match {}", db.getName(), dbName);
+          return emptyObjectMapping;
+        }
+        List<String> allTblStr = client.getClient().getAllTables(dbName);
+        for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) {
+          List<String> tablesToFetch = allTblStr.subList(i,
+                  Math.min(i + maxTablesPerCall, allTblStr.size()));
+          Callable<CallResult> tableTask = new TableTask(db, tablesToFetch);
+          results.add(threadPool.submit(tableTask));
+        }
+        String dbPath = safeIntern(pathFromURI(db.getLocationUri()));
+        return (dbPath != null) ? new ObjectMapping(dbName, dbPath) :
+                emptyObjectMapping;
+      } catch (Exception e) {
+        if (c != null) {
+          c.invalidate();
+        }
+        throw e;
+      }
+    }
+  }
+
+  FullUpdateInitializer(HiveConnectionFactory clientFactory, Configuration conf) {
+    this.clientFactory = clientFactory;
+    maxPartitionsPerCall = conf.getInt(
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC,
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT);
+    maxTablesPerCall = conf.getInt(
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC,
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC_DEFAULT);
+    maxRetries = conf.getInt(
+            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM,
+            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_MAX_NUM_DEFAULT);
+    waitDurationMillis = conf.getInt(
+            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
+            ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
+    threadPool = Executors.newFixedThreadPool(conf.getInt(
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS,
+        ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_INIT_THREADS_DEFAULT));
+  }
+
+  /**
+   * Get Full HMS snapshot.
+   * @return Full snapshot of HMS objects.
+   * @throws TException if Thrift error occurred
+   * @throws ExecutionException if there was a scheduling error
+   * @throws InterruptedException if processing was interrupted
+   */
+  @SuppressWarnings("squid:S00112")
+  Map<String, Set<String>> getFullHMSSnapshot() throws Exception {
+    // Get list of all HMS databases
+    List<String> allDbStr;
+    HMSClient c = null;
+    try (HMSClient client = clientFactory.connect()) {
+      c = client;
+      allDbStr = client.getClient().getAllDatabases();
+    } catch (Exception e) {
+      if (c != null) {
+        c.invalidate();
+      }
+      throw e;
+    }
+
+    // Schedule async task for each database responsible for fetching per-database
+    // objects.
+    for (String dbName : allDbStr) {
+      results.add(threadPool.submit(new DbTask(dbName)));
+    }
+
+    // Resulting full snapshot
+    Map<String, Set<String>> fullSnapshot = new HashMap<>();
+
+    // As async tasks complete, merge their results into full snapshot.
+    while (!results.isEmpty()) {
+      // This is the only thread that takes elements off the results list - all other threads
+      // only add to it. Once the list is empty it can't become non-empty
+      // This means that if we check that results is non-empty we can safely call pop() and
+      // know that it returns an element instead of failing on an empty queue.
+      Future<CallResult> result = results.pop();
+      // Wait for the task to complete
+      CallResult callResult = result.get();
+      // Fail if we got errors
+      if (!callResult.success()) {
+        throw callResult.getFailure();
+      }
+      // Merge values into fullSnapshot
+      Map<String, Set<String>> objectMapping =
+              callResult.getObjectMapping().getObjects();
+      for (Map.Entry<String, Set<String>> entry: objectMapping.entrySet()) {
+        String key = entry.getKey();
+        Set<String> val = entry.getValue();
+        Set<String> existingSet = fullSnapshot.get(key);
+        if (existingSet == null) {
+          fullSnapshot.put(key, val);
+          continue;
+        }
+        existingSet.addAll(val);
+      }
+    }
+    return fullSnapshot;
+  }
+
+  @Override
+  public void close() {
+    threadPool.shutdownNow();
+    try {
+      threadPool.awaitTermination(1, TimeUnit.SECONDS);
+    } catch (InterruptedException ignored) {
+      LOGGER.warn("Interrupted shutdown");
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Intern a string but only if it is not null
+   * @param arg String to be interned, may be null
+   * @return interned string or null
+   */
+  static String safeIntern(String arg) {
+    return (arg != null) ? arg.intern() : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
new file mode 100644
index 0000000..86ff47e
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSClient.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+
+/**
+ * AutoCloseable wrapper around HiveMetaStoreClient.
+ * It is only used to provide try-with-resource semantics for
+ * {@link HiveMetaStoreClient}.
+ */
+class HMSClient implements AutoCloseable {
+  private final HiveMetaStoreClient client;
+  private boolean valid;
+
+  HMSClient(HiveMetaStoreClient client) {
+    this.client = Preconditions.checkNotNull(client);
+    valid = true;
+  }
+
+  public HiveMetaStoreClient getClient() {
+    return client;
+  }
+
+  public void invalidate() {
+    if (valid) {
+      client.close();
+      valid = false;
+    }
+  }
+
+  @Override
+  public void close() {
+    if (valid) {
+      client.close();
+      valid = false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index 1f7eb18..2d581f7 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -18,7 +18,6 @@
 package org.apache.sentry.service.thrift;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -27,17 +26,12 @@ import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SaslRpcServer;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.hcatalog.messaging.HCatEventMessage;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
 import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
 import org.apache.sentry.core.common.exception.SentryInvalidInputException;
 import org.apache.sentry.core.common.exception.SentryNoSuchObjectException;
 import org.apache.sentry.hdfs.PermissionsUpdate;
-import org.apache.sentry.hdfs.FullUpdateInitializer;
 import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
 import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
@@ -50,10 +44,8 @@ import org.apache.sentry.binding.metastore.messaging.json.*;
 import javax.jdo.JDODataStoreException;
 import javax.security.auth.login.LoginException;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.SocketException;
-import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -73,32 +65,33 @@ import static org.apache.sentry.hdfs.Updateable.Update;
 @SuppressWarnings("PMD")
 public class HMSFollower implements Runnable, AutoCloseable {
   private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
+  private HiveSimpleConnectionFactory hiveConnectionFactory;
   // Track the latest eventId of the event that has been logged. So we don't log the same message
   private long lastLoggedEventId = SentryStore.EMPTY_CHANGE_ID;
   private static boolean connectedToHMS = false;
-  private HiveMetaStoreClient client;
-  private SentryKerberosContext kerberosContext;
+  private HMSClient client;
   private final Configuration authzConf;
-  private boolean kerberos;
   private final SentryStore sentryStore;
   private String hiveInstance;
 
   private boolean needLogHMSSupportReady = true;
   private final LeaderStatusMonitor leaderMonitor;
 
-  HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor) {
-    LOGGER.info("HMSFollower is being initialized");
+  HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
+              HiveSimpleConnectionFactory hiveConnectionFactory) {
     authzConf = conf;
     this.leaderMonitor = leaderMonitor;
     sentryStore = store;
+    this.hiveConnectionFactory = hiveConnectionFactory;
   }
 
   @VisibleForTesting
-  HMSFollower(Configuration conf, SentryStore sentryStore, String hiveInstance) {
-    this.authzConf = conf;
-    this.sentryStore = sentryStore;
+  HMSFollower(Configuration conf, SentryStore sentryStore, String hiveInstance)
+      throws IOException, LoginException {
+    this(conf, sentryStore, null, null);
     this.hiveInstance = hiveInstance;
-    this.leaderMonitor = null;
+    hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf());
+    hiveConnectionFactory.init();
   }
 
   @VisibleForTesting
@@ -110,6 +103,11 @@ public class HMSFollower implements Runnable, AutoCloseable {
   public void close() {
     // Close any outstanding connections to HMS
     closeHMSConnection();
+    try {
+      hiveConnectionFactory.close();
+    } catch (Exception e) {
+      LOGGER.error("failed to close Hive Connection Factory", e);
+    }
   }
 
   /**
@@ -117,77 +115,13 @@ public class HMSFollower implements Runnable, AutoCloseable {
    * Throws @LoginException if Kerberos context creation failed using Sentry's kerberos credentials
    * Throws @MetaException if there was a problem on creating an HMSClient
    */
-  private HiveMetaStoreClient getMetaStoreClient(Configuration conf)
-    throws IOException, InterruptedException, LoginException, MetaException {
-    if (client != null) {
-      return client;
-    }
-
-    final HiveConf hiveConf = new HiveConf();
-    hiveInstance = hiveConf.get(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME.getVar());
-
-    String principal, keytab;
-
-    //TODO: Is this the right(standard) way to create a HMS client? HiveMetastoreClientFactoryImpl?
-    //TODO: Check if HMS is using kerberos instead of relying on Sentry conf
-    kerberos = ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
-      conf.get(ServiceConstants.ServerConfig.SECURITY_MODE, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS).trim());
-    if (kerberos) {
-      LOGGER.info("Making a kerberos connection to HMS");
-      try {
-        int port = conf.getInt(ServiceConstants.ServerConfig.RPC_PORT, ServiceConstants.ServerConfig.RPC_PORT_DEFAULT);
-        String rawPrincipal = Preconditions.checkNotNull(conf.get(ServiceConstants.ServerConfig.PRINCIPAL),
-          ServiceConstants.ServerConfig.PRINCIPAL + " is required");
-        principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr(
-          conf.get(ServiceConstants.ServerConfig.RPC_ADDRESS, ServiceConstants.ServerConfig.RPC_ADDRESS_DEFAULT), port).getAddress());
-      } catch (IOException io) {
-        throw new RuntimeException("Can't translate kerberos principal'", io);
-      }
-
-      LOGGER.info("Using kerberos principal: " + principal);
-      final String[] principalParts = SaslRpcServer.splitKerberosName(principal);
-      Preconditions.checkArgument(principalParts.length == 3,
-        "Kerberos principal should have 3 parts: " + principal);
-
-      keytab = Preconditions.checkNotNull(conf.get(ServiceConstants.ServerConfig.KEY_TAB),
-        ServiceConstants.ServerConfig.KEY_TAB + " is required");
-      File keytabFile = new File(keytab);
-      Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),
-        "Keytab " + keytab + " does not exist or is not readable.");
-
-      try {
-        // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal.
-        kerberosContext = new SentryKerberosContext(principal, keytab, false);
-
-        UserGroupInformation.setConfiguration(hiveConf);
-        UserGroupInformation clientUGI = UserGroupInformation.getUGIFromSubject(kerberosContext.getSubject());
-
-        // HiveMetaStoreClient handles the connection retry logic to HMS and can be configured using properties:
-        // hive.metastore.connect.retries, hive.metastore.client.connect.retry.delay
-        client = clientUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
-          @Override
-          public HiveMetaStoreClient run() throws Exception {
-            return new HiveMetaStoreClient(hiveConf);
-          }
-        });
-        LOGGER.info("Secure connection established with HMS");
-      } catch (LoginException e) {
-        // Kerberos login failed
-        LOGGER.error("Failed to setup kerberos context.");
-        throw e;
-      } finally {
-        // Shutdown kerberos context if HMS connection failed to setup to avoid thread leaks.
-        if ((kerberosContext != null) && (client == null)) {
-          kerberosContext.shutDown();
-          kerberosContext = null;
-        }
-      }
-    } else {
-      //This is only for testing purposes. Sentry strongly recommends strong authentication
-      client = new HiveMetaStoreClient(hiveConf);
-      LOGGER.info("Established non-secure connection with HMS");
+  private HiveMetaStoreClient getMetaStoreClient()
+    throws IOException, InterruptedException, MetaException {
+    if (client == null) {
+      client = hiveConnectionFactory.connect();
+      connectedToHMS = true;
     }
-    return client;
+    return client.getClient();
   }
 
   @Override
@@ -209,7 +143,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
       closeHMSConnection();
       return;
     }
-    processHiveMetastoreUpdates(lastProcessedNotificationID);
+    processHiveMetastoreUpdates();
   }
 
   /**
@@ -236,26 +170,11 @@ public class HMSFollower implements Runnable, AutoCloseable {
    *
    * Clients connections waiting for an event notification will be woken up afterwards.
    */
-  private void processHiveMetastoreUpdates(Long lastProcessedNotificationID) {
-    if (client == null) {
-      try {
-        client = getMetaStoreClient(authzConf);
-        if (client == null) {
-          //TODO: Do we want to throw an exception after a certain timeout?
-          return;
-        } else {
-          connectedToHMS = true;
-          LOGGER.info("HMSFollower of Sentry successfully connected to HMS");
-        }
-      } catch (Throwable e) {
-        LOGGER.error("HMSFollower cannot connect to HMS!!", e);
-        return;
-      }
-    }
-
+  private void processHiveMetastoreUpdates() {
     try {
       // Decision of taking full snapshot is based on AuthzPathsMapping information persisted
       // in the sentry persistent store. If AuthzPathsMapping is empty, shapshot is needed.
+      Long lastProcessedNotificationID;
       if (sentryStore.isAuthzPathsMappingEmpty()) {
         // TODO: expose time used for full update in the metrics
 
@@ -270,27 +189,26 @@ public class HMSFollower implements Runnable, AutoCloseable {
         // will be dropped. A new attempts will be made after 500 milliseconds when
         // HMSFollower run again.
 
-        Map<String, Set<String>> pathsFullSnapshot;
-        CurrentNotificationEventId eventIDBefore = client.getCurrentNotificationEventId();
-        LOGGER.info(String.format("Before fetching hive full snapshot, Current NotificationID = %s.", eventIDBefore));
+        CurrentNotificationEventId eventIDBefore = getMetaStoreClient().getCurrentNotificationEventId();
+        LOGGER.info("Before fetching hive full snapshot, Current NotificationID = {}", eventIDBefore);
 
-        pathsFullSnapshot = fetchFullUpdate();
+        Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate();
         if(pathsFullSnapshot.isEmpty()) {
           LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have any data");
           return;
         }
 
-        CurrentNotificationEventId eventIDAfter = client.getCurrentNotificationEventId();
-        LOGGER.info(String.format("After fetching hive full snapshot, Current NotificationID = %s.", eventIDAfter));
+        CurrentNotificationEventId eventIDAfter = getMetaStoreClient().getCurrentNotificationEventId();
+        LOGGER.info("After fetching hive full snapshot, Current NotificationID = {}", eventIDAfter);
 
         if (!eventIDBefore.equals(eventIDAfter)) {
-          LOGGER.error("#### Fail to get a point-in-time hive full snapshot !! Current NotificationID = " +
-            eventIDAfter.toString());
+          LOGGER.error("Fail to get a point-in-time hive full snapshot. Current ID = {}",
+            eventIDAfter);
           return;
         }
 
-        LOGGER.info(String.format("Successfully fetched hive full snapshot, Current NotificationID = %s.",
-          eventIDAfter));
+        LOGGER.info("Successfully fetched hive full snapshot, Current NotificationID = {}",
+          eventIDAfter);
         // As eventIDAfter is the last event that was processed, eventIDAfter is used to update
         // lastProcessedNotificationID instead of getting it from persistent store.
         lastProcessedNotificationID = eventIDAfter.getEventId();
@@ -314,18 +232,18 @@ public class HMSFollower implements Runnable, AutoCloseable {
       // HIVE-15761: Currently getNextNotification API may return an empty
       // NotificationEventResponse causing TProtocolException.
       // Workaround: Only processes the notification events newer than the last updated one.
-      CurrentNotificationEventId eventId = client.getCurrentNotificationEventId();
+      CurrentNotificationEventId eventId = getMetaStoreClient().getCurrentNotificationEventId();
       LOGGER.debug("Last Notification in HMS {} lastProcessedNotificationID is {}",
         eventId.getEventId(), lastProcessedNotificationID);
       if (eventId.getEventId() > lastProcessedNotificationID) {
         NotificationEventResponse response =
-          client.getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null);
+          getMetaStoreClient().getNextNotification(lastProcessedNotificationID, Integer.MAX_VALUE, null);
         if (response.isSetEvents()) {
           if (!response.getEvents().isEmpty()) {
             if (lastProcessedNotificationID != lastLoggedEventId) {
               // Only log when there are updates and the notification ID has changed.
-              LOGGER.debug(String.format("lastProcessedNotificationID = %s. Processing %s events",
-                lastProcessedNotificationID, response.getEvents().size()));
+              LOGGER.debug("lastProcessedNotificationID = {}. Processing {} events",
+                      lastProcessedNotificationID, response.getEvents().size());
               lastLoggedEventId = lastProcessedNotificationID;
             }
 
@@ -337,6 +255,7 @@ public class HMSFollower implements Runnable, AutoCloseable {
       // If the underlying exception is around socket exception, it is better to retry connection to HMS
       if (e.getCause() instanceof SocketException) {
         LOGGER.error("Encountered Socket Exception during fetching Notification entries, will reconnect to HMS", e);
+        client.invalidate();
         closeHMSConnection();
       } else {
         LOGGER.error("ThriftException occured fetching Notification entries, will try", e);
@@ -360,16 +279,10 @@ public class HMSFollower implements Runnable, AutoCloseable {
       if (client != null) {
         LOGGER.info("Closing the HMS client connection");
         client.close();
+        connectedToHMS = false;
       }
-      if (kerberosContext != null) {
-        LOGGER.info("Shutting down kerberos context associated with the HMS client connection");
-        kerberosContext.shutDown();
-      }
-    } catch (LoginException le) {
-      LOGGER.warn("Failed to stop kerberos context (potential to cause thread leak)", le);
     } finally {
       client = null;
-      kerberosContext = null;
     }
   }
 
@@ -385,7 +298,8 @@ public class HMSFollower implements Runnable, AutoCloseable {
   private Map<String, Set<String>> fetchFullUpdate()
     throws TException, ExecutionException {
     LOGGER.info("Request full HMS snapshot");
-    try (FullUpdateInitializer updateInitializer = new FullUpdateInitializer(client, authzConf)) {
+    try (FullUpdateInitializer updateInitializer =
+                 new FullUpdateInitializer(hiveConnectionFactory, authzConf)) {
       Map<String, Set<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
       LOGGER.info("Obtained full HMS snapshot");
       return pathsUpdate;

http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
new file mode 100644
index 0000000..62542c3
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveConnectionFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+import java.io.IOException;
+
+public interface HiveConnectionFactory extends AutoCloseable {
+  /**
+   * Open a new connection to HMS.
+   *
+   * @return connection to HMS.
+   * @throws IOException          if connection establishment failed.
+   * @throws InterruptedException if connection establishment was interrupted.
+   * @throws MetaException        if connection establishment failed.
+   */
+  HMSClient connect() throws IOException, InterruptedException, MetaException;
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
new file mode 100644
index 0000000..3d67401
--- /dev/null
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveSimpleConnectionFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.service.thrift;
+
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Factory used to generate Hive connections.
+ * Supports insecure and Kerberos connections.
+ * For Kerberos connections starts the ticket renewal thread and sets
+ * up Kerberos credentials.
+ */
+public final class HiveSimpleConnectionFactory implements HiveConnectionFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HiveSimpleConnectionFactory.class);
+
+  /** Sentry configuration */
+  private final Configuration conf;
+  /** Hive configuration */
+  private final HiveConf hiveConf;
+  private final boolean insecure;
+  private SentryKerberosContext kerberosContext = null;
+
+  HiveSimpleConnectionFactory(Configuration sentryConf, HiveConf hiveConf) {
+    this.conf = sentryConf;
+    this.hiveConf = hiveConf;
+    insecure = !ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
+        sentryConf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE).trim());
+  }
+
+  /**
+   * Initialize the Factory.
+   * For insecure connections there is nothing to initialize.
+   * For Kerberos connections sets up ticket renewal thread.
+   * @throws IOException
+   * @throws LoginException
+   */
+  void init() throws IOException, LoginException {
+    if (insecure) {
+      LOGGER.info("Using insecure connection to HMS");
+      return;
+    }
+
+    int port = conf.getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);
+    String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL),
+        "%s is required", ServerConfig.PRINCIPAL);
+    String principal = SecurityUtil.getServerPrincipal(rawPrincipal, NetUtils.createSocketAddr(
+        conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT),
+        port).getAddress());
+    LOGGER.debug("Opening kerberos connection to HMS using kerberos principal {}", principal);
+    String[] principalParts = SaslRpcServer.splitKerberosName(principal);
+    Preconditions.checkArgument(principalParts.length == 3,
+        "Kerberos principal %s should have 3 parts", principal);
+    String keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB),
+        "Configuration is missing required %s paraeter", ServerConfig.KEY_TAB);
+    File keytabFile = new File(keytab);
+    Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),
+        "Keytab %s does not exist or is not readable", keytab);
+    // Instantiating SentryKerberosContext in non-server mode handles the ticket renewal.
+    kerberosContext = new SentryKerberosContext(principal, keytab, false);
+    UserGroupInformation.setConfiguration(conf);
+    LOGGER.info("Using secure connection to HMS");
+  }
+
+  /**
+   * Connect to HMS in unsecure mode or in Kerberos mode according to config.
+   *
+   * @return HMS connection
+   * @throws IOException          if could not establish connection
+   * @throws InterruptedException if connection was interrupted
+   * @throws MetaException        if other errors happened
+   */
+  public HMSClient connect() throws IOException, InterruptedException, MetaException {
+    if (insecure) {
+      return new HMSClient(new HiveMetaStoreClient(hiveConf));
+    }
+    UserGroupInformation clientUGI =
+        UserGroupInformation.getUGIFromSubject(kerberosContext.getSubject());
+    return new HMSClient(clientUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
+      @Override
+      public HiveMetaStoreClient run() throws MetaException {
+        return new HiveMetaStoreClient(hiveConf);
+      }
+    }));
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (kerberosContext != null) {
+      kerberosContext.shutDown();
+      kerberosContext = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java
index 8d78d1d..edb8006 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryKerberosContext.java
@@ -40,9 +40,9 @@ public class SentryKerberosContext implements Runnable {
   private LoginContext loginContext;
   private Subject subject;
   private final javax.security.auth.login.Configuration kerberosConfig;
-  @Deprecated
+  
   private Thread renewerThread;
-  @Deprecated
+
   private boolean shutDownRenewer = false;
 
   public SentryKerberosContext(String principal, String keyTab, boolean server)
@@ -113,7 +113,6 @@ public class SentryKerberosContext implements Runnable {
    * Ticket renewer thread
    * wait till 80% time interval left on the ticket and then renew it
    */
-  @Deprecated
   @Override
   public void run() {
     try {
@@ -145,7 +144,6 @@ public class SentryKerberosContext implements Runnable {
     }
   }
 
-  @Deprecated
   public void startRenewerThread() {
     renewerThread = new Thread(this);
     renewerThread.start();

http://git-wip-us.apache.org/repos/asf/sentry/blob/747c2260/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
index ec938da..322197b 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java
@@ -40,6 +40,7 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@@ -74,6 +75,7 @@ import static org.apache.sentry.core.common.utils.SigUtils.registerSigListener;
 public class SentryService implements Callable, SigUtils.SigListener {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SentryService.class);
+  private HiveSimpleConnectionFactory hiveConnectionFactory;
 
   private enum Status {
     NOT_STARTED,
@@ -276,7 +278,7 @@ public class SentryService implements Callable, SigUtils.SigListener {
     thriftServer.serve();
   }
 
-  private void startHMSFollower(Configuration conf) throws Exception{
+  private void startHMSFollower(Configuration conf) throws Exception {
     if (!hdfsSyncEnabled) {
       LOGGER.info("HMS follower is not started because HDFS sync is disabled.");
       return;
@@ -296,13 +298,11 @@ public class SentryService implements Callable, SigUtils.SigListener {
 
     Preconditions.checkState(hmsFollower == null);
     Preconditions.checkState(hmsFollowerExecutor == null);
+    Preconditions.checkState(hiveConnectionFactory == null);
 
-    try {
-      hmsFollower = new HMSFollower(conf, sentryStore, leaderMonitor);
-    } catch (Exception ex) {
-      LOGGER.error("Could not create HMSFollower", ex);
-      throw ex;
-    }
+    hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new HiveConf());
+    hiveConnectionFactory.init();
+    hmsFollower = new HMSFollower(conf, sentryStore, leaderMonitor, hiveConnectionFactory);
 
     long initDelay = conf.getLong(ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS,
             ServerConfig.SENTRY_HMSFOLLOWER_INIT_DELAY_MILLS_DEFAULT);
@@ -334,6 +334,7 @@ public class SentryService implements Callable, SigUtils.SigListener {
 
     Preconditions.checkNotNull(hmsFollowerExecutor);
     Preconditions.checkNotNull(hmsFollower);
+    Preconditions.checkNotNull(hiveConnectionFactory);
 
     // use follower scheduling interval as timeout for shutting down its executor as
     // such scheduling interval should be an upper bound of how long the task normally takes to finish
@@ -343,7 +344,13 @@ public class SentryService implements Callable, SigUtils.SigListener {
       SentryServiceUtil.shutdownAndAwaitTermination(hmsFollowerExecutor, "hmsFollowerExecutor",
               timeoutValue, TimeUnit.MILLISECONDS, LOGGER);
     } finally {
+      try {
+        hiveConnectionFactory.close();
+      } catch (Exception e) {
+        LOGGER.error("Can't close HiveConnectionFactory", e);
+      }
       hmsFollowerExecutor = null;
+      hiveConnectionFactory = null;
       try {
         // close connections
         hmsFollower.close();